Search This Blog

Sunday, February 23, 2014

Advanced Queues - AQ's







Steps to create AQ's.


1. Create payload to be enqueued and dequeued.  [ user defined object type ]


CREATE type Message_typ as object (subject VARCHAR2(30),
                                   text    VARCHAR2(80));  

2. Create a Queue table to hold the payload from step 1 on enqueue.


begin
 DBMS_AQADM.CREATE_QUEUE_TABLE (queue_table ='demo_qtable',
                                queue_payload_type => 'Message_typ');
 commit;
end;


3. Create a Queue and attach Queue Table from step 2.

begin
       DBMS_AQADM.CREATE_QUEUE (queue_name => 'demo_queue',
                                queue_table => 'demo_qtable');
   commit;
end;


DBMS_AQADM.CREATE_QUEUE (
   queue_name          IN       VARCHAR2,
   queue_table         IN       VARCHAR2,
   queue_type          IN       BINARY_INTEGER DEFAULT NORMAL_QUEUE,
   max_retries         IN       NUMBER         DEFAULT NULL,
   retry_delay         IN       NUMBER         DEFAULT 0,
   retention_time      IN       NUMBER         DEFAULT 0,
   dependency_tracking IN       BOOLEAN        DEFAULT FALSE,
   comment             IN       VARCHAR2       DEFAULT NULL,
   auto_commit         IN       BOOLEAN        DEFAULT TRUE);


queue_type
Specifies whether the queue being created is an exception queue or a normal queue. NORMAL_QUEUE means the queue is a normal queue. This is the default. EXCEPTION_QUEUE means it is an exception queue. Only the dequeue operation is allowed on the exception queue.

retention_time
Number of seconds for which a message is retained in the queue table after being dequeued from the queue. INFINITE means the message is retained forever. NUMBER is the number of seconds for which to retain the messages. The default is 0, no retention.



4. Start the Queue

begin
    DBMS_AQADM.START_QUEUE (queue_name => 'demo_queue');
    commit;
end;

5. Add subscriber (agent) to the Queue (Q-listener).

 declare
    subscriber aq$_agent;
 begin
    subscriber:=aq$_agent(name =>'demo_agent', 
                          address => NULL, 
                          protocol => NULL);
    dbms_aqadm.add_Subscriber(queue_name => 'demo_queue',
                              subscriber => subscriber);
  commit;
 end;



6. Enqueue the payload to Q-Table


declare
   enqueue_options     dbms_aq.enqueue_options_t;
   message_properties  dbms_aq.message_properties_t;
   message_handle      RAW(16);
   message             aq.message_typ;
begin
dbms_aq.enqueue(queue_name => 'demo_queue',           
                enqueue_options      => enqueue_options,       
                message_properties   => message_properties,     
                payload              => message,               
                msgid                => message_handle);
 commit;
end;

7. Dequeue payload from Q-Table



DECLARE
   dequeue_options     dbms_aq.dequeue_options_t;
   message_properties  dbms_aq.message_properties_t;
   message_handle      RAW(16);
   message             aq.message_typ;

BEGIN
   DBMS_AQ.DEQUEUE(queue_name => 'demo_queue',
                   dequeue_options    => dequeue_options,
                   message_properties => message_properties,
                   payload            => message,
                   msgid              => message_handle);

   DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject ||
                                      ' ... ' || message.text );
   COMMIT;
END;

8. Setting Message Properties 

TYPE MESSAGE_PROPERTIES_T IS RECORD (
   priority               BINARY_INTEGER  DEFAULT 1,
   delay                  BINARY_INTEGER  DEFAULT NO_DELAY,
   expiration             BINARY_INTEGER  DEFAULT NEVER,
   correlation            VARCHAR2(128)   DEFAULT NULL,
   attempts               BINARY_INTEGER,
   recipient_list         AQ$_RECIPIENT_LIST_T,
   exception_queue        VARCHAR2(51)    DEFAULT NULL,
   enqueue_time           DATE,
   state                  BINARY_INTEGER,
   sender_id              AQ$_AGENT       DEFAULT NULL, 
   original_msgid         RAW(16)         DEFAULT NULL);

Example : 

declare
    message_properties  dbms_aq.message_properties_t;
begin
     message_properties.priority := 30;

     message_properties.delay := 7*24*60*60;

     message_properties.expiration := 2*7*24*60*60;

    /* Expiration is calculated from the earliest dequeue time. So, if an application wants a message to be dequeued no earlier than a week from now, but no later than 3 weeks from now, this requires setting the expiration time for 2 weeks. */


     -- enqueue payload ---
end;

9. Setting Enqueue Properties

TYPE  ENQUEUE_OPTIONS_T IS RECORD (
   visibility            BINARY_INTEGER  DEFAULT ON_COMMIT,
   relative_msgid        RAW(16)         DEFAULT NULL,
   sequence_deviation    BINARY_INTEGER  DEFAULT NULL,
   transformation        VARCHAR2(60)    DEFAULT NULL);
visibility

Specifies the transactional behavior of the enqueue request. The possible settings follow:
ON_COMMIT: The enqueue is part of the current transaction. The operation is complete when the transaction commits. This setting is the default.
IMMEDIATE: The enqueue is not part of the current transaction. The operation constitutes a transaction on its own. This is the only value allowed when enqueuing to a non-persistent queue.
10. Setting Dequeue Properties
TYPE DEQUEUE_OPTIONS_T IS RECORD (
   consumer_name     VARCHAR2(30)    DEFAULT NULL,
   dequeue_mode      BINARY_INTEGER  DEFAULT REMOVE,
   navigation        BINARY_INTEGER  DEFAULT NEXT_MESSAGE,
   visibility        BINARY_INTEGER  DEFAULT ON_COMMIT,
   wait              BINARY_INTEGER  DEFAULT FOREVER,
   msgid             RAW(16)         DEFAULT NULL,
   correlation       VARCHAR2(128)   DEFAULT NULL,
   deq_condition     VARCHAR2(4000)  DEFAULT NULL,
   transformation    VARCHAR2(60)    DEFAULT NULL);

 Example 

     dequeue_options.navigation := FIRST_MESSAGE;
     dequeue_options.navigation := NEXT_MESSAGE;
     exception : 
     dequeue_options.navigation := DBMS_AQ.NEXT_TRANSACTION;
consumer_name

Name of the consumer. Only those messages matching the consumer name are accessed. If a queue is not set up for multiple consumers, then this field should be set to NULL.
For secure queues, consumer_name must be a valid AQ Agent

dequeue_mode

Specifies the locking behavior associated with the dequeue. The possible settings follow:
BROWSE: Read the message without acquiring any lock on the message. This specification is equivalent to a select statement.
LOCKED: Read and obtain a write lock on the message. The lock lasts for the duration of the transaction. This setting is equivalent to a select for update statement.
REMOVE: Read the message and update or delete it. This setting is the default. The message can be retained in the queue table based on the retention properties.
REMOVE_NODATA: Mark the message as updated or deleted. The message can be retained in the queue table based on the retention properties.

11. Purge Queue Table

TYPE AQ$_PURGE_OPTIONS_T is RECORD (block  BOOLEAN  DEFAULT FALSE
                                    delivery_mode   PLS_INTEGER   DEFAULT PERSISTENT);

block
TRUE/FALSE.
  • If block is TRUE, then an exclusive lock on all the queues in the queue table is held while purging the queue table. This will cause concurrent enqueuers and dequeuers to block while the queue table is purged. The purge call always succeeds if block is TRUE.
  • The default for block is FALSE. This will not block enqueuers and dequeuers, but it can cause the purge to fail with an error during high concurrency times.
delivery_mode
Kind of messages to purge, either DBMS_AQ.BUFFERED or DBMS_AQ.PERSISTENT

DECLARE
   PurgeOptions dbms_aqadm.AQ$_PURGE_OPTIONS_T;
BEGIN
   PurgeOptions.block:=True;
   DBMS_AQADM.PURGE_QUEUE_TABLE (queue_table => 'demo_queue',
                                 purge_options => PurgeOptions);
END;
/