Advanced Queueing

From Oracle FAQ

Oracle Advanced Queueing (AQ) is the Oracle database's queue management feature. AQ provides a message queuing infrastructure as an integral part of the Oracle server engine. It provides an API for enqueuing messages to database queues. These messages can later be dequeued for asynchronous processing. Oracle AQ also provides functionality to preserve, track, document, correlate, and query messages in queues.

History

Oracle AQ was first introduced in Oracle 8.

As of Oracle release 9.2, AQ is bundled with Standard Edition and Enterprise Edition at no extra cost.

As of Oracle release 10.1, AQ is integrated into Oracle Streams, and is called "Oracle Streams AQ".

Interfaces

Oracle AQ is available from within PL/SQL programs through two built-in packages: DBMS_AQADM, for administering queue tables and queues, and DBMS_AQ, for enqueuing and dequeuing messages.

Some of the Oracle features that utilize AQ:

With Data Guard, the primary database's queue monitor (qmn0) process interacts with AQ.

Oracle AQ is used as the internal Java Message Service provider in the Oracle Enterprise Service Bus.

Example

Prepare database user:

CONN / AS SYSDBA
CREATE USER testq IDENTIFIED BY x;
GRANT connect, resource, dba TO testq;
GRANT aq_administrator_role, aq_user_role TO testq;
GRANT create type TO testq;

Create a queue:

CONN testq/x@ora10gtest
SET SERVEROUTPUT ON

CREATE OR REPLACE TYPE event_msg_type AS OBJECT (
  name            VARCHAR2(10),
  current_status  NUMBER(5),
  next_status     NUMBER(5)
);
/

EXECUTE DBMS_AQADM.create_queue_table( -
   queue_table        =>  'testq.event_queue_tab', -
   queue_payload_type =>  'testq.event_msg_type');

EXECUTE DBMS_AQADM.create_queue( -
   queue_name         =>  'testq.event_queue', -
   queue_table        =>  'testq.event_queue_tab');

EXECUTE DBMS_AQADM.start_queue( -
   queue_name         => 'testq.event_queue', -
   enqueue            => TRUE);
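
To confirm that the queue exists and is started, the USER_QUEUES data dictionary view can be queried as the queue owner. This is a minimal sketch that assumes the session is still connected as testq. Creating a queue table normally also creates an exception queue in it, so more than one row may be returned:

-- List the queues stored in the queue table created above.
-- Unquoted object names are stored in upper case in the data dictionary.
SELECT name, queue_type, enqueue_enabled, dequeue_enabled
  FROM user_queues
 WHERE queue_table = 'EVENT_QUEUE_TAB';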

Test an enqueue:

DECLARE
  l_enqueue_options    DBMS_AQ.enqueue_options_t;
  l_message_properties DBMS_AQ.message_properties_t;
  l_message_handle     RAW(16);
  l_event_msg          TESTQ.event_msg_type;
BEGIN
  l_event_msg := TESTQ.event_msg_type('REPORTER', 1, 2);
  DBMS_AQ.enqueue(queue_name         => 'testq.event_queue',
                  enqueue_options    => l_enqueue_options,
                  message_properties => l_message_properties,
                  payload            => l_event_msg,
                  msgid              => l_message_handle);
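  -- With default enqueue options the message becomes visible to
  -- consumers only when the enqueuing transaction commits.
  COMMIT;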
END;
/
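
Enqueued messages can be inspected without consuming them, because AQ creates a view named AQ$ followed by the queue table name for each queue table. The query below is a sketch that assumes the queue table from this example and a session connected as testq. The table alias t is needed to reference an attribute of the object payload:

-- Show the messages currently held in the queue table.
-- A freshly enqueued, committed message is normally in state READY.
SELECT t.queue,
       t.msg_id,
       t.msg_state,
       t.enq_time,
       t.user_data.name AS event_name
  FROM aq$event_queue_tab t;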

Test a dequeue:

DECLARE
  l_dequeue_options    DBMS_AQ.dequeue_options_t;
  l_message_properties DBMS_AQ.message_properties_t;
  l_message_handle     RAW(16);
  l_event_msg          TESTQ.event_msg_type;
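BEGIN
  -- With default dequeue options the call removes the message from the
  -- queue and, if the queue is empty, waits indefinitely for one to arrive.
  DBMS_AQ.dequeue(queue_name         => 'testq.event_queue',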
                  dequeue_options    => l_dequeue_options,
                  message_properties => l_message_properties,
                  payload            => l_event_msg,
                  msgid              => l_message_handle);
  DBMS_OUTPUT.put_line('Event Name    : ' || l_event_msg.name);
  DBMS_OUTPUT.put_line('Current Status: ' || l_event_msg.current_status);
  DBMS_OUTPUT.put_line('Next Status   : ' || l_event_msg.next_status);
  COMMIT;
END;
/
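
Because the default dequeue options wait indefinitely, running the block above against an empty queue makes the session hang. The following variant is a sketch of a non-blocking dequeue: it sets the wait option to DBMS_AQ.NO_WAIT and traps ORA-25228, the error raised when no message is available. The exception name e_no_messages is chosen for this illustration only:

DECLARE
  l_dequeue_options    DBMS_AQ.dequeue_options_t;
  l_message_properties DBMS_AQ.message_properties_t;
  l_message_handle     RAW(16);
  l_event_msg          TESTQ.event_msg_type;
  -- Illustrative name for ORA-25228 (timeout or end-of-fetch during dequeue).
  e_no_messages        EXCEPTION;
  PRAGMA EXCEPTION_INIT(e_no_messages, -25228);
BEGIN
  -- Return immediately instead of waiting for a message to arrive.
  l_dequeue_options.wait := DBMS_AQ.NO_WAIT;

  DBMS_AQ.dequeue(queue_name         => 'testq.event_queue',
                  dequeue_options    => l_dequeue_options,
                  message_properties => l_message_properties,
                  payload            => l_event_msg,
                  msgid              => l_message_handle);
  DBMS_OUTPUT.put_line('Event Name    : ' || l_event_msg.name);
  COMMIT;
EXCEPTION
  WHEN e_no_messages THEN
    DBMS_OUTPUT.put_line('No messages waiting in testq.event_queue');
END;
/

A number of seconds can be assigned to the wait option instead of DBMS_AQ.NO_WAIT if the consumer should block for a limited time before giving up.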

External links