Home » Server Options » Streams & AQ » AQ messages are in ready state - Not dequeued (Oracle Database 11g Enterprise Edition Release 11.1.0.7.0 - 64bit Production, Client -windows (sql dev), server - UNIX)
AQ messages are in ready state - Not dequeued [message #562027] Fri, 27 July 2012 02:49 Go to next message
ind9
Messages: 65
Registered: January 2009
Member
Dear All,
I am new to AQ and I am practising the feautures that have been offered by AQ. I am implementing AQ within oracle database.
I have 2 users CUST as main user - Owner for queues
CUST_UPD - To enqueue messages in the queue owned by CUST user.

I have created AQ methods and it perfectly works when I do 200 updates but if I go for mass updates my queue got struck and callback procedure is not dequueing messges anymore.

All the messages are in READY state which have been enqueued at 25-JUL-12 05.37.08.338410000 PM.
Moreover I am unable to compile callback procedures also as it is abending with an error maximum wait time exceeded which means callback procedure is running but for some reason dequeue not happening.

Please help where I went wrong.
Thank you.

GRANT AQ_ADMINISTRATOR_ROLE TO Cust
/
grant EXECUTE on DBMS_AQ to Cust
/
GRANT EXECUTE ON DBMS_AQADM to Cust
/
GRANT EXECUTE ON dbms_aq TO PUBLIC
/
EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','Cust',TRUE);
EXECUTE dbms_aqadm.grant_system_privilege('DEQUEUE_ANY','Cust',TRUE);

EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','cust_upd',FALSE);
EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','PUBLIC',FALSE);


REM =====================================================
[b]REM Create type CREATE_queue_table, CREATE_queue, start_queue[/b]
REM =====================================================

CREATE type msg_q_type
AS Object
  (
STORE_KEY                       NUMBER(5) ,                                                                                                                                                                                    
CUST_NO                         NUMBER(12)    ,                                                                                                                                                                               
TABLE_NM                     VARCHAR2(10 ))
;
/


[b]--Create Q table[/b]
BEGIN
  dbms_aqadm.CREATE_queue_table
          ( queue_table => 'msg_q_tbl', 
            --sort_list => 'priority',
            queue_payload_type => 'msg_q_type',
            multiple_consumers => TRUE,              
            COMMENT => 'Creating input queue table');
END;
/
[b]--My Queue[/b]
BEGIN
  dbms_aqadm.CREATE_queue( queue_name => 'msg_queue', queue_table => 'msg_q_tbl', COMMENT => 'Test Queue');
END;
/


BEGIN
  [b]dbms_aqadm.start_queue( queue_name => 'msg_queue');[/b]
END;
/


REM ==============================================
REM SETUP complete 
REM ==============================================

[b]PROMPT Create procedure ENQUEUE_MSG[/b]
CREATE OR REPLACE
PROCEDURE ENQUEUE_MSG(
    pi_payload_info msg_q_type,
    pi_Owner Varchar2 Default 'CUST',
    pi_Q_Name Varchar2 Default 'msg_queue')
AS
PRAGMA AUTONOMOUS_TRANSACTION;
  eopt dbms_aq.enqueue_options_t;
  mprop dbms_aq.message_properties_t;
  enq_msgid RAW(16); 
  
  --priority NUMBER;
BEGIN
  --SELECT aq_sequence.nextval INTO priority FROM dual;
  --mprop.priority := priority;
  [i]dbms_aq.enqueue( queue_name         => pi_Owner||'.'||pi_Q_Name
                  ,enqueue_options    => eopt 
                  ,message_properties => mprop
                  ,payload            => pi_payload_info
                  ,msgid => enq_msgid);[/i]  
--Debug_MSG ('enqueue enq_msgid='||enq_msgid );     
  
  INSERT INTO CUST_MOVER_LOG ( LOG_ID , LOG_DATE, LOG_TYPE, LOG_LABEL, LOG_COMMENT )
   VALUES ( lex_seq_1.NEXTVAL
           , Systimestamp
           , 'E'
           , enq_msgid
           ,'StoreKey:' || pi_payload_info.STORE_KEY ||' CustNO:'
                           || pi_payload_info.CUST_NO||' CH:'
                           || ' TABLE_NM='||pi_payload_info.TABLE_NM
                           
            );
                        
  COMMIT;
  Exception
  	 When Others Then
  	 	 Rollback;
  	 	 Raise;
END ENQUEUE_MSG;
/



 
[b]--Call back procedure[/b]
CREATE Or Replace PROCEDURE msg_q_callback_proc(
                 context  RAW,
                 reginfo  SYS.AQ$_REG_INFO,
                 descr    SYS.AQ$_DESCRIPTOR,
                 payload  RAW,
                 payloadl NUMBER
                 ) AS

   r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
   r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
   v_message_handle     RAW(16);
   o_payload            msg_q_type;

BEGIN
  --Debug_MSG ('msg_q_callback_proc running');
  
   r_dequeue_options.msgid := descr.msg_id;
   r_dequeue_options.consumer_name := descr.consumer_name;
  

   [i]DBMS_AQ.DEQUEUE(
      queue_name         => descr.queue_name,
      dequeue_options    => r_dequeue_options,
      message_properties => r_message_properties,
      payload            => o_payload,
      msgid              => v_message_handle
      );[/i] 
 Update CUST_MOVER_LOG
  Set LOG_TYPE = 'D'
  Where LOG_LABEL = v_message_handle;
 
 If sql%rowcount = 0 Then 
  INSERT INTO CUST_MOVER_LOG ( LOG_ID , LOG_DATE, LOG_TYPE, LOG_LABEL, LOG_COMMENT )
   VALUES ( lex_seq_1.NEXTVAL
           , Systimestamp
           , 'B'
           , v_message_handle
           ,'StoreKey:' || o_payload.STORE_KEY ||' CustNO:'
                           || o_payload.CUST_NO||' CH:'                          
                           || ' for table='||o_payload.TABLE_NM
            );
  End if;             
     
    /*  Debug_MSG ('msg_q_callback_proc=dequeued at [' || TO_CHAR( SYSTIMESTAMP,
                                        'DD-MON-YYYY HH24:MI:SS.FF3' ) || '] STORE_KEY-'||o_payload.STORE_KEY 
                                        ||' ORIG_SYSTEM_REFERENCE='||o_payload.ORIG_SYSTEM_REFERENCE);*/
                                        
   COMMIT;
   --Debug_MSG ('Call to check start');
 
Exception
	 When Others Then
	
	 	 Raise;	
END msg_q_callback_proc;
/

[b]--Add an agent to process requests[/b]
BEGIN

   DBMS_AQADM.ADD_SUBSCRIBER (
      queue_name => 'msg_queue',
      subscriber => SYS.AQ$_AGENT(
                       'msg_queue_subscriber',
                       NULL,
                       NULL )
      );

   [b] DBMS_AQ.REGISTER (
       SYS.AQ$_REG_INFO_LIST(
          SYS.AQ$_REG_INFO(
             'msg_queue:msg_queue_subscriber',
             DBMS_AQ.NAMESPACE_AQ,
             'plsql://msg_q_callback_proc',
             HEXTORAW('FF')
             )
          ),
       1
       );[/b]
END;
/


Create Public synonym msg_q_type for msg_q_type;
Grant all on msg_q_type to Public;
Create Public synonym ENQUEUE_MSG for ENQUEUE_MSG;
Grant execute on ENQUEUE_MSG to Public;

Prompt Altering table structure of CUST_MOVER_LOG 
Alter table CUST_MOVER_LOG Modify LOG_DATE Timestamp;
Alter table CUST_MOVER_LOG Modify LOG_ID NUMBER;
Re: AQ messages are in ready state - Not dequeued [message #562029 is a reply to message #562027] Fri, 27 July 2012 02:59 Go to previous message
ind9
Messages: 65
Registered: January 2009
Member
Hi,
I forgot to post my mass update procude... here it is
Input parameter: 20000

create or replace
procedure mass_update (prownum number) is
Begin

 Update customer
  set last_name = last_name||' LC' 
  where rownum < prownum;
  Commit;
End mass_update;


Thanks in advance...
Previous Topic: Data Clean up in Persistent Queue
Next Topic: Oracle Streams 11g , not working
Goto Forum:
  


Current Time: Tue Jan 07 20:30:21 CST 2025