From cd8ceafc1b7e5eb2be601b40342b4234f6ecd0aa Mon Sep 17 00:00:00 2001 From: Laurie O Date: Thu, 1 Sep 2022 18:33:18 +1000 Subject: [PATCH 01/12] Add threading queue shutdown * Include docs --- Doc/library/queue.rst | 36 +++++++++++++++++++++++++++++ Lib/queue.py | 52 ++++++++++++++++++++++++++++++++++++++++++ Lib/test/test_queue.py | 35 ++++++++++++++++++++++++++++ 3 files changed, 123 insertions(+) diff --git a/Doc/library/queue.rst b/Doc/library/queue.rst index b2b787c5a8260c..d9952f1882ae05 100644 --- a/Doc/library/queue.rst +++ b/Doc/library/queue.rst @@ -93,6 +93,14 @@ The :mod:`queue` module defines the following classes and exceptions: on a :class:`Queue` object which is full. +.. exception:: ShutDown + + Exception raised when :meth:`~Queue.put` or :meth:`~Queue.get` is called on + a :class:`Queue` object which has been shut down. + + .. versionadded:: 3.12 + + .. _queueobjects: Queue Objects @@ -135,6 +143,8 @@ provide the public methods described below. immediately available, else raise the :exc:`Full` exception (*timeout* is ignored in that case). + Raises :exc:`ShutDown` if the queue has been shut down. + .. method:: Queue.put_nowait(item) @@ -155,6 +165,9 @@ provide the public methods described below. an uninterruptible wait on an underlying lock. This means that no exceptions can occur, and in particular a SIGINT will not trigger a :exc:`KeyboardInterrupt`. + Raises :exc:`ShutDown` if the queue has been shut down and is empty, or if + the queue has been shut down immediately. + .. method:: Queue.get_nowait() @@ -177,6 +190,8 @@ fully processed by daemon consumer threads. Raises a :exc:`ValueError` if called more times than there were items placed in the queue. + Raises :exc:`ShutDown` if the queue has been shut down immediately. + .. method:: Queue.join() @@ -187,6 +202,8 @@ fully processed by daemon consumer threads. indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, :meth:`join` unblocks. + Raises :exc:`ShutDown` if the queue has been shut down immediately. + Example of how to wait for enqueued tasks to be completed:: @@ -214,6 +231,25 @@ Example of how to wait for enqueued tasks to be completed:: print('All work completed') +Terminating queues +^^^^^^^^^^^^^^^^^^ + +:class:`Queue` objects can be made to prevent further interaction by shutting +them down. + +.. method:: Queue.shutdown(immediate=False) + + Shut-down the queue, making queue gets and puts raise :exc:`ShutDown`. + + By default, gets will only raise once the queue is empty. Set + *immediate* to true to make gets raise immediately instead. + + All blocked callers of put() will be unblocked, and also get() + and join() if *immediate* is true. + + .. versionadded:: 3.12 + + SimpleQueue Objects ------------------- diff --git a/Lib/queue.py b/Lib/queue.py index 55f50088460f9e..f6af7cb6df5cff 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -25,6 +25,15 @@ class Full(Exception): pass +class ShutDown(Exception): + '''Raised when put/get with shut-down queue.''' + + +_queue_alive = "alive" +_queue_shutdown = "shutdown" +_queue_shutdown_immediate = "shutdown-immediate" + + class Queue: '''Create a queue object with a given maximum size. @@ -54,6 +63,9 @@ def __init__(self, maxsize=0): self.all_tasks_done = threading.Condition(self.mutex) self.unfinished_tasks = 0 + # Queue shut-down state + self.shutdown_state = _queue_alive + def task_done(self): '''Indicate that a formerly enqueued task is complete. @@ -87,6 +99,8 @@ def join(self): ''' with self.all_tasks_done: while self.unfinished_tasks: + if self.shutdown_state == _queue_shutdown_immediate: + return self.all_tasks_done.wait() def qsize(self): @@ -130,6 +144,8 @@ def put(self, item, block=True, timeout=None): is immediately available, else raise the Full exception ('timeout' is ignored in that case). ''' + if self.shutdown_state != _queue_alive: + raise ShutDown with self.not_full: if self.maxsize > 0: if not block: @@ -138,6 +154,8 @@ def put(self, item, block=True, timeout=None): elif timeout is None: while self._qsize() >= self.maxsize: self.not_full.wait() + if self.shutdown_state != _queue_alive: + raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: @@ -147,6 +165,8 @@ def put(self, item, block=True, timeout=None): if remaining <= 0.0: raise Full self.not_full.wait(remaining) + if self.shutdown_state != _queue_alive: + raise ShutDown self._put(item) self.unfinished_tasks += 1 self.not_empty.notify() @@ -162,22 +182,36 @@ def get(self, block=True, timeout=None): available, else raise the Empty exception ('timeout' is ignored in that case). ''' + if self.shutdown_state == _queue_shutdown_immediate: + raise ShutDown with self.not_empty: if not block: if not self._qsize(): + if self.shutdown_state != _queue_alive: + raise ShutDown raise Empty elif timeout is None: while not self._qsize(): + if self.shutdown_state != _queue_alive: + raise ShutDown self.not_empty.wait() + if self.shutdown_state != _queue_alive: + raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = time() + timeout while not self._qsize(): + if self.shutdown_state != _queue_alive: + raise ShutDown remaining = endtime - time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) + if self.shutdown_state != _queue_alive: + raise ShutDown + if self.shutdown_state == _queue_shutdown_immediate: + raise ShutDown item = self._get() self.not_full.notify() return item @@ -198,6 +232,24 @@ def get_nowait(self): ''' return self.get(block=False) + def shutdown(self, immediate=False): + '''Shut-down the queue, making queue gets and puts raise. + + By default, gets will only raise once the queue is empty. Set + 'immediate' to True to make gets raise immediately instead. + + All blocked callers of put() will be unblocked, and also get() + and join() if 'immediate'. The ShutDown exception is raised. + ''' + with self.mutex: + if immediate: + self.shutdown_state = _queue_shutdown_immediate + self.not_empty.notify_all() + self.all_tasks_done.notify_all() + else: + self.shutdown_state = _queue_shutdown + self.not_full.notify_all() + # Override these methods to implement other queue organizations # (e.g. stack or priority queue). # These will only be called with appropriate locks held diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 33113a72e6b6a9..354299b9a5b16a 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -241,6 +241,41 @@ def test_shrinking_queue(self): with self.assertRaises(self.queue.Full): q.put_nowait(4) + def test_shutdown_empty(self): + q = self.type2test() + q.shutdown() + try: + q.put("data") + self.fail("Didn't appear to shut-down queue") + except self.queue.ShutDown: + pass + try: + q.get() + self.fail("Didn't appear to shut-down queue") + except self.queue.ShutDown: + pass + + def test_shutdown_nonempty(self): + q = self.type2test() + q.put("data") + q.shutdown() + q.get() + try: + q.get() + self.fail("Didn't appear to shut-down queue") + except self.queue.ShutDown: + pass + + def test_shutdown_immediate(self): + q = self.type2test() + q.put("data") + q.shutdown(immediate=True) + try: + q.get() + self.fail("Didn't appear to shut-down queue") + except self.queue.ShutDown: + pass + class QueueTest(BaseQueueTestMixin): def setUp(self): From 9c2971be6fe5894856c2fd8f636d53185bf18693 Mon Sep 17 00:00:00 2001 From: Duprat Date: Fri, 10 Feb 2023 17:52:33 +0100 Subject: [PATCH 02/12] Fix queue shutdown * Include raised exception in docstrings * Handle queue shutdown in task_done and join * Factor out queue-state checks and updates to methods * Logic fixes in qsize, get and shutdown * Don't set unfinished_tasks to 0 on immediate shutdown * Updated tests * Document feature added in 3.13 --- Doc/library/queue.rst | 4 +- Lib/queue.py | 68 ++++--- Lib/test/test_queue.py | 395 +++++++++++++++++++++++++++++++++++++++-- 3 files changed, 429 insertions(+), 38 deletions(-) diff --git a/Doc/library/queue.rst b/Doc/library/queue.rst index d9952f1882ae05..33d6f2f4c85e1c 100644 --- a/Doc/library/queue.rst +++ b/Doc/library/queue.rst @@ -98,7 +98,7 @@ The :mod:`queue` module defines the following classes and exceptions: Exception raised when :meth:`~Queue.put` or :meth:`~Queue.get` is called on a :class:`Queue` object which has been shut down. - .. versionadded:: 3.12 + .. versionadded:: 3.13 .. _queueobjects: @@ -247,7 +247,7 @@ them down. All blocked callers of put() will be unblocked, and also get() and join() if *immediate* is true. - .. versionadded:: 3.12 + .. versionadded:: 3.13 SimpleQueue Objects diff --git a/Lib/queue.py b/Lib/queue.py index f6af7cb6df5cff..f8a7ba072247f0 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -33,7 +33,6 @@ class ShutDown(Exception): _queue_shutdown = "shutdown" _queue_shutdown_immediate = "shutdown-immediate" - class Queue: '''Create a queue object with a given maximum size. @@ -63,7 +62,7 @@ def __init__(self, maxsize=0): self.all_tasks_done = threading.Condition(self.mutex) self.unfinished_tasks = 0 - # Queue shut-down state + # Queue shutdown state self.shutdown_state = _queue_alive def task_done(self): @@ -79,8 +78,12 @@ def task_done(self): Raises a ValueError if called more times than there were items placed in the queue. + + Raises ShutDown if the queue has been shut down immediately. ''' with self.all_tasks_done: + if self._is_shutdown_immediate(): + raise ShutDown unfinished = self.unfinished_tasks - 1 if unfinished <= 0: if unfinished < 0: @@ -96,12 +99,16 @@ def join(self): to indicate the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks. + + Raises ShutDown if the queue has been shut down immediately. ''' with self.all_tasks_done: + if self._is_shutdown_immediate(): + raise ShutDown while self.unfinished_tasks: - if self.shutdown_state == _queue_shutdown_immediate: - return self.all_tasks_done.wait() + if self._is_shutdown_immediate(): + raise ShutDown def qsize(self): '''Return the approximate size of the queue (not reliable!).''' @@ -143,10 +150,12 @@ def put(self, item, block=True, timeout=None): Otherwise ('block' is false), put an item on the queue if a free slot is immediately available, else raise the Full exception ('timeout' is ignored in that case). + + Raises ShutDown if the queue has been shut down. ''' - if self.shutdown_state != _queue_alive: - raise ShutDown with self.not_full: + if not self._is_alive(): + raise ShutDown if self.maxsize > 0: if not block: if self._qsize() >= self.maxsize: @@ -154,7 +163,7 @@ def put(self, item, block=True, timeout=None): elif timeout is None: while self._qsize() >= self.maxsize: self.not_full.wait() - if self.shutdown_state != _queue_alive: + if not self._is_alive(): raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") @@ -165,7 +174,7 @@ def put(self, item, block=True, timeout=None): if remaining <= 0.0: raise Full self.not_full.wait(remaining) - if self.shutdown_state != _queue_alive: + if not self._is_alive(): raise ShutDown self._put(item) self.unfinished_tasks += 1 @@ -181,37 +190,33 @@ def get(self, block=True, timeout=None): Otherwise ('block' is false), return an item if one is immediately available, else raise the Empty exception ('timeout' is ignored in that case). + + Raises ShutDown if the queue has been shut down and is empty, + or if the queue has been shut down immediately. ''' - if self.shutdown_state == _queue_shutdown_immediate: - raise ShutDown with self.not_empty: + if self._is_shutdown_immediate() or\ + (self._is_shutdown() and not self._qsize()): + raise ShutDown if not block: if not self._qsize(): - if self.shutdown_state != _queue_alive: - raise ShutDown raise Empty elif timeout is None: while not self._qsize(): - if self.shutdown_state != _queue_alive: - raise ShutDown self.not_empty.wait() - if self.shutdown_state != _queue_alive: + if self._is_shutdown_immediate(): raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = time() + timeout while not self._qsize(): - if self.shutdown_state != _queue_alive: - raise ShutDown remaining = endtime - time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) - if self.shutdown_state != _queue_alive: + if self._is_shutdown_immediate(): raise ShutDown - if self.shutdown_state == _queue_shutdown_immediate: - raise ShutDown item = self._get() self.not_full.notify() return item @@ -242,14 +247,33 @@ def shutdown(self, immediate=False): and join() if 'immediate'. The ShutDown exception is raised. ''' with self.mutex: + if self._is_shutdown_immediate(): + return if immediate: - self.shutdown_state = _queue_shutdown_immediate + self._set_shutdown_immediate() self.not_empty.notify_all() + # release all blocked threads in `join()` self.all_tasks_done.notify_all() else: - self.shutdown_state = _queue_shutdown + self._set_shutdown() self.not_full.notify_all() + def _is_alive(self): + return self.shutdown_state == _queue_alive + + def _is_shutdown(self): + return self.shutdown_state == _queue_shutdown + + def _is_shutdown_immediate(self): + return self.shutdown_state == _queue_shutdown_immediate + + def _set_shutdown(self): + self.shutdown_state = _queue_shutdown + + def _set_shutdown_immediate(self): + self.shutdown_state = _queue_shutdown_immediate + + # Override these methods to implement other queue organizations # (e.g. stack or priority queue). # These will only be called with appropriate locks held diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 354299b9a5b16a..d9e840a7c861ed 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -244,38 +244,405 @@ def test_shrinking_queue(self): def test_shutdown_empty(self): q = self.type2test() q.shutdown() - try: + with self.assertRaises(self.queue.ShutDown): q.put("data") - self.fail("Didn't appear to shut-down queue") - except self.queue.ShutDown: - pass - try: + with self.assertRaises(self.queue.ShutDown): q.get() - self.fail("Didn't appear to shut-down queue") - except self.queue.ShutDown: - pass def test_shutdown_nonempty(self): q = self.type2test() q.put("data") q.shutdown() q.get() - try: + with self.assertRaises(self.queue.ShutDown): q.get() - self.fail("Didn't appear to shut-down queue") - except self.queue.ShutDown: - pass def test_shutdown_immediate(self): q = self.type2test() q.put("data") q.shutdown(immediate=True) - try: + with self.assertRaises(self.queue.ShutDown): q.get() - self.fail("Didn't appear to shut-down queue") + + def test_shutdown_allowed_transitions(self): + # allowed transitions would be from alive via shutdown to immediate + q = self.type2test() + self.assertEqual("alive", q.shutdown_state) + + q.shutdown() + self.assertEqual("shutdown", q.shutdown_state) + + q.shutdown(immediate=True) + self.assertEqual("shutdown-immediate", q.shutdown_state) + + q.shutdown(immediate=False) + self.assertNotEqual("shutdown", q.shutdown_state) + + def _shutdown_all_methods_in_one_thread(self, immediate): + q = self.type2test(2) + q.put("L") + q.put_nowait("O") + q.shutdown(immediate) + + with self.assertRaises(self.queue.ShutDown): + q.put("E") + with self.assertRaises(self.queue.ShutDown): + q.put_nowait("W") + if immediate: + with self.assertRaises(self.queue.ShutDown): + q.get() + with self.assertRaises(self.queue.ShutDown): + q.get_nowait() + with self.assertRaises(self.queue.ShutDown): + q.task_done() + with self.assertRaises(self.queue.ShutDown): + q.join() + else: + self.assertIn(q.get(), "LO") + q.task_done() + self.assertIn(q.get(), "LO") + q.task_done() + q.join() + # on shutdown(immediate=False) + # when queue is empty, should raise ShutDown Exception + with self.assertRaises(self.queue.ShutDown): + q.get() # p.get(True) + with self.assertRaises(self.queue.ShutDown): + q.get_nowait() # p.get(False) + with self.assertRaises(self.queue.ShutDown): + q.get(True, 1.0) + + def test_shutdown_all_methods_in_one_thread(self): + return self._shutdown_all_methods_in_one_thread(False) + + def test_shutdown_immediate_all_methods_in_one_thread(self): + return self._shutdown_all_methods_in_one_thread(True) + + def _write_msg_thread(self, q, n, results, delay, + i_when_exec_shutdown, + event_start, event_end): + event_start.wait() + for i in range(1, n+1): + try: + q.put((i, "YDLO")) + results.append(True) + except self.queue.ShutDown: + results.append(False) + # triggers shutdown of queue + if i == i_when_exec_shutdown: + event_end.set() + time.sleep(delay) + # end of all puts + try: + q.join() + except self.queue.ShutDown: + pass + + def _read_msg_thread(self, q, nb, results, delay, event_start): + event_start.wait() + block = True + while nb: + time.sleep(delay) + try: + # Get at least one message + q.get(block) + block = False + q.task_done() + results.append(True) + nb -= 1 + except self.queue.ShutDown: + results.append(False) + nb -= 1 + except self.queue.Empty: + pass + try: + q.join() + except self.queue.ShutDown: + pass + + def _shutdown_thread(self, q, event_end, immediate): + event_end.wait() + q.shutdown(immediate) + try: + q.join() except self.queue.ShutDown: pass + def _join_thread(self, q, delay, event_start): + event_start.wait() + time.sleep(delay) + try: + q.join() + except self.queue.ShutDown: + pass + + def _shutdown_all_methods_in_many_threads(self, immediate): + q = self.type2test() + ps = [] + ev_start = threading.Event() + ev_exec_shutdown = threading.Event() + res_puts = [] + res_gets = [] + delay = 1e-4 + read_process = 4 + nb_msgs = read_process * 16 + nb_msgs_r = nb_msgs // read_process + when_exec_shutdown = nb_msgs // 2 + lprocs = ( + (self._write_msg_thread, 1, (q, nb_msgs, res_puts, delay, + when_exec_shutdown, + ev_start, ev_exec_shutdown)), + (self._read_msg_thread, read_process, (q, nb_msgs_r, + res_gets, delay*2, + ev_start)), + (self._join_thread, 2, (q, delay*2, ev_start)), + (self._shutdown_thread, 1, (q, ev_exec_shutdown, immediate)), + ) + # start all threds + for func, n, args in lprocs: + for i in range(n): + ps.append(threading.Thread(target=func, args=args)) + ps[-1].start() + # set event in order to run q.shutdown() + ev_start.set() + + if not immediate: + assert(len(res_gets) == len(res_puts)) + assert(res_gets.count(True) == res_puts.count(True)) + else: + assert(len(res_gets) <= len(res_puts)) + assert(res_gets.count(True) <= res_puts.count(True)) + + def test_shutdown_all_methods_in_many_threads(self): + return self._shutdown_all_methods_in_many_threads(False) + + def test_shutdown_immediate_all_methods_in_many_threads(self): + return self._shutdown_all_methods_in_many_threads(True) + + def _get(self, q, go, results, shutdown=False): + go.wait() + try: + msg = q.get() + results.append(not shutdown) + return not shutdown + except self.queue.ShutDown: + results.append(shutdown) + return shutdown + + def _get_shutdown(self, q, go, results): + return self._get(q, go, results, True) + + def _get_task_done(self, q, go, results): + go.wait() + try: + msg = q.get() + q.task_done() + results.append(True) + return msg + except self.queue.ShutDown: + results.append(False) + return False + + def _put(self, q, msg, go, results, shutdown=False): + go.wait() + try: + q.put(msg) + results.append(not shutdown) + return not shutdown + except self.queue.ShutDown: + results.append(shutdown) + return shutdown + + def _put_shutdown(self, q, msg, go, results): + return self._put(q, msg, go, results, True) + + def _join(self, q, results, shutdown=False): + try: + q.join() + results.append(not shutdown) + return not shutdown + except self.queue.ShutDown: + results.append(shutdown) + return shutdown + + def _join_shutdown(self, q, results): + return self._join(q, results, True) + + def _shutdown_get(self, immediate): + q = self.type2test(2) + results = [] + go = threading.Event() + q.put("Y") + q.put("D") + # queue full + + if immediate: + thrds = ( + (self._get_shutdown, (q, go, results)), + (self._get_shutdown, (q, go, results)), + ) + else: + thrds = ( + # on shutdown(immediate=False) + # one of these threads shoud raise Shutdown + (self._get, (q, go, results)), + (self._get, (q, go, results)), + (self._get, (q, go, results)), + ) + threads = [] + for func, params in thrds: + threads.append(threading.Thread(target=func, args=params)) + threads[-1].start() + q.shutdown(immediate) + go.set() + for t in threads: + t.join() + if immediate: + self.assertListEqual(results, [True, True]) + else: + self.assertListEqual(sorted(results), [False] + [True]*(len(thrds)-1)) + + def test_shutdown_get(self): + return self._shutdown_get(False) + + def test_shutdown_immediate_get(self): + return self._shutdown_get(True) + + def _shutdown_put(self, immediate): + q = self.type2test(2) + results = [] + go = threading.Event() + q.put("Y") + q.put("D") + # queue fulled + + thrds = ( + (self._put_shutdown, (q, "E", go, results)), + (self._put_shutdown, (q, "W", go, results)), + ) + threads = [] + for func, params in thrds: + threads.append(threading.Thread(target=func, args=params)) + threads[-1].start() + q.shutdown() + go.set() + for t in threads: + t.join() + + self.assertEqual(results, [True]*len(thrds)) + + def test_shutdown_put(self): + return self._shutdown_put(False) + + def test_shutdown_immediate_put(self): + return self._shutdown_put(True) + + def _shutdown_join(self, immediate): + q = self.type2test() + results = [] + q.put("Y") + go = threading.Event() + nb = q.qsize() + + if immediate: + thrds = ( + (self._join_shutdown, (q, results)), + (self._join_shutdown, (q, results)), + ) + else: + thrds = ( + (self._join, (q, results)), + (self._join, (q, results)), + ) + threads = [] + for func, params in thrds: + threads.append(threading.Thread(target=func, args=params)) + threads[-1].start() + if not immediate: + res = [] + for i in range(nb): + threads.append(threading.Thread(target=self._get_task_done, args=(q, go, res))) + threads[-1].start() + q.shutdown(immediate) + go.set() + for t in threads: + t.join() + + self.assertEqual(results, [True]*len(thrds)) + + def test_shutdown_immediate_join(self): + return self._shutdown_join(True) + + def test_shutdown_join(self): + return self._shutdown_join(False) + + def _shutdown_put_join(self, immediate): + q = self.type2test(2) + results = [] + go = threading.Event() + q.put("Y") + nb = q.qsize() + # queue not fulled + + if immediate: + thrds = ( + (self._put_shutdown, (q, "E", go, results)), + (self._join_shutdown, (q, results)), + ) + else: + thrds = ( + (self._put_shutdown, (q, "E", go, results)), + (self._join, (q, results)), + ) + threads = [] + for func, params in thrds: + threads.append(threading.Thread(target=func, args=params)) + threads[-1].start() + if not immediate: + self.assertEqual(q.unfinished_tasks, nb) + for i in range(nb): + t = threading.Thread(target=q.task_done) + t.start() + threads.append(t) + go.set() + q.shutdown(immediate) + for t in threads: + t.join() + + self.assertEqual(results, [True]*len(thrds)) + + def test_shutdown_immediate_put_join(self): + return self._shutdown_put_join(True) + + def test_shutdown_put_join(self): + return self._shutdown_put_join(False) + + def test_shutdown_get_task_done_join(self): + q = self.type2test(2) + results = [] + go = threading.Event() + q.put("Y") + q.put("D") + self.assertEqual(q.unfinished_tasks, q.qsize()) + + thrds = ( + (self._get_task_done, (q, go, results)), + (self._get_task_done, (q, go, results)), + (self._join, (q, results)), + (self._join, (q, results)), + ) + threads = [] + for func, params in thrds: + threads.append(threading.Thread(target=func, args=params)) + threads[-1].start() + go.set() + q.shutdown(False) + for t in threads: + t.join() + + self.assertEqual(results, [True]*len(thrds)) + + class QueueTest(BaseQueueTestMixin): def setUp(self): From 089eb960ac306d0cb2fd5f681565461ce12e58a0 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Sat, 6 May 2023 04:57:15 +0000 Subject: [PATCH 03/12] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20b?= =?UTF-8?q?lurb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2023-05-06-04-57-10.gh-issue-96471.C9wAU7.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2023-05-06-04-57-10.gh-issue-96471.C9wAU7.rst diff --git a/Misc/NEWS.d/next/Library/2023-05-06-04-57-10.gh-issue-96471.C9wAU7.rst b/Misc/NEWS.d/next/Library/2023-05-06-04-57-10.gh-issue-96471.C9wAU7.rst new file mode 100644 index 00000000000000..0ec0fa65e7d91b --- /dev/null +++ b/Misc/NEWS.d/next/Library/2023-05-06-04-57-10.gh-issue-96471.C9wAU7.rst @@ -0,0 +1 @@ +Add queue.Queue termination with ``shutdown`` method From ca118d7245e12ed262a4944178333f1ff232a753 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Tue, 23 May 2023 00:37:47 +1000 Subject: [PATCH 04/12] Shut-down immediate consumes queue --- Lib/queue.py | 49 ++++++++------------------------ Lib/test/test_queue.py | 63 ++++++++++++++---------------------------- 2 files changed, 31 insertions(+), 81 deletions(-) diff --git a/Lib/queue.py b/Lib/queue.py index f8a7ba072247f0..69d5481e4d36e0 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -29,10 +29,6 @@ class ShutDown(Exception): '''Raised when put/get with shut-down queue.''' -_queue_alive = "alive" -_queue_shutdown = "shutdown" -_queue_shutdown_immediate = "shutdown-immediate" - class Queue: '''Create a queue object with a given maximum size. @@ -63,7 +59,7 @@ def __init__(self, maxsize=0): self.unfinished_tasks = 0 # Queue shutdown state - self.shutdown_state = _queue_alive + self.is_shutdown = False def task_done(self): '''Indicate that a formerly enqueued task is complete. @@ -82,8 +78,6 @@ def task_done(self): Raises ShutDown if the queue has been shut down immediately. ''' with self.all_tasks_done: - if self._is_shutdown_immediate(): - raise ShutDown unfinished = self.unfinished_tasks - 1 if unfinished <= 0: if unfinished < 0: @@ -103,12 +97,8 @@ def join(self): Raises ShutDown if the queue has been shut down immediately. ''' with self.all_tasks_done: - if self._is_shutdown_immediate(): - raise ShutDown while self.unfinished_tasks: self.all_tasks_done.wait() - if self._is_shutdown_immediate(): - raise ShutDown def qsize(self): '''Return the approximate size of the queue (not reliable!).''' @@ -154,7 +144,7 @@ def put(self, item, block=True, timeout=None): Raises ShutDown if the queue has been shut down. ''' with self.not_full: - if not self._is_alive(): + if self.is_shutdown: raise ShutDown if self.maxsize > 0: if not block: @@ -163,7 +153,7 @@ def put(self, item, block=True, timeout=None): elif timeout is None: while self._qsize() >= self.maxsize: self.not_full.wait() - if not self._is_alive(): + if self.is_shutdown: raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") @@ -174,7 +164,7 @@ def put(self, item, block=True, timeout=None): if remaining <= 0.0: raise Full self.not_full.wait(remaining) - if not self._is_alive(): + if self.is_shutdown: raise ShutDown self._put(item) self.unfinished_tasks += 1 @@ -195,8 +185,7 @@ def get(self, block=True, timeout=None): or if the queue has been shut down immediately. ''' with self.not_empty: - if self._is_shutdown_immediate() or\ - (self._is_shutdown() and not self._qsize()): + if self.is_shutdown and not self._qsize(): raise ShutDown if not block: if not self._qsize(): @@ -204,7 +193,7 @@ def get(self, block=True, timeout=None): elif timeout is None: while not self._qsize(): self.not_empty.wait() - if self._is_shutdown_immediate(): + if self.is_shutdown and not self._qsize(): raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") @@ -215,7 +204,7 @@ def get(self, block=True, timeout=None): if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) - if self._is_shutdown_immediate(): + if self.is_shutdown and not self._qsize(): raise ShutDown item = self._get() self.not_full.notify() @@ -247,32 +236,16 @@ def shutdown(self, immediate=False): and join() if 'immediate'. The ShutDown exception is raised. ''' with self.mutex: - if self._is_shutdown_immediate(): - return + self.is_shutdown = True if immediate: - self._set_shutdown_immediate() + while self._qsize(): + self._get() + self.unfinished_tasks = 0 self.not_empty.notify_all() # release all blocked threads in `join()` self.all_tasks_done.notify_all() - else: - self._set_shutdown() self.not_full.notify_all() - def _is_alive(self): - return self.shutdown_state == _queue_alive - - def _is_shutdown(self): - return self.shutdown_state == _queue_shutdown - - def _is_shutdown_immediate(self): - return self.shutdown_state == _queue_shutdown_immediate - - def _set_shutdown(self): - self.shutdown_state = _queue_shutdown - - def _set_shutdown_immediate(self): - self.shutdown_state = _queue_shutdown_immediate - # Override these methods to implement other queue organizations # (e.g. stack or priority queue). diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index d9e840a7c861ed..2c6e6dd692bf76 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -267,16 +267,15 @@ def test_shutdown_immediate(self): def test_shutdown_allowed_transitions(self): # allowed transitions would be from alive via shutdown to immediate q = self.type2test() - self.assertEqual("alive", q.shutdown_state) + self.assertFalse(q.is_shutdown) q.shutdown() - self.assertEqual("shutdown", q.shutdown_state) + self.assertTrue(q.is_shutdown) q.shutdown(immediate=True) - self.assertEqual("shutdown-immediate", q.shutdown_state) + self.assertTrue(q.is_shutdown) q.shutdown(immediate=False) - self.assertNotEqual("shutdown", q.shutdown_state) def _shutdown_all_methods_in_one_thread(self, immediate): q = self.type2test(2) @@ -293,10 +292,9 @@ def _shutdown_all_methods_in_one_thread(self, immediate): q.get() with self.assertRaises(self.queue.ShutDown): q.get_nowait() - with self.assertRaises(self.queue.ShutDown): + with self.assertRaises(ValueError): q.task_done() - with self.assertRaises(self.queue.ShutDown): - q.join() + q.join() else: self.assertIn(q.get(), "LO") q.task_done() @@ -333,10 +331,7 @@ def _write_msg_thread(self, q, n, results, delay, event_end.set() time.sleep(delay) # end of all puts - try: - q.join() - except self.queue.ShutDown: - pass + q.join() def _read_msg_thread(self, q, nb, results, delay, event_start): event_start.wait() @@ -355,26 +350,17 @@ def _read_msg_thread(self, q, nb, results, delay, event_start): nb -= 1 except self.queue.Empty: pass - try: - q.join() - except self.queue.ShutDown: - pass + q.join() def _shutdown_thread(self, q, event_end, immediate): event_end.wait() q.shutdown(immediate) - try: - q.join() - except self.queue.ShutDown: - pass + q.join() def _join_thread(self, q, delay, event_start): event_start.wait() time.sleep(delay) - try: - q.join() - except self.queue.ShutDown: - pass + q.join() def _shutdown_all_methods_in_many_threads(self, immediate): q = self.type2test() @@ -413,6 +399,9 @@ def _shutdown_all_methods_in_many_threads(self, immediate): assert(len(res_gets) <= len(res_puts)) assert(res_gets.count(True) <= res_puts.count(True)) + for thread in ps[1:]: + thread.join() + def test_shutdown_all_methods_in_many_threads(self): return self._shutdown_all_methods_in_many_threads(False) @@ -544,15 +533,9 @@ def _shutdown_join(self, immediate): go = threading.Event() nb = q.qsize() - if immediate: - thrds = ( - (self._join_shutdown, (q, results)), - (self._join_shutdown, (q, results)), - ) - else: - thrds = ( - (self._join, (q, results)), - (self._join, (q, results)), + thrds = ( + (self._join, (q, results)), + (self._join, (q, results)), ) threads = [] for func, params in thrds: @@ -584,21 +567,15 @@ def _shutdown_put_join(self, immediate): nb = q.qsize() # queue not fulled - if immediate: - thrds = ( - (self._put_shutdown, (q, "E", go, results)), - (self._join_shutdown, (q, results)), - ) - else: - thrds = ( - (self._put_shutdown, (q, "E", go, results)), - (self._join, (q, results)), - ) + thrds = ( + (self._put_shutdown, (q, "E", go, results)), + (self._join, (q, results)), + ) threads = [] for func, params in thrds: threads.append(threading.Thread(target=func, args=params)) threads[-1].start() - if not immediate: + if not immediate or immediate: # TODO: dedent (minimising Git diff) self.assertEqual(q.unfinished_tasks, nb) for i in range(nb): t = threading.Thread(target=q.task_done) From 88f918d50f7eca1a8570604bf24cc5d7439bbbfe Mon Sep 17 00:00:00 2001 From: Laurie O Date: Fri, 8 Dec 2023 21:43:54 +1000 Subject: [PATCH 05/12] Update test typing --- Lib/test/test_queue.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 2c6e6dd692bf76..74bcc151bcdeac 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -10,6 +10,16 @@ from test.support import import_helper from test.support import threading_helper +import typing as t + +if t.TYPE_CHECKING: + import queue + + parent_class = unittest.TestCase + +else: + parent_class = object + # queue module depends on threading primitives threading_helper.requires_working_threading(module=True) @@ -55,7 +65,10 @@ def run(self): # is supposed to raise an exception, call do_exceptional_blocking_test() # instead. -class BlockingTestMixin: +class BlockingTestMixin(parent_class): + if t.TYPE_CHECKING: + queue = queue + type2test: t.Type[queue.Queue] def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args): thread = _TriggerThread(trigger_func, trigger_args) @@ -797,7 +810,10 @@ class CFailingQueueTest(FailingQueueTest, unittest.TestCase): queue = c_queue -class BaseSimpleQueueTest: +class BaseSimpleQueueTest(parent_class): + if t.TYPE_CHECKING: + queue = queue + type2test: t.Type[queue.Queue] def setUp(self): self.q = self.type2test() From ab8d9753ac8b3fcc3e3a351d218a7d0e67980429 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Mon, 5 Feb 2024 11:29:02 +1000 Subject: [PATCH 06/12] Remove queue-size tasks instead of setting to zero --- Lib/queue.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Lib/queue.py b/Lib/queue.py index 69d5481e4d36e0..6d7bf7f07aa59e 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -238,9 +238,10 @@ def shutdown(self, immediate=False): with self.mutex: self.is_shutdown = True if immediate: + n_items = self._qsize() while self._qsize(): self._get() - self.unfinished_tasks = 0 + self.unfinished_tasks -= n_items self.not_empty.notify_all() # release all blocked threads in `join()` self.all_tasks_done.notify_all() From 971f6994a9af9dac77d35a987018bd8fee3b9411 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Thu, 8 Feb 2024 19:33:14 +1000 Subject: [PATCH 07/12] Improve doc wording, reference methods --- Doc/library/queue.rst | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/Doc/library/queue.rst b/Doc/library/queue.rst index 33d6f2f4c85e1c..1421fc2e552f0e 100644 --- a/Doc/library/queue.rst +++ b/Doc/library/queue.rst @@ -239,13 +239,15 @@ them down. .. method:: Queue.shutdown(immediate=False) - Shut-down the queue, making queue gets and puts raise :exc:`ShutDown`. + Shut down the queue, making :meth:`~Queue.get` and :meth:`~Queue.put` raise + :exc:`ShutDown`. - By default, gets will only raise once the queue is empty. Set - *immediate* to true to make gets raise immediately instead. + By default, :meth:`~Queue.get` on a shut down queue will only raise once the + queue is empty. Set *immediate* to true to make :meth:`~Queue.get` raise + immediately instead. - All blocked callers of put() will be unblocked, and also get() - and join() if *immediate* is true. + All blocked callers of :meth:`~Queue.put` will be unblocked. If *immediate* + is true, also unblock callers of :meth:`~Queue.get` and :meth:`~Queue.join`. .. versionadded:: 3.13 From ddfb8c2390831e62baf5a9693427d989b09f017f Mon Sep 17 00:00:00 2001 From: Laurie O Date: Thu, 8 Feb 2024 19:34:37 +1000 Subject: [PATCH 08/12] Reference method in news entry --- .../next/Library/2023-05-06-04-57-10.gh-issue-96471.C9wAU7.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2023-05-06-04-57-10.gh-issue-96471.C9wAU7.rst b/Misc/NEWS.d/next/Library/2023-05-06-04-57-10.gh-issue-96471.C9wAU7.rst index 0ec0fa65e7d91b..0bace8d8bd425c 100644 --- a/Misc/NEWS.d/next/Library/2023-05-06-04-57-10.gh-issue-96471.C9wAU7.rst +++ b/Misc/NEWS.d/next/Library/2023-05-06-04-57-10.gh-issue-96471.C9wAU7.rst @@ -1 +1 @@ -Add queue.Queue termination with ``shutdown`` method +Add :py:class:`queue.Queue` termination with :py:meth:`~queue.Queue.shutdown`. From 3570bd8f2bd5fb2a150e856086eedd208dd111db Mon Sep 17 00:00:00 2001 From: Laurie O Date: Thu, 8 Feb 2024 19:42:05 +1000 Subject: [PATCH 09/12] Remove typing in test script --- Lib/test/test_queue.py | 20 ++------------------ 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 74bcc151bcdeac..2c6e6dd692bf76 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -10,16 +10,6 @@ from test.support import import_helper from test.support import threading_helper -import typing as t - -if t.TYPE_CHECKING: - import queue - - parent_class = unittest.TestCase - -else: - parent_class = object - # queue module depends on threading primitives threading_helper.requires_working_threading(module=True) @@ -65,10 +55,7 @@ def run(self): # is supposed to raise an exception, call do_exceptional_blocking_test() # instead. -class BlockingTestMixin(parent_class): - if t.TYPE_CHECKING: - queue = queue - type2test: t.Type[queue.Queue] +class BlockingTestMixin: def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args): thread = _TriggerThread(trigger_func, trigger_args) @@ -810,10 +797,7 @@ class CFailingQueueTest(FailingQueueTest, unittest.TestCase): queue = c_queue -class BaseSimpleQueueTest(parent_class): - if t.TYPE_CHECKING: - queue = queue - type2test: t.Type[queue.Queue] +class BaseSimpleQueueTest: def setUp(self): self.q = self.type2test() From 9072c6f03844d2b99912402b2e5e61021959d27b Mon Sep 17 00:00:00 2001 From: Laurie O Date: Thu, 8 Feb 2024 20:48:12 +1000 Subject: [PATCH 10/12] Fix join after task-done with no get Also shut down before put-join in shutdown-put-join tests. Also remove indent --- Lib/queue.py | 2 +- Lib/test/test_queue.py | 13 ++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/Lib/queue.py b/Lib/queue.py index 6d7bf7f07aa59e..91a62fba2b8438 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -241,7 +241,7 @@ def shutdown(self, immediate=False): n_items = self._qsize() while self._qsize(): self._get() - self.unfinished_tasks -= n_items + self.unfinished_tasks = max(self.unfinished_tasks - n_items, 0) self.not_empty.notify_all() # release all blocked threads in `join()` self.all_tasks_done.notify_all() diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 2c6e6dd692bf76..e3d4d566cdda48 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -575,14 +575,13 @@ def _shutdown_put_join(self, immediate): for func, params in thrds: threads.append(threading.Thread(target=func, args=params)) threads[-1].start() - if not immediate or immediate: # TODO: dedent (minimising Git diff) - self.assertEqual(q.unfinished_tasks, nb) - for i in range(nb): - t = threading.Thread(target=q.task_done) - t.start() - threads.append(t) - go.set() + self.assertEqual(q.unfinished_tasks, nb) + for i in range(nb): + t = threading.Thread(target=q.task_done) + t.start() + threads.append(t) q.shutdown(immediate) + go.set() for t in threads: t.join() From e0927aae42f4c9032cb11cf68d8ba9c7af5e8dc0 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Fri, 9 Feb 2024 12:24:29 +1000 Subject: [PATCH 11/12] More explicitly update 'q.unfinished_tasks' --- Lib/queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/queue.py b/Lib/queue.py index 91a62fba2b8438..467ff4fcecb134 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -241,13 +241,13 @@ def shutdown(self, immediate=False): n_items = self._qsize() while self._qsize(): self._get() - self.unfinished_tasks = max(self.unfinished_tasks - n_items, 0) + if self.unfinished_tasks > 0: + self.unfinished_tasks -= 1 self.not_empty.notify_all() # release all blocked threads in `join()` self.all_tasks_done.notify_all() self.not_full.notify_all() - # Override these methods to implement other queue organizations # (e.g. stack or priority queue). # These will only be called with appropriate locks held From 22adada848a6f06ab5f934c109174c2fe86aae0f Mon Sep 17 00:00:00 2001 From: Laurie O Date: Fri, 9 Feb 2024 12:27:47 +1000 Subject: [PATCH 12/12] Add what's new entry --- Doc/whatsnew/3.13.rst | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/Doc/whatsnew/3.13.rst b/Doc/whatsnew/3.13.rst index 2ac5afa8ce601c..6434803cdda83b 100644 --- a/Doc/whatsnew/3.13.rst +++ b/Doc/whatsnew/3.13.rst @@ -392,6 +392,13 @@ pdb command line option or :envvar:`PYTHONSAFEPATH` environment variable). (Contributed by Tian Gao and Christian Walther in :gh:`111762`.) +queue +----- + +* Add :meth:`queue.Queue.shutdown` (along with :exc:`queue.ShutDown`) for queue + termination. + (Contributed by Laurie Opperman and Yves Duprat in :gh:`104750`.) + re -- * Rename :exc:`!re.error` to :exc:`re.PatternError` for improved clarity.