Skip to content

Commit 3a94168

Browse files
Added multiple consumer AQ example.
1 parent 898b480 commit 3a94168

File tree

2 files changed

+81
-2
lines changed

2 files changed

+81
-2
lines changed

samples/MultiConsumerAQ.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
#------------------------------------------------------------------------------
2+
# Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
3+
#
4+
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
5+
#
6+
# Portions Copyright 2001-2007, Computronix (Canada) Ltd., Edmonton, Alberta,
7+
# Canada. All rights reserved.
8+
#------------------------------------------------------------------------------
9+
10+
#------------------------------------------------------------------------------
11+
# MultiConsumerAQ.py
12+
# This script demonstrates how to use multi-consumer
13+
# advanced queuing using cx_Oracle. It makes use of a RAW queue
14+
# created in the sample setup.
15+
#
16+
# This script requires cx_Oracle 7.2 and higher.
17+
#------------------------------------------------------------------------------
18+
19+
import cx_Oracle
20+
import SampleEnv
21+
22+
QUEUE_NAME = "DEMO_RAW_QUEUE_MULTI"
23+
PAYLOAD_DATA = [
24+
"The first message",
25+
"The second message",
26+
"The third message",
27+
"The fourth and final message"
28+
]
29+
30+
# connect to database
31+
connection = cx_Oracle.connect(SampleEnv.GetMainConnectString())
32+
cursor = connection.cursor()
33+
34+
# create queue
35+
queue = connection.queue(QUEUE_NAME)
36+
queue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
37+
queue.deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
38+
39+
# enqueue a few messages
40+
print("Enqueuing messages...")
41+
for data in PAYLOAD_DATA:
42+
print(data)
43+
queue.enqOne(connection.msgproperties(payload=data))
44+
connection.commit()
45+
print()
46+
47+
# dequeue the messages for consumer A
48+
print("Dequeuing the messages for consumer A...")
49+
queue.deqOptions.consumername = "SUBSCRIBER_A"
50+
while True:
51+
props = queue.deqOne()
52+
if not props:
53+
break
54+
print(props.payload.decode())
55+
connection.commit()
56+
print()
57+
58+
# dequeue the message for consumer B
59+
print("Dequeuing the messages for consumer B...")
60+
queue.deqOptions.consumername = "SUBSCRIBER_B"
61+
while True:
62+
props = queue.deqOne()
63+
if not props:
64+
break
65+
print(props.payload.decode())
66+
connection.commit()
67+
68+
print("\nDone.")

samples/sql/SetupSamplesExec.sql

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*-----------------------------------------------------------------------------
2-
* Copyright 2017, 2019, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright 2017, 2020, Oracle and/or its affiliates. All rights reserved.
33
*---------------------------------------------------------------------------*/
44

55
/*-----------------------------------------------------------------------------
@@ -203,7 +203,7 @@ create table &main_user..PlsqlSessionCallbacks (
203203
)
204204
/
205205

206-
-- create queue table and queues for demonstrating advanced queuing
206+
-- create queue table, queues and subscribers for demonstrating Advanced Queuing
207207
begin
208208

209209
dbms_aqadm.create_queue_table('&main_user..BOOK_QUEUE_TAB',
@@ -217,6 +217,17 @@ begin
217217
'&main_user..RAW_QUEUE_TAB');
218218
dbms_aqadm.start_queue('&main_user..DEMO_RAW_QUEUE');
219219

220+
dbms_aqadm.create_queue_table('&main_user..RAW_QUEUE_MULTI_TAB', 'RAW',
221+
multiple_consumers => true);
222+
dbms_aqadm.create_queue('&main_user..DEMO_RAW_QUEUE_MULTI',
223+
'&main_user..RAW_QUEUE_MULTI_TAB');
224+
dbms_aqadm.start_queue('&main_user..DEMO_RAW_QUEUE_MULTI');
225+
226+
dbms_aqadm.add_subscriber('&main_user..DEMO_RAW_QUEUE_MULTI',
227+
sys.aq$_agent('SUBSCRIBER_A', null, null));
228+
dbms_aqadm.add_subscriber('&main_user..DEMO_RAW_QUEUE_MULTI',
229+
sys.aq$_agent('SUBSCRIBER_B', null, null));
230+
220231
end;
221232
/
222233

0 commit comments

Comments
 (0)