"""Implementation of a thread-pool working with channels"""
from thread import WorkerThread
+from threading import Lock
from task import InputChannelTask
from Queue import Queue, Empty
@@ -83,7 +84,7 @@ def _read(self, count=0, block=False, timeout=None):
    #} END internal


-class ThreadPool(object):
+class Pool(object):
    """A thread pool maintains a set of one or more worker threads, but supports
    a fully serial mode in which case the amount of threads is zero.

@@ -106,88 +107,35 @@ class ThreadPool(object):
        '_consumed_tasks',      # a queue with tasks that are done or had an error
        '_workers',             # list of worker threads
        '_queue',               # master queue for tasks
+       '_taskgraph_lock',      # lock for accessing the task graph
        )

+    # CONFIGURATION
+    # The type of worker to create - it's expected to provide the Thread interface,
+    # taking the task queue as its only init argument,
+    # as well as a method called stop_and_join() to terminate it
+    WorkerCls = None
+
+    # The type of lock to use to protect critical sections, providing the
+    # threading.Lock interface
+    LockCls = None
+
+    # The type of the task queue to use - it must provide the Queue interface
+    TaskQueueCls = None
+
+
    def __init__(self, size=0):
        self._tasks = Graph()
        self._consumed_tasks = Queue()      # make sure it's threadsafe
        self._workers = list()
-        self._queue = Queue()
+        self._queue = self.TaskQueueCls()
+        self._taskgraph_lock = self.LockCls()
        self.set_size(size)

    def __del__(self):
        self.set_size(0)

    #{ Internal
-    def _queue_feeder_visitor(self, task, count):
-        """Walk the graph and find tasks that are done for later cleanup, and
-        queue all others for processing by our worker threads (if available)."""
-        if task.error() or task.is_done():
-            self._consumed_tasks.put(task)
-            return True
-        # END stop processing
-
-        # if the task does not have the required output on its queue, schedule
-        # it for processing. If we should process all, we don't care about the
-        # amount as it should process until it's all done.
-        if count < 1 or task._out_wc.size() < count:
-            # allow min-count override. This makes sure we take at least min-count
-            # items off the input queue (later)
-            if task.min_count is not None and 0 < count < task.min_count:
-                count = task.min_count
-            # END handle min-count
-
-            numchunks = 1
-            chunksize = count
-            remainder = 0
-
-            # we need the count set for this - can't chunk up unlimited items
-            # In serial mode we could do this by checking for empty input channels,
-            # but in dispatch mode it's impossible (== not easily possible)
-            # Only try it if we have enough demand
-            if task.max_chunksize and count > task.max_chunksize:
-                numchunks = count / task.max_chunksize
-                chunksize = task.max_chunksize
-                remainder = count - (numchunks * chunksize)
-            # END handle chunking
-
-            # the following loops are kind of unrolled - code duplication
-            # should make things execute faster. Putting the if statements
-            # into the loop would be less code, but ... slower
-            print count, numchunks, chunksize, remainder, task._out_wc.size()
-            if self._workers:
-                # respect the chunk size, and split the task up if we want
-                # to process too much. This can be defined per task
-                queue = self._queue
-                if numchunks > 1:
-                    for i in xrange(numchunks):
-                        queue.put((task.process, chunksize))
-                    # END for each chunk to put
-                else:
-                    queue.put((task.process, chunksize))
-                # END try efficient looping
-
-                if remainder:
-                    queue.put((task.process, remainder))
-                # END handle chunksize
-            else:
-                # no workers, so we have to do the work ourselves
-                if numchunks > 1:
-                    for i in xrange(numchunks):
-                        task.process(chunksize)
-                    # END for each chunk to put
-                else:
-                    task.process(chunksize)
-                # END try efficient looping
-
-                if remainder:
-                    task.process(remainder)
-                # END handle chunksize
-            # END handle serial mode
-        # END handle queuing
-
-        # always walk the whole graph, we want to find consumed tasks
-        return True

    def _prepare_channel_read(self, task, count):
        """Process the tasks which depend on the given one to be sure the input
@@ -201,7 +149,98 @@ def _prepare_channel_read(self, task, count):

        Tasks which are not done will be put onto the queue for processing, which
        is fine as we walked them depth-first."""
-        self._tasks.visit_input_inclusive_depth_first(task, lambda n: self._queue_feeder_visitor(n, count))
+        dfirst_tasks = list()
+        # for the walk, we must make sure the ordering does not change
+        # Note: the result of this could be cached
+        self._tasks.visit_input_inclusive_depth_first(task, lambda n: dfirst_tasks.append(n))
+
+        # check the min-count on all involved tasks, and be sure that we don't
+        # have any task which produces less than the maximum min-count of all tasks.
+        # The actual_count is used when chunking tasks up for the queue, whereas
+        # the count is used to determine whether we still have enough output
+        # on the queue, checking qsize (-> revise)
+        # ABSTRACT: If T depends on T-1, and the client wants 1 item, T produces
+        # at least 10, T-1 goes with 1, then T will block after 1 item, which
+        # is read by the client. On the next read of 1 item, we would find T's
+        # queue empty and put in another 10, which could put another thread into
+        # blocking state. T-1 produces one more item, which is consumed right away
+        # by the two threads running T. Although this works in the end, it leaves
+        # many threads blocking and waiting for input, which is not desired.
+        # Setting the min-count to the max of the min-counts of all tasks assures
+        # we have enough items for all.
+        # Addition: in serial mode, we would enter a deadlock if one task would
+        # ever wait for items!
+        actual_count = count
+        min_counts = (((t.min_count is not None and t.min_count) or count) for t in dfirst_tasks)
+        min_count = reduce(lambda m1, m2: max(m1, m2), min_counts)
+        if 0 < count < min_count:
+            actual_count = min_count
+        # END set actual count
+
+        # the list includes our tasks - the first one to evaluate first, the
+        # requested one last
+        for task in dfirst_tasks:
+            if task.error() or task.is_done():
+                self._consumed_tasks.put(task)
+                continue
+            # END skip processing
+
+            # if the task does not have the required output on its queue, schedule
+            # it for processing. If we should process all, we don't care about the
+            # amount as it should process until it's all done.
+            # NOTE: revise this for multi-tasking - checking qsize doesn't work there!
+            if count < 1 or task._out_wc.size() < count:
+                # but we continue to use the actual count to produce the output
+                numchunks = 1
+                chunksize = actual_count
+                remainder = 0
+
+                # we need the count set for this - can't chunk up unlimited items
+                # In serial mode we could do this by checking for empty input channels,
+                # but in dispatch mode it's impossible (== not easily possible)
+                # Only try it if we have enough demand
+                if task.max_chunksize and actual_count > task.max_chunksize:
+                    numchunks = actual_count / task.max_chunksize
+                    chunksize = task.max_chunksize
+                    remainder = actual_count - (numchunks * chunksize)
+                # END handle chunking
+
+                # the following loops are kind of unrolled - code duplication
+                # should make things execute faster. Putting the if statements
+                # into the loop would be less code, but ... slower
+                print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
+                if self._workers:
+                    # respect the chunk size, and split the task up if we want
+                    # to process too much. This can be defined per task
+                    queue = self._queue
+                    if numchunks > 1:
+                        for i in xrange(numchunks):
+                            queue.put((task.process, chunksize))
+                        # END for each chunk to put
+                    else:
+                        queue.put((task.process, chunksize))
+                    # END try efficient looping
+
+                    if remainder:
+                        queue.put((task.process, remainder))
+                    # END handle chunksize
+                else:
+                    # no workers, so we have to do the work ourselves
+                    if numchunks > 1:
+                        for i in xrange(numchunks):
+                            task.process(chunksize)
+                        # END for each chunk to put
+                    else:
+                        task.process(chunksize)
+                    # END try efficient looping
+
+                    if remainder:
+                        task.process(remainder)
+                    # END handle chunksize
+                # END handle serial mode
+            # END handle queuing
+        # END for each task to process
+

    def _post_channel_read(self, task):
        """Called after we processed a read to cleanup"""
@@ -250,7 +289,7 @@ def set_size(self, size=0):
        cur_count = len(self._workers)
        if cur_count < size:
            for i in range(size - cur_count):
-                worker = WorkerThread(self._queue)
+                worker = self.WorkerCls(self._queue)
                worker.start()
                self._workers.append(worker)
            # END for each new worker to create
@@ -291,7 +330,12 @@ def del_task(self, task):
        # keep its input nodes as we check whether they were orphaned
        in_tasks = task.in_nodes
        task.set_done()
-        self._tasks.del_node(task)
+        self._taskgraph_lock.acquire()
+        try:
+            self._tasks.del_node(task)
+        finally:
+            self._taskgraph_lock.release()
+        # END locked deletion

        for t in in_tasks:
            self._del_task_if_orphaned(t)
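
The acquire/try/finally blocks guarding the task graph here and in add_task below are the explicit spelling of a critical section. Assuming LockCls instances support the context-manager protocol the way threading.Lock does, a with-block is an equivalent, terser form; a small sketch:

from threading import Lock

lock = Lock()

# explicit form, as used in this commit
lock.acquire()
try:
    pass    # mutate the shared task graph here
finally:
    lock.release()

# equivalent form - threading.Lock objects work as context managers
with lock:
    pass    # mutate the shared task graph here
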
@@ -314,16 +358,33 @@ def add_task(self, task):
            task._pool_ref = weakref.ref(self)
        # END init input channel task

-        self._tasks.add_node(task)
+        self._taskgraph_lock.acquire()
+        try:
+            self._tasks.add_node(task)
+        finally:
+            self._taskgraph_lock.release()
+        # END sync task addition

        # If the input channel is one of our read channels, we add the relation
        if has_input_channel:
            ic = task.in_rc
            if isinstance(ic, RPoolChannel) and ic._pool is self:
-                self._tasks.add_edge(ic._task, task)
+                self._taskgraph_lock.acquire()
+                try:
+                    self._tasks.add_edge(ic._task, task)
+                finally:
+                    self._taskgraph_lock.release()
+                # END handle edge-adding
            # END add task relation
        # END handle input channels for connections

        return rc

    #} END interface
+
+
+class ThreadPool(Pool):
+    """A pool using threads as workers"""
+    WorkerCls = WorkerThread
+    LockCls = Lock
+    TaskQueueCls = Queue
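
For orientation, a rough usage sketch of the resulting API; the task object and the read call on the returned channel are assumptions inferred from the surrounding module, not something this diff shows:

pool = ThreadPool(size=2)        # two WorkerThread instances feeding off the task queue
# pool = ThreadPool(size=0)      # the same pool in fully serial mode, without any threads

rc = pool.add_task(some_task)    # some_task: an InputChannelTask set up elsewhere (assumed)
items = rc.read(10)              # assumed channel read; scheduling happens in _prepare_channel_read

pool.del_task(some_task)         # removes the task from the graph under _taskgraph_lock
pool.set_size(0)                 # stop_and_join() all workers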