10
10
except ImportError :
11
11
SimpleQueue = None
12
12
13
- __all__ = ['Empty' , 'Full' , 'Queue' , 'PriorityQueue' , 'LifoQueue' , 'SimpleQueue' ]
13
+ __all__ = [
14
+ 'Empty' ,
15
+ 'Full' ,
16
+ 'ShutDown' ,
17
+ 'Queue' ,
18
+ 'PriorityQueue' ,
19
+ 'LifoQueue' ,
20
+ 'SimpleQueue' ,
21
+ ]
14
22
15
23
16
24
try :
@@ -25,6 +33,10 @@ class Full(Exception):
25
33
pass
26
34
27
35
36
+ class ShutDown (Exception ):
37
+ '''Raised when put/get with shut-down queue.'''
38
+
39
+
28
40
class Queue :
29
41
'''Create a queue object with a given maximum size.
30
42
@@ -54,6 +66,9 @@ def __init__(self, maxsize=0):
54
66
self .all_tasks_done = threading .Condition (self .mutex )
55
67
self .unfinished_tasks = 0
56
68
69
+ # Queue shutdown state
70
+ self .is_shutdown = False
71
+
57
72
def task_done (self ):
58
73
'''Indicate that a formerly enqueued task is complete.
59
74
@@ -65,6 +80,9 @@ def task_done(self):
65
80
have been processed (meaning that a task_done() call was received
66
81
for every item that had been put() into the queue).
67
82
83
+ shutdown(immediate=True) calls task_done() for each remaining item in
84
+ the queue.
85
+
68
86
Raises a ValueError if called more times than there were items
69
87
placed in the queue.
70
88
'''
@@ -129,15 +147,21 @@ def put(self, item, block=True, timeout=None):
129
147
Otherwise ('block' is false), put an item on the queue if a free slot
130
148
is immediately available, else raise the Full exception ('timeout'
131
149
is ignored in that case).
150
+
151
+ Raises ShutDown if the queue has been shut down.
132
152
'''
133
153
with self .not_full :
154
+ if self .is_shutdown :
155
+ raise ShutDown
134
156
if self .maxsize > 0 :
135
157
if not block :
136
158
if self ._qsize () >= self .maxsize :
137
159
raise Full
138
160
elif timeout is None :
139
161
while self ._qsize () >= self .maxsize :
140
162
self .not_full .wait ()
163
+ if self .is_shutdown :
164
+ raise ShutDown
141
165
elif timeout < 0 :
142
166
raise ValueError ("'timeout' must be a non-negative number" )
143
167
else :
@@ -147,6 +171,8 @@ def put(self, item, block=True, timeout=None):
147
171
if remaining <= 0.0 :
148
172
raise Full
149
173
self .not_full .wait (remaining )
174
+ if self .is_shutdown :
175
+ raise ShutDown
150
176
self ._put (item )
151
177
self .unfinished_tasks += 1
152
178
self .not_empty .notify ()
@@ -161,14 +187,21 @@ def get(self, block=True, timeout=None):
161
187
Otherwise ('block' is false), return an item if one is immediately
162
188
available, else raise the Empty exception ('timeout' is ignored
163
189
in that case).
190
+
191
+ Raises ShutDown if the queue has been shut down and is empty,
192
+ or if the queue has been shut down immediately.
164
193
'''
165
194
with self .not_empty :
195
+ if self .is_shutdown and not self ._qsize ():
196
+ raise ShutDown
166
197
if not block :
167
198
if not self ._qsize ():
168
199
raise Empty
169
200
elif timeout is None :
170
201
while not self ._qsize ():
171
202
self .not_empty .wait ()
203
+ if self .is_shutdown and not self ._qsize ():
204
+ raise ShutDown
172
205
elif timeout < 0 :
173
206
raise ValueError ("'timeout' must be a non-negative number" )
174
207
else :
@@ -178,6 +211,8 @@ def get(self, block=True, timeout=None):
178
211
if remaining <= 0.0 :
179
212
raise Empty
180
213
self .not_empty .wait (remaining )
214
+ if self .is_shutdown and not self ._qsize ():
215
+ raise ShutDown
181
216
item = self ._get ()
182
217
self .not_full .notify ()
183
218
return item
@@ -198,6 +233,29 @@ def get_nowait(self):
198
233
'''
199
234
return self .get (block = False )
200
235
236
+ def shutdown (self , immediate = False ):
237
+ '''Shut-down the queue, making queue gets and puts raise ShutDown.
238
+
239
+ By default, gets will only raise once the queue is empty. Set
240
+ 'immediate' to True to make gets raise immediately instead.
241
+
242
+ All blocked callers of put() and get() will be unblocked. If
243
+ 'immediate', a task is marked as done for each item remaining in
244
+ the queue, which may unblock callers of join().
245
+ '''
246
+ with self .mutex :
247
+ self .is_shutdown = True
248
+ if immediate :
249
+ while self ._qsize ():
250
+ self ._get ()
251
+ if self .unfinished_tasks > 0 :
252
+ self .unfinished_tasks -= 1
253
+ # release all blocked threads in `join()`
254
+ self .all_tasks_done .notify_all ()
255
+ # All getters need to re-check queue-empty to raise ShutDown
256
+ self .not_empty .notify_all ()
257
+ self .not_full .notify_all ()
258
+
201
259
# Override these methods to implement other queue organizations
202
260
# (e.g. stack or priority queue).
203
261
# These will only be called with appropriate locks held
0 commit comments