|
| 1 | +#------------------------------------------------------------------------------ |
| 2 | +# Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. |
| 3 | +#------------------------------------------------------------------------------ |
| 4 | + |
| 5 | +"""Module for testing AQ Bulk enqueue/dequeue.""" |
| 6 | + |
| 7 | +import TestEnv |
| 8 | + |
| 9 | +import cx_Oracle |
| 10 | +import decimal |
| 11 | +import threading |
| 12 | + |
| 13 | +RAW_QUEUE_NAME = "TEST_RAW_QUEUE" |
| 14 | +RAW_PAYLOAD_DATA = [ |
| 15 | + "The first message", |
| 16 | + "The second message", |
| 17 | + "The third message", |
| 18 | + "The fourth message", |
| 19 | + "The fifth message", |
| 20 | + "The sixth message", |
| 21 | + "The seventh message", |
| 22 | + "The eighth message", |
| 23 | + "The ninth message", |
| 24 | + "The tenth message", |
| 25 | + "The eleventh message", |
| 26 | + "The twelfth and final message" |
| 27 | +] |
| 28 | + |
| 29 | +class TestCase(TestEnv.BaseTestCase): |
| 30 | + |
| 31 | + def __deqInThread(self, results): |
| 32 | + connection = TestEnv.GetConnection(threaded=True) |
| 33 | + queue = connection.queue(RAW_QUEUE_NAME) |
| 34 | + queue.deqOptions.wait = 10 |
| 35 | + queue.deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG |
| 36 | + while len(results) < len(RAW_PAYLOAD_DATA): |
| 37 | + messages = queue.deqMany(5) |
| 38 | + if not messages: |
| 39 | + break |
| 40 | + for m in messages: |
| 41 | + results.append(m.payload.decode(connection.encoding)) |
| 42 | + connection.commit() |
| 43 | + |
| 44 | + def __getAndClearRawQueue(self): |
| 45 | + queue = self.connection.queue(RAW_QUEUE_NAME) |
| 46 | + queue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT |
| 47 | + queue.deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG |
| 48 | + while queue.deqOne(): |
| 49 | + pass |
| 50 | + self.connection.commit() |
| 51 | + return queue |
| 52 | + |
| 53 | + def testEnqAndDeq(self): |
| 54 | + "test bulk enqueue and dequeue" |
| 55 | + queue = self.__getAndClearRawQueue() |
| 56 | + messages = [self.connection.msgproperties(payload=d) \ |
| 57 | + for d in RAW_PAYLOAD_DATA] |
| 58 | + queue.enqMany(messages) |
| 59 | + messages = queue.deqMany(len(RAW_PAYLOAD_DATA)) |
| 60 | + data = [m.payload.decode(self.connection.encoding) for m in messages] |
| 61 | + self.connection.commit() |
| 62 | + self.assertEqual(data, RAW_PAYLOAD_DATA) |
| 63 | + |
| 64 | + def testDequeueEmpty(self): |
| 65 | + "test empty bulk dequeue" |
| 66 | + queue = self.__getAndClearRawQueue() |
| 67 | + messages = queue.deqMany(5) |
| 68 | + self.connection.commit() |
| 69 | + self.assertEqual(messages, []) |
| 70 | + |
| 71 | + def testDeqWithWait(self): |
| 72 | + "test bulk dequeue with wait" |
| 73 | + queue = self.__getAndClearRawQueue() |
| 74 | + results = [] |
| 75 | + thread = threading.Thread(target=self.__deqInThread, args=(results,)) |
| 76 | + thread.start() |
| 77 | + messages = [self.connection.msgproperties(payload=d) \ |
| 78 | + for d in RAW_PAYLOAD_DATA] |
| 79 | + queue.enqOptions.visibility = cx_Oracle.ENQ_IMMEDIATE |
| 80 | + queue.enqMany(messages) |
| 81 | + thread.join() |
| 82 | + self.assertEqual(results, RAW_PAYLOAD_DATA) |
| 83 | + |
| 84 | + def testEnqAndDeqMultipleTimes(self): |
| 85 | + "test enqueue and dequeue multiple times" |
| 86 | + queue = self.__getAndClearRawQueue() |
| 87 | + dataToEnqueue = RAW_PAYLOAD_DATA |
| 88 | + for num in (2, 6, 4): |
| 89 | + messages = [self.connection.msgproperties(payload=d) \ |
| 90 | + for d in dataToEnqueue[:num]] |
| 91 | + dataToEnqueue = dataToEnqueue[num:] |
| 92 | + queue.enqMany(messages) |
| 93 | + self.connection.commit() |
| 94 | + allData = [] |
| 95 | + for num in (3, 5, 10): |
| 96 | + messages = queue.deqMany(num) |
| 97 | + allData.extend(m.payload.decode(self.connection.encoding) \ |
| 98 | + for m in messages) |
| 99 | + self.connection.commit() |
| 100 | + self.assertEqual(allData, RAW_PAYLOAD_DATA) |
| 101 | + |
| 102 | + def testEnqAndDeqVisibility(self): |
| 103 | + "test visibility option for enqueue and dequeue" |
| 104 | + queue = self.__getAndClearRawQueue() |
| 105 | + |
| 106 | + # first test with ENQ_ON_COMMIT (commit required) |
| 107 | + queue.enqOptions.visibility = cx_Oracle.ENQ_ON_COMMIT |
| 108 | + props1 = self.connection.msgproperties(payload="A first message") |
| 109 | + props2 = self.connection.msgproperties(payload="A second message") |
| 110 | + queue.enqMany([props1, props2]) |
| 111 | + otherConnection = TestEnv.GetConnection() |
| 112 | + otherQueue = otherConnection.queue(RAW_QUEUE_NAME) |
| 113 | + otherQueue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT |
| 114 | + otherQueue.deqOptions.visibility = cx_Oracle.DEQ_ON_COMMIT |
| 115 | + messages = otherQueue.deqMany(5) |
| 116 | + self.assertEqual(len(messages), 0) |
| 117 | + self.connection.commit() |
| 118 | + messages = otherQueue.deqMany(5) |
| 119 | + self.assertEqual(len(messages), 2) |
| 120 | + otherConnection.rollback() |
| 121 | + |
| 122 | + # second test with ENQ_IMMEDIATE (no commit required) |
| 123 | + queue.enqOptions.visibility = cx_Oracle.ENQ_IMMEDIATE |
| 124 | + otherQueue.deqOptions.visibility = cx_Oracle.DEQ_IMMEDIATE |
| 125 | + queue.enqMany([props1, props2]) |
| 126 | + messages = otherQueue.deqMany(5) |
| 127 | + self.assertEqual(len(messages), 4) |
| 128 | + otherConnection.rollback() |
| 129 | + messages = otherQueue.deqMany(5) |
| 130 | + self.assertEqual(len(messages), 0) |
| 131 | + |
| 132 | +if __name__ == "__main__": |
| 133 | + TestEnv.RunTestCases() |
0 commit comments