1
1
"""Implementation of a thread-pool working with channels"""
2
2
from thread import WorkerThread
3
- from threading import Lock
3
+
4
+ from threading import (
5
+ Lock ,
6
+ _Condition ,
7
+ _sleep ,
8
+ _time ,
9
+ )
10
+
4
11
from task import InputChannelTask
5
12
from Queue import Queue , Empty
13
+ from collections import deque
6
14
7
15
from graph import (
8
16
Graph ,
18
26
import sys
19
27
20
28
29
+ #{ Utilities
30
+
31
class SyncQueue(deque):
	"""Adapter to allow using a deque like a queue, without locking"""

	def get(self, block=True, timeout=None):
		# *block* and *timeout* exist only for Queue interface
		# compatibility - this adapter never blocks.
		if not self:
			raise Empty
		return self.pop()

	def empty(self):
		return not len(self)

	put = deque.append
44
+
45
+
46
class HSCondition(_Condition):
	"""An attempt to make conditions less blocking, which gains performance
	in return by sleeping less"""
	# initial sleep interval for the timed-wait polling loop below;
	# smaller values lower wakeup latency but burn more CPU
	delay = 0.00002		# reduces wait times, but increases overhead

	def wait(self, timeout=None):
		"""Block until notified, or until *timeout* seconds have passed.

		Re-implementation of threading._Condition.wait using an
		exponentially growing poll interval on the waiter lock.
		:note: pokes at the name-mangled ``_Condition__waiters`` list of
			the base class directly - tied to the CPython 2 implementation."""
		waiter = Lock()
		waiter.acquire()
		self.__dict__['_Condition__waiters'].append(waiter)
		saved_state = self._release_save()
		try:	# restore state no matter what (e.g., KeyboardInterrupt)
			if timeout is None:
				waiter.acquire()
			else:
				# Balancing act:  We can't afford a pure busy loop, so we
				# have to sleep; but if we sleep the whole timeout time,
				# we'll be unresponsive.  The scheme here sleeps very
				# little at first, longer as time goes on, but never longer
				# than 20 times per second (or the timeout time remaining).
				endtime = _time() + timeout
				delay = self.delay
				acquire = waiter.acquire
				while True:
					gotit = acquire(0)
					if gotit:
						break
					remaining = endtime - _time()
					if remaining <= 0:
						break
					# exponential backoff, capped by the remaining
					# timeout and a 50ms ceiling
					delay = min(delay * 2, remaining, .05)
					_sleep(delay)
				# END endless loop
				if not gotit:
					# timed out without being notified - take our waiter
					# back out of the list (a concurrent notify may already
					# have removed it, hence the ValueError guard)
					try:
						self.__dict__['_Condition__waiters'].remove(waiter)
					except ValueError:
						pass
				# END didn't ever get it
		finally:
			self._acquire_restore(saved_state)

	def notify(self, n=1):
		"""Release up to *n* waiting threads; the common n == 1 case is
		special-cased to avoid the list copy."""
		__waiters = self.__dict__['_Condition__waiters']
		if not __waiters:
			return
		if n == 1:
			__waiters[0].release()
			try:
				__waiters.pop(0)
			except IndexError:
				pass
		else:
			waiters = __waiters[:n]
			for waiter in waiters:
				waiter.release()
				try:
					__waiters.remove(waiter)
				except ValueError:
					pass
		# END handle n = 1 case faster
106
+
107
class PerfQueue(Queue):
	"""A queue using different condition objects to gain multithreading performance"""

	def __init__(self, maxsize=0):
		Queue.__init__(self, maxsize)
		# swap the stock threading conditions for the faster HSCondition,
		# each sharing the queue's mutex exactly like the originals
		for attr_name in ('not_empty', 'not_full', 'all_tasks_done'):
			setattr(self, attr_name, HSCondition(self.mutex))
115
+
116
+
117
+ #} END utilities
118
+
21
119
class RPoolChannel (RChannel ):
22
120
""" A read-only pool channel may not be wrapped or derived from, but it provides slots to call
23
121
before and after an item is to be read.
@@ -49,7 +147,7 @@ def set_post_cb(self, fun = lambda item: item):
49
147
returns a possibly changed item list. If it raises, the exception will be propagated.
50
148
If a function is not provided, the call is effectively uninstalled."""
51
149
self ._post_cb = fun
52
-
150
+
53
151
def read (self , count = 0 , block = True , timeout = None ):
54
152
"""Read an item that was processed by one of our threads
55
153
:note: Triggers task dependency handling needed to provide the necessary
@@ -58,8 +156,18 @@ def read(self, count=0, block=True, timeout=None):
58
156
self ._pre_cb ()
59
157
# END pre callback
60
158
159
+ # if we have count items, don't do any queue preparation - if someone
160
+ # depletes the queue in the meanwhile, the channel will close and
161
+ # we will unblock naturally
162
+ have_enough = False
163
+ if count > 0 :
164
+ # explicitly > count, as we want a certain safe range
165
+ have_enough = self ._wc ._queue .qsize () > count
166
+ # END risky game
167
+
61
168
########## prepare ##############################
62
- self ._pool ._prepare_channel_read (self ._task , count )
169
+ if not have_enough :
170
+ self ._pool ._prepare_channel_read (self ._task , count )
63
171
64
172
65
173
######### read data ######
@@ -127,9 +235,9 @@ class Pool(object):
127
235
128
236
def __init__ (self , size = 0 ):
129
237
self ._tasks = Graph ()
130
- self ._consumed_tasks = Queue () # make sure its threadsafe
238
+ self ._consumed_tasks = None
131
239
self ._workers = list ()
132
- self ._queue = self . TaskQueueCls ()
240
+ self ._queue = SyncQueue () # start with a sync queue
133
241
self ._taskgraph_lock = self .LockCls ()
134
242
self ._taskorder_cache = dict ()
135
243
self .set_size (size )
@@ -201,58 +309,60 @@ def _prepare_channel_read(self, task, count):
201
309
# if the task does not have the required output on its queue, schedule
202
310
# it for processing. If we should process all, we don't care about the
203
311
# amount as it should process until its all done.
204
- # NOTE: revise this for multi-tasking - checking qsize doesnt work there !
205
- if count < 1 or task . _out_wc . size () < count :
206
- # but we continue to use the actual count to produce the output
207
- numchunks = 1
208
- chunksize = actual_count
209
- remainder = 0
210
-
211
- # we need the count set for this - can't chunk up unlimited items
212
- # In serial mode we could do this by checking for empty input channels,
213
- # but in dispatch mode its impossible ( == not easily possible )
214
- # Only try it if we have enough demand
215
- if task . max_chunksize and actual_count > task . max_chunksize :
216
- numchunks = actual_count / task . max_chunksize
217
- chunksize = task . max_chunksize
218
- remainder = actual_count - ( numchunks * chunksize )
219
- # END handle chunking
220
-
221
- # the following loops are kind of unrolled - code duplication
222
- # should make things execute faster. Putting the if statements
223
- # into the loop would be less code, but ... slower
224
- # DEBUG
225
- # print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
226
- if self . _workers :
227
- # respect the chunk size, and split the task up if we want
228
- # to process too much. This can be defined per task
229
- queue = self ._queue
230
- if numchunks > 1 :
231
- for i in xrange ( numchunks ):
232
- queue . put (( task . process , chunksize ))
233
- # END for each chunk to put
234
- else :
312
+ #if count > 1 and task._out_wc.size() >= count:
313
+ # continue
314
+ # END skip if we have enough
315
+
316
+ # but use the actual count to produce the output, we may produce
317
+ # more than requested
318
+ numchunks = 1
319
+ chunksize = actual_count
320
+ remainder = 0
321
+
322
+ # we need the count set for this - can't chunk up unlimited items
323
+ # In serial mode we could do this by checking for empty input channels,
324
+ # but in dispatch mode its impossible ( == not easily possible )
325
+ # Only try it if we have enough demand
326
+ if task . max_chunksize and actual_count > task . max_chunksize :
327
+ numchunks = actual_count / task . max_chunksize
328
+ chunksize = task . max_chunksize
329
+ remainder = actual_count - ( numchunks * chunksize )
330
+ # END handle chunking
331
+
332
+ # the following loops are kind of unrolled - code duplication
333
+ # should make things execute faster. Putting the if statements
334
+ # into the loop would be less code, but ... slower
335
+ # DEBUG
336
+ # print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
337
+ if self ._workers :
338
+ # respect the chunk size, and split the task up if we want
339
+ # to process too much. This can be defined per task
340
+ queue = self . _queue
341
+ if numchunks > 1 :
342
+ for i in xrange ( numchunks ) :
235
343
queue .put ((task .process , chunksize ))
236
- # END try efficient looping
237
-
238
- if remainder :
239
- queue .put ((task .process , remainder ))
240
- # END handle chunksize
344
+ # END for each chunk to put
241
345
else :
242
- # no workers, so we have to do the work ourselves
243
- if numchunks > 1 :
244
- for i in xrange (numchunks ):
245
- task .process (chunksize )
246
- # END for each chunk to put
247
- else :
346
+ queue .put ((task .process , chunksize ))
347
+ # END try efficient looping
348
+
349
+ if remainder :
350
+ queue .put ((task .process , remainder ))
351
+ # END handle chunksize
352
+ else :
353
+ # no workers, so we have to do the work ourselves
354
+ if numchunks > 1 :
355
+ for i in xrange (numchunks ):
248
356
task .process (chunksize )
249
- # END try efficient looping
250
-
251
- if remainder :
252
- task .process (remainder )
253
- # END handle chunksize
254
- # END handle serial mode
255
- # END handle queuing
357
+ # END for each chunk to put
358
+ else :
359
+ task .process (chunksize )
360
+ # END try efficient looping
361
+
362
+ if remainder :
363
+ task .process (remainder )
364
+ # END handle chunksize
365
+ # END handle serial mode
256
366
# END for each task to process
257
367
258
368
@@ -297,11 +407,22 @@ def set_size(self, size=0):
297
407
otherwise the work will be distributed among the given amount of threads
298
408
299
409
:note: currently NOT threadsafe !"""
410
+ assert size > - 1 , "Size cannot be negative"
411
+
300
412
# either start new threads, or kill existing ones.
301
413
# If we end up with no threads, we process the remaining chunks on the queue
302
414
# ourselves
303
415
cur_count = len (self ._workers )
304
416
if cur_count < size :
417
+ # make sure we have a real queue, and can store our consumed tasks properly
418
+ if not isinstance (self ._queue , self .TaskQueueCls ):
419
+ if self ._queue is not None and not self ._queue .empty ():
420
+ raise AssertionError ("Expected empty queue when switching the queue type" )
421
+ # END safety check
422
+ self ._queue = self .TaskQueueCls ()
423
+ self ._consumed_tasks = Queue ()
424
+ # END init queue
425
+
305
426
for i in range (size - cur_count ):
306
427
worker = self .WorkerCls (self ._queue )
307
428
worker .start ()
@@ -323,6 +444,16 @@ def set_size(self, size=0):
323
444
except Queue .Empty :
324
445
continue
325
446
# END while there are tasks on the queue
447
+
448
+ # use a serial queue, its faster
449
+ if not isinstance (self ._queue , SyncQueue ):
450
+ self ._queue = SyncQueue ()
451
+ # END handle queue type
452
+
453
+ if self ._consumed_tasks and not self ._consumed_tasks .empty ():
454
+ self ._post_channel_read (self ._consumed_tasks .pop ())
455
+ # END assure consumed tasks are empty
456
+ self ._consumed_tasks = SyncQueue ()
326
457
# END process queue
327
458
return self
328
459
@@ -403,4 +534,4 @@ class ThreadPool(Pool):
403
534
"""A pool using threads as worker"""
404
535
WorkerCls = WorkerThread
405
536
LockCls = Lock
406
- TaskQueueCls = Queue
537
+ TaskQueueCls = PerfQueue
0 commit comments