Steps to create AQ's.
1. Create payload to be enqueued and dequeued. [ user defined object type ]
CREATE type Message_typ as object (subject VARCHAR2(30),
text VARCHAR2(80));
2. Create a Queue table to hold the payload from step 1 on enqueue.
begin
DBMS_AQADM.CREATE_QUEUE_TABLE (queue_table ='demo_qtable',
queue_payload_type => 'Message_typ');
commit;
end;
3. Create a Queue and attach Queue Table from step 2.
begin
DBMS_AQADM.CREATE_QUEUE (queue_name => 'demo_queue',
queue_table => 'demo_qtable');
commit;
end;
DBMS_AQADM.CREATE_QUEUE ( queue_name IN VARCHAR2, queue_table IN VARCHAR2, queue_type IN BINARY_INTEGER DEFAULT NORMAL_QUEUE, max_retries IN NUMBER DEFAULT NULL, retry_delay IN NUMBER DEFAULT 0, retention_time IN NUMBER DEFAULT 0, dependency_tracking IN BOOLEAN DEFAULT FALSE, comment IN VARCHAR2 DEFAULT NULL, auto_commit IN BOOLEAN DEFAULT TRUE);
queue_type |
Specifies whether the queue being created is an exception queue or a normal queue.
NORMAL_QUEUE means the queue is a normal queue. This is the default. EXCEPTION_QUEUE means it is an exception queue. Only the dequeue operation is allowed on the exception queue. |
retention_time |
Number of seconds for which a message is retained in the queue table after being dequeued from the queue.
INFINITE means the message is retained forever. NUMBER is the number of seconds for which to retain the messages. The default is 0, no retention. |
4. Start the Queue
begin
DBMS_AQADM.START_QUEUE (queue_name => 'demo_queue');
commit;
end;
5. Add subscriber (agent) to the Queue (Q-listener).
declare
subscriber aq$_agent;
begin
subscriber:=aq$_agent(name =>'demo_agent',
address => NULL,
protocol => NULL);
dbms_aqadm.add_Subscriber(queue_name => 'demo_queue',
subscriber => subscriber);
commit;
end;
6. Enqueue the payload to Q-Table
declare
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message aq.message_typ;
begin
dbms_aq.enqueue(queue_name => 'demo_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle);
commit;
end;
7. Dequeue payload from Q-Table
DECLARE dequeue_options dbms_aq.dequeue_options_t; message_properties dbms_aq.message_properties_t; message_handle RAW(16); message aq.message_typ; BEGIN DBMS_AQ.DEQUEUE(queue_name => 'demo_queue', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); COMMIT; END;
8. Setting Message Properties
TYPE MESSAGE_PROPERTIES_T IS RECORD ( priority BINARY_INTEGER DEFAULT 1, delay BINARY_INTEGER DEFAULT NO_DELAY, expiration BINARY_INTEGER DEFAULT NEVER, correlation VARCHAR2(128) DEFAULT NULL, attempts BINARY_INTEGER, recipient_list AQ$_RECIPIENT_LIST_T, exception_queue VARCHAR2(51) DEFAULT NULL, enqueue_time DATE, state BINARY_INTEGER, sender_id AQ$_AGENT DEFAULT NULL, original_msgid RAW(16) DEFAULT NULL);
Example :declaremessage_properties dbms_aq.message_properties_t;beginmessage_properties.priority := 30;message_properties.delay := 7*24*60*60;message_properties.expiration := 2*7*24*60*60;/* Expiration is calculated from the earliest dequeue time. So, if an application wants a message to be dequeued no earlier than a week from now, but no later than 3 weeks from now, this requires setting the expiration time for 2 weeks. */
-- enqueue payload ---end;9. Setting Enqueue Properties
TYPE ENQUEUE_OPTIONS_T IS RECORD ( visibility BINARY_INTEGER DEFAULT ON_COMMIT, relative_msgid RAW(16) DEFAULT NULL, sequence_deviation BINARY_INTEGER DEFAULT NULL, transformation VARCHAR2(60) DEFAULT NULL);
10. Setting Dequeue PropertiesTYPE DEQUEUE_OPTIONS_T IS RECORD ( consumer_name VARCHAR2(30) DEFAULT NULL, dequeue_mode BINARY_INTEGER DEFAULT REMOVE, navigation BINARY_INTEGER DEFAULT NEXT_MESSAGE, visibility BINARY_INTEGER DEFAULT ON_COMMIT, wait BINARY_INTEGER DEFAULT FOREVER, msgid RAW(16) DEFAULT NULL, correlation VARCHAR2(128) DEFAULT NULL, deq_condition VARCHAR2(4000) DEFAULT NULL, transformation VARCHAR2(60) DEFAULT NULL);
Example
dequeue_options.navigation := FIRST_MESSAGE;dequeue_options.navigation := NEXT_MESSAGE;exception :dequeue_options.navigation := DBMS_AQ.NEXT_TRANSACTION;11. Purge Queue Table
TYPE AQ$_PURGE_OPTIONS_T is RECORD (block BOOLEAN DEFAULT FALSE delivery_mode PLS_INTEGER DEFAULT PERSISTENT);
block | TRUE/FALSE .
|
delivery_mode |
Kind of messages to purge, either
DBMS_AQ.BUFFERED or DBMS_AQ.PERSISTENT |
DECLARE PurgeOptions dbms_aqadm.AQ$_PURGE_OPTIONS_T; BEGIN PurgeOptions.block:=True; DBMS_AQADM.PURGE_QUEUE_TABLE (queue_table => 'demo_queue',
purge_options => PurgeOptions); END; /