Skip to content

Commit 258a633

Browse files
authored
Delivery workflow
Delivery workflow by using classic queue that demonstrates a point to point queue creation, enqueue, and dequeue
1 parent d208ace commit 258a633

File tree

1 file changed

+252
-0
lines changed

1 file changed

+252
-0
lines changed
Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
package com.example.demo;
2+
3+
import java.sql.Connection;
4+
import java.sql.DriverManager;
5+
import java.sql.SQLException;
6+
import java.sql.Statement;
7+
import java.util.Enumeration;
8+
import java.util.Random;
9+
10+
import javax.jms.JMSException;
11+
import javax.jms.Message;
12+
import javax.jms.MessageProducer;
13+
import javax.jms.Queue;
14+
import javax.jms.QueueBrowser;
15+
import javax.jms.QueueConnection;
16+
import javax.jms.QueueConnectionFactory;
17+
import javax.jms.QueueReceiver;
18+
import javax.jms.QueueSession;
19+
import javax.jms.Session;
20+
import javax.jms.TextMessage;
21+
22+
import com.ClassicQueue.DTO.UserDetails;
23+
import com.ClassicQueue.config.JsonUtils;
24+
25+
import oracle.AQ.AQException;
26+
import oracle.AQ.AQQueueTable;
27+
import oracle.AQ.AQQueueTableProperty;
28+
import oracle.jdbc.pool.OracleDataSource;
29+
import oracle.jms.AQjmsDestination;
30+
import oracle.jms.AQjmsDestinationProperty;
31+
import oracle.jms.AQjmsFactory;
32+
import oracle.jms.AQjmsSession;
33+
34+
public class DeliveryExample {
35+
36+
static String username = "ADMIN";
37+
static String password = "MayankTayal1234";
38+
39+
static String userQueueTable = "UserQueueTable";
40+
static String userQueueName = "UserQueueName";
41+
42+
static String deliveryQueueTable = "deliveryQueueTable";
43+
static String deliveryQueueName = "deliveryQueueName";
44+
45+
static String jdbcURL = "jdbc:oracle:thin:@aqlivelabtest_high?TNS_ADMIN=/Users/mayanktayal/Code/Database/Wallet_AQLivelabTest";
46+
47+
private static String query = null;
48+
49+
public static void main(String[] args) throws JMSException, AQException, SQLException, ClassNotFoundException {
50+
51+
QueueConnectionFactory qcf = null;
52+
QueueConnection qconn = null;
53+
AQjmsSession qsession = null;
54+
Queue userQueue = null;
55+
Queue deliveryQueue = null;
56+
57+
OracleDataSource ds = new OracleDataSource();
58+
ds.setUser(username);
59+
ds.setPassword(password);
60+
ds.setURL(jdbcURL);
61+
62+
qcf = AQjmsFactory.getQueueConnectionFactory(ds);
63+
qconn = qcf.createQueueConnection(username, password);
64+
qsession = (AQjmsSession) qconn.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
65+
qconn.start();
66+
67+
userQueue = setup(qsession, username, userQueueTable, userQueueName);
68+
deliveryQueue = setup(qsession, username, deliveryQueueTable, deliveryQueueName);
69+
System.out.println("Setup is complete");
70+
71+
// Step 1: Enqueue user
72+
Random rnd = new Random();
73+
int otp = rnd.nextInt(9999);
74+
int orderId = rnd.nextInt(999);
75+
76+
UserDetails userDetails = new UserDetails(orderId, "Mayank", otp, "Pending", "US");
77+
String query = "insert into USERDETAILS values('" + orderId + "', '" + userDetails.getUsername() + "', '" + otp
78+
+ "', '" + userDetails.getDeliveryStatus() + "', '" + userDetails.getDeliveryLocation() + "')";
79+
databaseOperations(query);
80+
enqueueMessages(qsession, userQueue, userDetails);
81+
System.out.println("Step 1: Enqueue user is complete");
82+
83+
// Step 2: Enqueue delivery boy
84+
UserDetails deliveryDetails = new UserDetails(userDetails.getOrderId(), null, 0,
85+
userDetails.getDeliveryStatus(), userDetails.getDeliveryLocation());
86+
enqueueMessages(qsession, deliveryQueue, deliveryDetails);
87+
System.out.println("Step 2: Enqueue delivery boy is complete");
88+
89+
// Step 3: Dequeue User
90+
System.out.println("Step 3: Dequeue user initiated");
91+
dequeueMessages(qsession, username, userQueue, deliveryQueue, userDetails, deliveryDetails);
92+
System.out.println("All dequeue is complete");
93+
94+
cleanup(qsession, username);
95+
qsession.close();
96+
qconn.close();
97+
System.out.println("End of Delivery");
98+
}
99+
100+
public static Queue setup(QueueSession session, String username, String queueTable, String queueName)
101+
throws JMSException, AQException {
102+
AQQueueTableProperty qtprop = null;
103+
AQQueueTable qtable = null;
104+
AQjmsDestinationProperty dprop = null;
105+
Queue queue = null;
106+
107+
try {
108+
qtable = ((AQjmsSession) session).getQueueTable(username, queueTable);
109+
if (qtable != null)
110+
qtable.drop(true);
111+
} catch (Exception e) {
112+
}
113+
114+
qtprop = new AQQueueTableProperty("SYS.ANYDATA");
115+
qtprop.setMultiConsumer(false);
116+
qtprop.setCompatible("9.2.0.0.0");
117+
qtable = ((AQjmsSession) session).createQueueTable(username, queueTable, qtprop);
118+
119+
dprop = new AQjmsDestinationProperty();
120+
queue = ((AQjmsSession) session).createQueue(qtable, queueName, dprop);
121+
System.out.println("Created queue queueName");
122+
((AQjmsDestination) queue).start(session, true, true);
123+
124+
return queue;
125+
}
126+
127+
public static void enqueueMessages(QueueSession session, Queue queue, UserDetails user)
128+
throws JMSException, java.sql.SQLException {
129+
TextMessage adt_message = null;
130+
MessageProducer producer = session.createProducer(queue);
131+
adt_message = ((AQjmsSession) session).createTextMessage();
132+
String msg = JsonUtils.writeValueAsString(user);
133+
adt_message.setText(msg);
134+
producer.send(adt_message);
135+
System.out.println("Sent AdtMessage");
136+
137+
session.commit();
138+
producer.close();
139+
// QueueSender sender = session.createSender(queue);
140+
// sender.send(adt_message);
141+
// sender.close();
142+
}
143+
144+
public static void dequeueMessages(QueueSession session, String username, Queue userQueue, Queue deliveryQueue,
145+
UserDetails userData, UserDetails deliveryData)
146+
throws JMSException, java.lang.ClassNotFoundException, java.sql.SQLException {
147+
148+
QueueReceiver userReceiver = null;
149+
QueueReceiver deliveryReceiver = null;
150+
QueueReceiver appReceiver = null;
151+
152+
Message userMessage = null;
153+
Message deliveryMessage = null;
154+
155+
// Step 4: Dequeue browse for user
156+
QueueBrowser userBrowser = session.createBrowser(userQueue);
157+
Enumeration userEnum = userBrowser.getEnumeration();
158+
while (userEnum.hasMoreElements()) {
159+
userMessage = (TextMessage) userEnum.nextElement();
160+
}
161+
System.out.println("Step 4: Dequeue Browse for user: [" + ((TextMessage) userMessage).getText() + "]");
162+
163+
UserDetails userDetails = JsonUtils.read(((TextMessage) userMessage).getText(), UserDetails.class);
164+
165+
// Step 5: Enqueue for app
166+
deliveryData.setOtp(userDetails.getOtp());
167+
enqueueMessages(session, deliveryQueue, deliveryData);
168+
System.out.println("Step 5: Enqueue Browse for app: ");
169+
170+
// Step 6: Dequeue browse by app
171+
QueueBrowser deliveryBrowser = session.createBrowser(deliveryQueue);
172+
Enumeration deliveryEnum = deliveryBrowser.getEnumeration();
173+
while (deliveryEnum.hasMoreElements()) {
174+
deliveryMessage = (TextMessage) deliveryEnum.nextElement();
175+
}
176+
System.out.println("Step 6: Dequeue Browse by app: [" + ((TextMessage) deliveryMessage).getText() + "]");
177+
178+
UserDetails deliveryDetails = JsonUtils.read(((TextMessage) deliveryMessage).getText(), UserDetails.class);
179+
query = "UPDATE USERDETAILS set Delivery_Status = 'DELIVERED' WHERE ORDERID = '" + deliveryDetails.getOrderId()
180+
+ "' AND OTP = '" + deliveryDetails.getOtp() + "'";
181+
182+
// Step 7: Match user OTP and app OTP
183+
if (userDetails.getOtp() == deliveryDetails.getOtp()) {
184+
System.out.println("Step 7: OTP matched" + userDetails.getOtp() + " == " + deliveryDetails.getOtp());
185+
186+
// Step 8: dequeue remove
187+
userReceiver = session.createReceiver(userQueue);
188+
TextMessage userMsg = (TextMessage) userReceiver.receive();
189+
System.out.println("Dequeue user receiver: " + userMsg.getText());
190+
191+
deliveryReceiver = session.createReceiver(deliveryQueue);
192+
TextMessage deliveryMsg = (TextMessage) deliveryReceiver.receive();
193+
System.out.println("Dequeue delivery receiver: " + deliveryMsg.getText());
194+
195+
appReceiver = session.createReceiver(deliveryQueue);
196+
TextMessage appMsg = (TextMessage) appReceiver.receive();
197+
System.out.println("Dequeue app receiver: " + appMsg.getText());
198+
199+
System.out.println("Step 8: Dequeue all receiver");
200+
201+
// Step 9: Update DB
202+
query = "UPDATE USERDETAILS set Delivery_Status = 'DELIVERED' WHERE ORDERID = '"
203+
+ deliveryDetails.getOrderId() + "' AND OTP = '" + deliveryDetails.getOtp() + "'";
204+
databaseOperations(query);
205+
System.out.println("Step 9: Update Delivery Status as DELIVERED in DB");
206+
207+
} else {
208+
// Step 9: Update DB
209+
query = "UPDATE USERDETAILS set Delivery_Status = 'FAILED' WHERE ORDERID = '" + deliveryDetails.getOrderId()
210+
+ "' AND OTP = '" + deliveryDetails.getOtp() + "'";
211+
databaseOperations(query);
212+
System.out.println("Step 9: Update Delivery Status as FAILED in DB");
213+
214+
}
215+
userBrowser.close();
216+
deliveryBrowser.close();
217+
218+
userReceiver.close();
219+
deliveryReceiver.close();
220+
appReceiver.close();
221+
222+
session.commit();
223+
}
224+
225+
private static void cleanup(QueueSession session, String user) throws JMSException, AQException {
226+
227+
AQQueueTable userTable = ((AQjmsSession) session).getQueueTable(user, userQueueTable);
228+
AQQueueTable deliveryTable = ((AQjmsSession) session).getQueueTable(user, deliveryQueueTable);
229+
230+
if (userTable != null && deliveryTable != null) {
231+
userTable.drop(true);
232+
deliveryTable.drop(true);
233+
System.out.println("Queue tables dropped successfully");
234+
} else {
235+
System.out.println("Queue tables dropped failed");
236+
}
237+
}
238+
239+
public static void databaseOperations(String queryData) throws ClassNotFoundException, SQLException {
240+
Class.forName("oracle.jdbc.driver.OracleDriver");
241+
Connection con = DriverManager.getConnection(jdbcURL, username, password);
242+
Statement stmt = con.createStatement();
243+
244+
int x = stmt.executeUpdate(queryData);
245+
if (x > 0) {
246+
System.out.println("Successfully executed database operation");
247+
} else {
248+
System.out.println("Failed database operation");
249+
}
250+
con.close();
251+
}
252+
}

0 commit comments

Comments
 (0)