Skip to content

Commit 72a2446

Browse files
authored
Lab1 for AQ and TEQ workflows
Basic AQ example that demonstrates queue creation, enqueue, dequeue browse, and dequeue remove.
1 parent 3271bb7 commit 72a2446

File tree

1 file changed

+310
-0
lines changed

1 file changed

+310
-0
lines changed

aqteqworkflows/Lab1/ClassicQueue.java

Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
package com.example.demo;
2+
3+
import java.sql.Connection;
4+
import java.sql.DriverManager;
5+
import java.sql.SQLException;
6+
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
10+
import oracle.AQ.AQAgent;
11+
import oracle.AQ.AQDequeueOption;
12+
import oracle.AQ.AQDriverManager;
13+
import oracle.AQ.AQEnqueueOption;
14+
import oracle.AQ.AQException;
15+
import oracle.AQ.AQMessage;
16+
import oracle.AQ.AQMessageProperty;
17+
import oracle.AQ.AQOracleSession;
18+
import oracle.AQ.AQQueue;
19+
import oracle.AQ.AQQueueProperty;
20+
import oracle.AQ.AQQueueTable;
21+
import oracle.AQ.AQQueueTableProperty;
22+
import oracle.AQ.AQRawPayload;
23+
import oracle.AQ.AQSession;
24+
25+
public class ClassicQueue {
26+
27+
static Logger logger = LoggerFactory.getLogger(QueueExamples.class);
28+
29+
public static void main(String[] args){
30+
AQSession aq_sess = null;
31+
try {
32+
aq_sess = createSession();
33+
34+
createQueue(aq_sess);
35+
logger.info("---------------createQueue successfully-------------------");
36+
37+
useCreatedQueue(aq_sess);
38+
logger.info("---------------useCreatedQueue successfully---------------");
39+
40+
enqueueAndDequeue(aq_sess);
41+
logger.info("---------------enqueueAndDequeue successfully-------------");
42+
43+
multiuserQueue(aq_sess);
44+
logger.info("---------------multiuserQueue successfully----------------");
45+
46+
enqueueRAWMessages(aq_sess);
47+
logger.info("---------------enqueueRAWMessages successfully------------");
48+
49+
dequeueRawMessages(aq_sess);
50+
logger.info("---------------dequeueRAWMessages successfully------------");
51+
52+
dequeueMessagesBrowseMode(aq_sess);
53+
logger.info("---------------dequeueMessagesBrowseMode successfully------");
54+
55+
enqueueMessagesWithPriority(aq_sess);
56+
logger.info("---------------enqueueMessagesWithPriority successfully----");
57+
58+
} catch (Exception ex) {
59+
logger.info("Exception-1: " + ex);
60+
ex.printStackTrace();
61+
}
62+
}
63+
64+
public static AQSession createSession() {
65+
Connection db_conn;
66+
AQSession aq_sess = null;
67+
try {
68+
Class.forName("oracle.jdbc.driver.OracleDriver");
69+
db_conn = DriverManager.getConnection(
70+
"jdbc:oracle:thin:@teqtest_high?TNS_ADMIN=/Users/mayanktayal/Code/Database/Wallet_TEQTest","WINGARDIUM", "password");
71+
72+
logger.info("JDBC Connection opened ");
73+
db_conn.setAutoCommit(false);
74+
Class.forName("oracle.AQ.AQOracleDriver");
75+
76+
/* Creating an AQ Session: */
77+
aq_sess = AQDriverManager.createAQSession(db_conn);
78+
79+
logger.info("Successfully created AQSession ");
80+
} catch (Exception ex) {
81+
logger.info("Exception: " + ex);
82+
ex.printStackTrace();
83+
}
84+
return aq_sess;
85+
}
86+
87+
public static void createQueue(AQSession aq_sess) throws AQException {
88+
AQQueueTableProperty qtable_prop = new AQQueueTableProperty("RAW");
89+
AQQueueProperty queue_prop = new AQQueueProperty();
90+
91+
AQQueueTable q_table = aq_sess.createQueueTable("WINGARDIUM", "classicRawQueueTable", qtable_prop);
92+
logger.info("Successfully created classicRawQueueTable in WINGARDIUM schema");
93+
94+
AQQueue queue = aq_sess.createQueue(q_table, "classicRawQueue", queue_prop);
95+
logger.info("Successfully created classicRawQueue in classicRawQueue");
96+
}
97+
98+
public static void useCreatedQueue(AQSession aq_sess) throws AQException {
99+
100+
AQQueueTable q_table = aq_sess.getQueueTable("WINGARDIUM", "classicRawQueueTable");
101+
logger.info("Successful getQueueTable");
102+
103+
AQQueue queue = aq_sess.getQueue("WINGARDIUM", "classicRawQueue1");
104+
logger.info("Successful getQueue"+ queue.getQueueTableName() +"------"+queue.getName());
105+
}
106+
107+
public static void enqueueAndDequeue(AQSession aq_sess) throws AQException {
108+
109+
AQQueueTableProperty qtable_prop = new AQQueueTableProperty("RAW");
110+
// qtable_prop.setCompatible("8.1");
111+
112+
AQQueueTable q_table = aq_sess.createQueueTable("WINGARDIUM", "classicRawQueueTable2", qtable_prop);
113+
logger.info("Successful createQueueTable");
114+
115+
AQQueueProperty queue_prop = new AQQueueProperty();
116+
117+
AQQueue queue = aq_sess.createQueue(q_table, "classicRawQueue2", queue_prop);
118+
logger.info("Successful createQueue");
119+
120+
queue.start(true, true);
121+
logger.info("Successful start queue");
122+
123+
queue.grantQueuePrivilege("ENQUEUE", "WINGARDIUM");
124+
logger.info("Successful grantQueuePrivilege");
125+
}
126+
127+
public static void multiuserQueue(AQSession aq_sess) throws AQException {
128+
129+
AQAgent subs111, subs222;
130+
131+
/* Creating a AQQueueTable property object (payload type - RAW): */
132+
AQQueueTableProperty qtable_prop = new AQQueueTableProperty("RAW");
133+
134+
/* Creating a new AQQueueProperty object: */
135+
AQQueueProperty queue_prop = new AQQueueProperty();
136+
137+
/* Set multiconsumer flag to true: */
138+
qtable_prop.setMultiConsumer(true);
139+
140+
/* Creating a queue table in WINGARDIUM schema: */
141+
AQQueueTable q_table = aq_sess.createQueueTable("WINGARDIUM", "classicRawMultiUserQueueTable1", qtable_prop);
142+
logger.info("Successful createQueueTable");
143+
144+
AQQueue queue = aq_sess.createQueue(q_table, "classicRawMultiUserQueue1", queue_prop);
145+
logger.info("Successful createQueue");
146+
147+
/* Enable enqueue/dequeue on this queue: */
148+
queue.start();
149+
logger.info("Successful start queue");
150+
151+
/* Add subscribers to this queue: */
152+
subs111 = new AQAgent("GREEN", null, 0);
153+
subs222 = new AQAgent("BLUE", null, 0);
154+
155+
queue.addSubscriber(subs111, null); /* no rule */
156+
logger.info("Successful addSubscriber 111");
157+
158+
queue.addSubscriber(subs222, "priority < 2"); /* with rule */
159+
logger.info("Successful addSubscriber 222");
160+
}
161+
162+
public static void enqueueRAWMessages(AQSession aq_sess) throws AQException, SQLException {
163+
164+
String test_data = "new message";
165+
byte[] b_array;
166+
167+
Connection db_conn = ((AQOracleSession) aq_sess).getDBConnection();
168+
169+
/* Get a handle to a queue-classicRawMultiUserQueue in WINGARDIUM schema: */
170+
AQQueue queue = aq_sess.getQueue("WINGARDIUM", "classicRawMultiUserQueue");
171+
logger.info("Successful getQueue");
172+
173+
/* Creating a message to contain raw payload: */
174+
AQMessage message = queue.createMessage();
175+
176+
/* Get handle to the AQRawPayload object and populate it with raw data: */
177+
b_array = test_data.getBytes();
178+
179+
AQRawPayload raw_payload = message.getRawPayload();
180+
181+
raw_payload.setStream(b_array, b_array.length);
182+
183+
/* Creating a AQEnqueueOption object with default options: */
184+
AQEnqueueOption enq_option = new AQEnqueueOption();
185+
186+
/* Enqueue the message: */
187+
queue.enqueue(enq_option, message);
188+
189+
db_conn.commit();
190+
}
191+
192+
public static void dequeueRawMessages(AQSession aq_sess) throws AQException, SQLException {
193+
194+
String test_data = "new message";
195+
byte[] b_array;
196+
197+
Connection db_conn = ((AQOracleSession) aq_sess).getDBConnection();
198+
AQQueue queue = aq_sess.getQueue("WINGARDIUM", "classicRawMultiUserQueue");
199+
200+
AQMessage message = queue.createMessage();
201+
b_array = test_data.getBytes();
202+
AQRawPayload raw_payload = message.getRawPayload();
203+
raw_payload.setStream(b_array, b_array.length);
204+
AQEnqueueOption enq_option = new AQEnqueueOption();
205+
queue.enqueue(enq_option, message);
206+
db_conn.commit();
207+
208+
/* Creating a AQDequeueOption object with default options: */
209+
AQDequeueOption deq_option = new AQDequeueOption();
210+
211+
/* Dequeue a message: */
212+
deq_option.setConsumerName("GREEN");
213+
message = queue.dequeue(deq_option);
214+
215+
/* Retrieve raw data from the message: */
216+
raw_payload = message.getRawPayload();
217+
218+
b_array = raw_payload.getBytes();
219+
220+
db_conn.commit();
221+
}
222+
223+
public static void dequeueMessagesBrowseMode(AQSession aq_sess) throws AQException, SQLException {
224+
225+
String test_data = "new message";
226+
byte[] b_array;
227+
228+
Connection db_conn = ((AQOracleSession) aq_sess).getDBConnection();
229+
230+
/* Get a handle to a queue in WINGARDIUM schema: */
231+
AQQueue queue = aq_sess.getQueue("WINGARDIUM", "classicRawMultiUserQueue");
232+
logger.info("Successful getQueue");
233+
234+
/* Creating a message to contain raw payload: */
235+
AQMessage message = queue.createMessage();
236+
237+
/* Get handle to the AQRawPayload object and populate it with raw data: */
238+
b_array = test_data.getBytes();
239+
240+
AQRawPayload raw_payload = message.getRawPayload();
241+
raw_payload.setStream(b_array, b_array.length);
242+
243+
AQEnqueueOption enq_option = new AQEnqueueOption();
244+
245+
queue.enqueue(enq_option, message);
246+
logger.info("Successful enqueue");
247+
248+
db_conn.commit();
249+
250+
AQDequeueOption deq_option = new AQDequeueOption();
251+
252+
deq_option.setConsumerName("GREEN");
253+
254+
/* Set dequeue mode to BROWSE: */
255+
deq_option.setDequeueMode(AQDequeueOption.DEQUEUE_BROWSE);
256+
257+
/* Set wait time to 10 seconds: */
258+
deq_option.setWaitTime(10);
259+
260+
/* Dequeue a message: */
261+
message = queue.dequeue(deq_option);
262+
263+
/* Retrieve raw data from the message: */
264+
raw_payload = message.getRawPayload();
265+
b_array = raw_payload.getBytes();
266+
267+
String ret_value = new String(b_array);
268+
logger.info("Dequeued message: " + ret_value);
269+
270+
db_conn.commit();
271+
}
272+
273+
public static void enqueueMessagesWithPriority(AQSession aq_sess) throws AQException, SQLException {
274+
275+
String test_data;
276+
byte[] b_array;
277+
Connection db_conn = ((AQOracleSession) aq_sess).getDBConnection();
278+
279+
/* Get a handle to a queue in WINGARDIUM schema: */
280+
AQQueue queue = aq_sess.getQueue("WINGARDIUM", "classicRawMultiUserQueue");
281+
logger.info("Successful getQueue");
282+
283+
/* Enqueue 5 messages with priorities with different priorities: */
284+
for (int i = 0; i < 5; i++) {
285+
/* Creating a message to contain raw payload: */
286+
AQMessage message = queue.createMessage();
287+
288+
test_data = "Small_message_" + (i + 1); /* some test data */
289+
290+
b_array = test_data.getBytes();
291+
292+
AQRawPayload raw_payload = message.getRawPayload();
293+
raw_payload.setStream(b_array, b_array.length);
294+
AQMessageProperty m_property = message.getMessageProperty();
295+
296+
if (i < 2)
297+
m_property.setPriority(2);
298+
else
299+
m_property.setPriority(3);
300+
301+
AQEnqueueOption enq_option = new AQEnqueueOption();
302+
303+
/* Enqueue the message: */
304+
queue.enqueue(enq_option, message);
305+
logger.info("Successful enqueue");
306+
}
307+
db_conn.commit();
308+
}
309+
310+
}

0 commit comments

Comments
 (0)