Skip to content

Commit 6567d1d

Browse files
committed
update queue to 3.13.3
1 parent a5214a0 commit 6567d1d

File tree

2 files changed

+475
-1
lines changed

2 files changed

+475
-1
lines changed

Lib/queue.py

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,15 @@
1010
except ImportError:
1111
SimpleQueue = None
1212

13-
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue', 'SimpleQueue']
13+
__all__ = [
14+
'Empty',
15+
'Full',
16+
'ShutDown',
17+
'Queue',
18+
'PriorityQueue',
19+
'LifoQueue',
20+
'SimpleQueue',
21+
]
1422

1523

1624
try:
@@ -25,6 +33,10 @@ class Full(Exception):
2533
pass
2634

2735

36+
class ShutDown(Exception):
37+
'''Raised when put/get with shut-down queue.'''
38+
39+
2840
class Queue:
2941
'''Create a queue object with a given maximum size.
3042
@@ -54,6 +66,9 @@ def __init__(self, maxsize=0):
5466
self.all_tasks_done = threading.Condition(self.mutex)
5567
self.unfinished_tasks = 0
5668

69+
# Queue shutdown state
70+
self.is_shutdown = False
71+
5772
def task_done(self):
5873
'''Indicate that a formerly enqueued task is complete.
5974
@@ -65,6 +80,9 @@ def task_done(self):
6580
have been processed (meaning that a task_done() call was received
6681
for every item that had been put() into the queue).
6782
83+
shutdown(immediate=True) calls task_done() for each remaining item in
84+
the queue.
85+
6886
Raises a ValueError if called more times than there were items
6987
placed in the queue.
7088
'''
@@ -129,15 +147,21 @@ def put(self, item, block=True, timeout=None):
129147
Otherwise ('block' is false), put an item on the queue if a free slot
130148
is immediately available, else raise the Full exception ('timeout'
131149
is ignored in that case).
150+
151+
Raises ShutDown if the queue has been shut down.
132152
'''
133153
with self.not_full:
154+
if self.is_shutdown:
155+
raise ShutDown
134156
if self.maxsize > 0:
135157
if not block:
136158
if self._qsize() >= self.maxsize:
137159
raise Full
138160
elif timeout is None:
139161
while self._qsize() >= self.maxsize:
140162
self.not_full.wait()
163+
if self.is_shutdown:
164+
raise ShutDown
141165
elif timeout < 0:
142166
raise ValueError("'timeout' must be a non-negative number")
143167
else:
@@ -147,6 +171,8 @@ def put(self, item, block=True, timeout=None):
147171
if remaining <= 0.0:
148172
raise Full
149173
self.not_full.wait(remaining)
174+
if self.is_shutdown:
175+
raise ShutDown
150176
self._put(item)
151177
self.unfinished_tasks += 1
152178
self.not_empty.notify()
@@ -161,14 +187,21 @@ def get(self, block=True, timeout=None):
161187
Otherwise ('block' is false), return an item if one is immediately
162188
available, else raise the Empty exception ('timeout' is ignored
163189
in that case).
190+
191+
Raises ShutDown if the queue has been shut down and is empty,
192+
or if the queue has been shut down immediately.
164193
'''
165194
with self.not_empty:
195+
if self.is_shutdown and not self._qsize():
196+
raise ShutDown
166197
if not block:
167198
if not self._qsize():
168199
raise Empty
169200
elif timeout is None:
170201
while not self._qsize():
171202
self.not_empty.wait()
203+
if self.is_shutdown and not self._qsize():
204+
raise ShutDown
172205
elif timeout < 0:
173206
raise ValueError("'timeout' must be a non-negative number")
174207
else:
@@ -178,6 +211,8 @@ def get(self, block=True, timeout=None):
178211
if remaining <= 0.0:
179212
raise Empty
180213
self.not_empty.wait(remaining)
214+
if self.is_shutdown and not self._qsize():
215+
raise ShutDown
181216
item = self._get()
182217
self.not_full.notify()
183218
return item
@@ -198,6 +233,29 @@ def get_nowait(self):
198233
'''
199234
return self.get(block=False)
200235

236+
def shutdown(self, immediate=False):
237+
'''Shut-down the queue, making queue gets and puts raise ShutDown.
238+
239+
By default, gets will only raise once the queue is empty. Set
240+
'immediate' to True to make gets raise immediately instead.
241+
242+
All blocked callers of put() and get() will be unblocked. If
243+
'immediate', a task is marked as done for each item remaining in
244+
the queue, which may unblock callers of join().
245+
'''
246+
with self.mutex:
247+
self.is_shutdown = True
248+
if immediate:
249+
while self._qsize():
250+
self._get()
251+
if self.unfinished_tasks > 0:
252+
self.unfinished_tasks -= 1
253+
# release all blocked threads in `join()`
254+
self.all_tasks_done.notify_all()
255+
# All getters need to re-check queue-empty to raise ShutDown
256+
self.not_empty.notify_all()
257+
self.not_full.notify_all()
258+
201259
# Override these methods to implement other queue organizations
202260
# (e.g. stack or priority queue).
203261
# These will only be called with appropriate locks held

0 commit comments

Comments
 (0)