1
1
"""Implementation of a thread-pool working with channels"""
2
2
from thread import WorkerThread
3
+ from threading import Lock
3
4
4
- from threading import (
5
- Lock ,
6
- _Condition ,
7
- _sleep ,
8
- _time ,
5
+ from util import (
6
+ SyncQueue ,
7
+ AsyncQueue ,
9
8
)
10
9
11
10
from task import InputChannelTask
12
- from Queue import Queue , Empty
13
- from collections import deque
14
-
15
- from graph import (
16
- Graph ,
11
+ from Queue import (
12
+ Queue ,
13
+ Empty
17
14
)
18
15
16
+ from graph import Graph
19
17
from channel import (
20
18
Channel ,
21
19
WChannel ,
22
20
RChannel
23
21
)
24
22
25
- import weakref
26
23
import sys
27
24
28
25
29
- #{ Utilities
30
-
31
- class SyncQueue (deque ):
32
- """Adapter to allow using a deque like a queue, without locking"""
33
- def get (self , block = True , timeout = None ):
34
- try :
35
- return self .pop ()
36
- except IndexError :
37
- raise Empty
38
- # END raise empty
39
-
40
- def empty (self ):
41
- return len (self ) == 0
42
-
43
- put = deque .append
44
-
45
-
46
- class HSCondition (_Condition ):
47
- """An attempt to make conditions less blocking, which gains performance
48
- in return by sleeping less"""
49
- delay = 0.00002 # reduces wait times, but increases overhead
50
-
51
- def wait (self , timeout = None ):
52
- waiter = Lock ()
53
- waiter .acquire ()
54
- self .__dict__ ['_Condition__waiters' ].append (waiter )
55
- saved_state = self ._release_save ()
56
- try : # restore state no matter what (e.g., KeyboardInterrupt)
57
- if timeout is None :
58
- waiter .acquire ()
59
- else :
60
- # Balancing act: We can't afford a pure busy loop, so we
61
- # have to sleep; but if we sleep the whole timeout time,
62
- # we'll be unresponsive. The scheme here sleeps very
63
- # little at first, longer as time goes on, but never longer
64
- # than 20 times per second (or the timeout time remaining).
65
- endtime = _time () + timeout
66
- delay = self .delay
67
- acquire = waiter .acquire
68
- while True :
69
- gotit = acquire (0 )
70
- if gotit :
71
- break
72
- remaining = endtime - _time ()
73
- if remaining <= 0 :
74
- break
75
- delay = min (delay * 2 , remaining , .05 )
76
- _sleep (delay )
77
- # END endless loop
78
- if not gotit :
79
- try :
80
- self .__dict__ ['_Condition__waiters' ].remove (waiter )
81
- except ValueError :
82
- pass
83
- # END didn't ever get it
84
- finally :
85
- self ._acquire_restore (saved_state )
86
-
87
- def notify (self , n = 1 ):
88
- __waiters = self .__dict__ ['_Condition__waiters' ]
89
- if not __waiters :
90
- return
91
- if n == 1 :
92
- __waiters [0 ].release ()
93
- try :
94
- __waiters .pop (0 )
95
- except IndexError :
96
- pass
97
- else :
98
- waiters = __waiters [:n ]
99
- for waiter in waiters :
100
- waiter .release ()
101
- try :
102
- __waiters .remove (waiter )
103
- except ValueError :
104
- pass
105
- # END handle n = 1 case faster
106
-
107
- class PerfQueue (Queue ):
108
- """A queue using different condition objects to gain multithreading performance"""
109
- def __init__ (self , maxsize = 0 ):
110
- Queue .__init__ (self , maxsize )
111
-
112
- self .not_empty = HSCondition (self .mutex )
113
- self .not_full = HSCondition (self .mutex )
114
- self .all_tasks_done = HSCondition (self .mutex )
115
-
116
-
117
- #} END utilities
118
-
119
26
class RPoolChannel (RChannel ):
120
27
""" A read-only pool channel may not be wrapped or derived from, but it provides slots to call
121
28
before and after an item is to be read.
@@ -237,7 +144,7 @@ def __init__(self, size=0):
237
144
self ._tasks = Graph ()
238
145
self ._consumed_tasks = None
239
146
self ._workers = list ()
240
- self ._queue = SyncQueue () # start with a sync queue
147
+ self ._queue = self . TaskQueueCls ()
241
148
self ._taskgraph_lock = self .LockCls ()
242
149
self ._taskorder_cache = dict ()
243
150
self .set_size (size )
@@ -375,7 +282,10 @@ def _post_channel_read(self, task):
375
282
self ._consumed_tasks .put (task )
376
283
# END handle consumption
377
284
378
- # delete consumed tasks to cleanup
285
+ self ._handle_consumed_tasks ()
286
+
287
+ def _handle_consumed_tasks (self ):
288
+ """Remove all consumed tasks from our queue by deleting them"""
379
289
try :
380
290
while True :
381
291
ct = self ._consumed_tasks .get (False )
@@ -384,7 +294,7 @@ def _post_channel_read(self, task):
384
294
except Empty :
385
295
pass
386
296
# END pop queue empty
387
-
297
+
388
298
def _del_task_if_orphaned (self , task ):
389
299
"""Check the task, and delete it if it is orphaned"""
390
300
if sys .getrefcount (task ._out_wc ) < 3 :
@@ -415,11 +325,7 @@ def set_size(self, size=0):
415
325
cur_count = len (self ._workers )
416
326
if cur_count < size :
417
327
# make sure we have a real queue, and can store our consumed tasks properly
418
- if not isinstance (self ._queue , self .TaskQueueCls ):
419
- if self ._queue is not None and not self ._queue .empty ():
420
- raise AssertionError ("Expected empty queue when switching the queue type" )
421
- # END safety check
422
- self ._queue = self .TaskQueueCls ()
328
+ if not isinstance (self ._consumed_tasks , self .TaskQueueCls ):
423
329
self ._consumed_tasks = Queue ()
424
330
# END init queue
425
331
@@ -445,13 +351,8 @@ def set_size(self, size=0):
445
351
continue
446
352
# END while there are tasks on the queue
447
353
448
- # use a serial queue, its faster
449
- if not isinstance (self ._queue , SyncQueue ):
450
- self ._queue = SyncQueue ()
451
- # END handle queue type
452
-
453
354
if self ._consumed_tasks and not self ._consumed_tasks .empty ():
454
- self ._post_channel_read ( self . _consumed_tasks . pop () )
355
+ self ._handle_consumed_tasks ( )
455
356
# END assure consumed tasks are empty
456
357
self ._consumed_tasks = SyncQueue ()
457
358
# END process queue
@@ -467,6 +368,8 @@ def del_task(self, task):
467
368
output channel is only held by themselves, so no one will ever consume
468
369
its items.
469
370
371
+ This method blocks until all tasks to be removed have been processed, if
372
+ they are currently being processed.
470
373
:return: self"""
471
374
# now delete our actual node - must set it done os it closes its channels.
472
375
# Otherwise further reads of output tasks will block.
@@ -478,6 +381,21 @@ def del_task(self, task):
478
381
self ._taskgraph_lock .acquire ()
479
382
try :
480
383
self ._taskorder_cache .clear ()
384
+ # before we can delete the task, make sure its write channel
385
+ # is closed, otherwise people might still be waiting for its result.
386
+ # If a channel is not closed, this could also mean its not yet fully
387
+ # processed, but more importantly, there must be no task being processed
388
+ # right now.
389
+ # TODO: figure this out
390
+ for worker in self ._workers :
391
+ r = worker .routine ()
392
+ if r and r .im_self is task :
393
+ raise NotImplementedError ("todo" )
394
+ # END handle running task
395
+ # END check for in-progress routine
396
+
397
+ # its done, close the channel for writing
398
+ task .close ()
481
399
self ._tasks .del_node (task )
482
400
finally :
483
401
self ._taskgraph_lock .release ()
@@ -497,11 +415,11 @@ def add_task(self, task):
497
415
# create a write channel for it
498
416
wc , rc = Channel ()
499
417
rc = RPoolChannel (wc , task , self )
500
- task ._out_wc = wc
418
+ task .set_wc ( wc )
501
419
502
420
has_input_channel = isinstance (task , InputChannelTask )
503
421
if has_input_channel :
504
- task ._pool_ref = weakref . ref (self )
422
+ task .set_pool (self )
505
423
# END init input channel task
506
424
507
425
self ._taskgraph_lock .acquire ()
@@ -534,4 +452,4 @@ class ThreadPool(Pool):
534
452
"""A pool using threads as worker"""
535
453
WorkerCls = WorkerThread
536
454
LockCls = Lock
537
- TaskQueueCls = PerfQueue
455
+ TaskQueueCls = AsyncQueue
0 commit comments