What I have:
- A Java application that subscribes to topics using rules
- One multiconsumer queue
- A package with the procedures SUBSCRIBE, UNSUBSCRIBE, ENQUEUE, DEQUEUE
Subscription behavior:
- Since I need/want dynamic subscriber names, I generate them during subscription:
    PROCEDURE SUBSCRIBE(
        queueName IN VARCHAR2,
        rule IN VARCHAR2,
        subscriberName OUT NOCOPY VARCHAR2,
        errorNumber OUT NUMBER
    ) AS
        aqAgent SYS.AQ$_AGENT;
    BEGIN
        -- Build the subscriber name from the current timestamp (microsecond precision)
        SELECT 'S' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF6') INTO subscriberName FROM DUAL;
        aqAgent := SYS.AQ$_AGENT(subscriberName, NULL, NULL);
        -- Register the generated agent as a rule-based subscriber of the queue
        DBMS_AQADM.ADD_SUBSCRIBER(
            QUEUE_NAME => queueName,
            SUBSCRIBER => aqAgent,
            RULE => rule
        );
    EXCEPTION WHEN OTHERS THEN
        errorNumber := SQLCODE;
        RAISE;
    END SUBSCRIBE;
- These subscriptions are removed by a shutdown hook in Java that calls this procedure:
    PROCEDURE UNSUBSCRIBE(
        queueName IN VARCHAR2,
        subscriberName IN VARCHAR2,
        errorNumber OUT NUMBER
    ) AS
        aqAgent SYS.AQ$_AGENT;
    BEGIN
        aqAgent := SYS.AQ$_AGENT(subscriberName, NULL, NULL);
        DBMS_AQADM.REMOVE_SUBSCRIBER(queueName, aqAgent);
    EXCEPTION WHEN OTHERS THEN
        errorNumber := SQLCODE;
        RAISE;
    END UNSUBSCRIBE;
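For context, here is a simplified sketch of how the Java side registers that cleanup. The names SubscriptionCleanup and Unsubscriber are made up for this post, and the Unsubscriber interface is only a stand-in for the real JDBC call to UNSUBSCRIBE. Note that a JVM shutdown hook runs on a normal exit or a regular termination signal, but not when the process is killed forcibly.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the JDBC call to the UNSUBSCRIBE procedure. */
interface Unsubscriber {
    void unsubscribe(String queueName, String subscriberName) throws Exception;
}

/** Removes one AQ subscription at most once, either explicitly or at JVM shutdown. */
final class SubscriptionCleanup {
    private final String queueName;
    private final String subscriberName;
    private final Unsubscriber unsubscriber;
    private final AtomicBoolean done = new AtomicBoolean(false);

    SubscriptionCleanup(String queueName, String subscriberName, Unsubscriber unsubscriber) {
        this.queueName = queueName;
        this.subscriberName = subscriberName;
        this.unsubscriber = unsubscriber;
    }

    /** Performs the unsubscribe call once; returns true only if this call succeeded in doing it. */
    boolean run() {
        if (!done.compareAndSet(false, true)) {
            return false; // already attempted, nothing to do
        }
        try {
            unsubscriber.unsubscribe(queueName, subscriberName);
            return true;
        } catch (Exception e) {
            // The JVM is going down anyway; just report the failure.
            System.err.println("Unsubscribe failed for " + subscriberName + ": " + e);
            return false;
        }
    }

    /** Registers the cleanup as a JVM shutdown hook and returns the hook thread. */
    Thread registerShutdownHook() {
        Thread hook = new Thread(this::run, "aq-unsubscribe");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```

The client creates one SubscriptionCleanup per generated subscriber name right after SUBSCRIBE returns and calls registerShutdownHook() on it.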
Everything works great up to this point, BUT if someone kills the Java process before it has time to unsubscribe, messages keep getting queued for the subscriber that is now "dead". Starting a new client results in new subscriber names.
Now, how do I deal with the dead subscribers? I want a solution on the DB side and can't rely on the client to clean up.
I can't drop and recreate my queue and queue table, because clients may be connected and blocked in a dequeue (WAIT = DBMS_AQ.FOREVER): either the drop could not be executed, or, if it could, it would kill the active client subscriptions. That means I have to snipe only the dead subscribers.
I have tried removing the dead subscribers, which I identify in the "AQ$<queue-table-name>" view as those whose messages have 'MSG_STATE' = 'EXPIRED'. That worked, BUT the exception queue is still full of messages that have NOT been removed. So I dequeued all the messages from the exception queue, but these messages are still not being removed... (aq_tm_processes is currently set to 1.) Messages that have been dequeued by the client are removed correctly.
Has anyone had the same issue? I have spent many hours on this problem trying to find a solution...
Thanks in advance!