Commit 619662a

Changed scheduling and chunksize calculation with respect to task.min_count, to fix a theoretical deadlock in serial mode and unnecessary blocking in async mode.
1 parent a8a448b commit 619662a
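
The change replaces the per-task scheduling in _queue_feeder_visitor with a single depth-first pre-pass in _prepare_channel_read, which first raises the requested item count to the largest task.min_count along the dependency chain and only then splits it into chunks. As a rough, standalone illustration of that arithmetic, here is a sketch in modern Python; the function name plan_chunks and its return convention are invented for this example and are not part of the library:

def plan_chunks(count, min_counts, max_chunksize=0):
    """Return (chunksize, numchunks, remainder) for a read of `count` items.

    count         -- number of items the client asked for (0 means "everything")
    min_counts    -- min_count of every task on the depth-first walk (None allowed)
    max_chunksize -- optional upper bound per queued work item (0 disables chunking)
    """
    # take at least the largest min_count of all involved tasks, so that no
    # upstream task ends up blocked waiting for items that were never requested
    min_count = max((mc or count) for mc in min_counts) if min_counts else count
    actual_count = min_count if 0 < count < min_count else count

    # split the work into max_chunksize-sized pieces plus a remainder
    numchunks, chunksize, remainder = 1, actual_count, 0
    if max_chunksize and actual_count > max_chunksize:
        numchunks = actual_count // max_chunksize
        chunksize = max_chunksize
        remainder = actual_count - numchunks * chunksize
    return chunksize, numchunks, remainder

# the client wants 1 item, but a task in the chain needs at least 10 at a time
print(plan_chunks(count=1, min_counts=[None, 10], max_chunksize=4))
# -> (4, 2, 2): two chunks of 4 plus a remainder of 2, i.e. 10 items scheduled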

File tree

1 file changed: +137 -76 lines
lib/git/async/pool.py

Lines changed: 137 additions & 76 deletions
@@ -1,5 +1,6 @@
 """Implementation of a thread-pool working with channels"""
 from thread import WorkerThread
+from threading import Lock
 from task import InputChannelTask
 from Queue import Queue, Empty

@@ -83,7 +84,7 @@ def _read(self, count=0, block=False, timeout=None):
 	#} END internal


-class ThreadPool(object):
+class Pool(object):
 	"""A thread pool maintains a set of one or more worker threads, but supports 
 	a fully serial mode in which case the amount of threads is zero.
@@ -106,88 +107,35 @@ class ThreadPool(object):
 		'_consumed_tasks',		# a queue with tasks that are done or had an error
 		'_workers',			# list of worker threads
 		'_queue',			# master queue for tasks
+		'_taskgraph_lock',		# lock for accessing the task graph
 		)

+	# CONFIGURATION
+	# The type of worker to create - its expected to provide the Thread interface,
+	# taking the taskqueue as only init argument
+	# as well as a method called stop_and_join() to terminate it
+	WorkerCls = None
+
+	# The type of lock to use to protect critical sections, providing the
+	# threading.Lock interface
+	LockCls = None
+
+	# the type of the task queue to use - it must provide the Queue interface
+	TaskQueueCls = None
+
+
 	def __init__(self, size=0):
 		self._tasks = Graph()
 		self._consumed_tasks = Queue()		# make sure its threadsafe
 		self._workers = list()
-		self._queue = Queue()
+		self._queue = self.TaskQueueCls()
+		self._taskgraph_lock = self.LockCls()
 		self.set_size(size)

 	def __del__(self):
 		self.set_size(0)

 	#{ Internal
-	def _queue_feeder_visitor(self, task, count):
-		"""Walk the graph and find tasks that are done for later cleanup, and
-		queue all others for processing by our worker threads ( if available )."""
-		if task.error() or task.is_done():
-			self._consumed_tasks.put(task)
-			return True
-		# END stop processing
-
-		# if the task does not have the required output on its queue, schedule
-		# it for processing. If we should process all, we don't care about the
-		# amount as it should process until its all done.
-		if count < 1 or task._out_wc.size() < count:
-			# allow min-count override. This makes sure we take at least min-count
-			# items off the input queue ( later )
-			if task.min_count is not None and 0 < count < task.min_count:
-				count = task.min_count
-			# END handle min-count
-
-			numchunks = 1
-			chunksize = count
-			remainder = 0
-
-			# we need the count set for this - can't chunk up unlimited items
-			# In serial mode we could do this by checking for empty input channels,
-			# but in dispatch mode its impossible ( == not easily possible )
-			# Only try it if we have enough demand
-			if task.max_chunksize and count > task.max_chunksize:
-				numchunks = count / task.max_chunksize
-				chunksize = task.max_chunksize
-				remainder = count - (numchunks * chunksize)
-			# END handle chunking
-
-			# the following loops are kind of unrolled - code duplication
-			# should make things execute faster. Putting the if statements
-			# into the loop would be less code, but ... slower
-			print count, numchunks, chunksize, remainder, task._out_wc.size()
-			if self._workers:
-				# respect the chunk size, and split the task up if we want
-				# to process too much. This can be defined per task
-				queue = self._queue
-				if numchunks > 1:
-					for i in xrange(numchunks):
-						queue.put((task.process, chunksize))
-					# END for each chunk to put
-				else:
-					queue.put((task.process, chunksize))
-				# END try efficient looping
-
-				if remainder:
-					queue.put((task.process, remainder))
-				# END handle chunksize
-			else:
-				# no workers, so we have to do the work ourselves
-				if numchunks > 1:
-					for i in xrange(numchunks):
-						task.process(chunksize)
-					# END for each chunk to put
-				else:
-					task.process(chunksize)
-				# END try efficient looping
-
-				if remainder:
-					task.process(remainder)
-				# END handle chunksize
-			# END handle serial mode
-		# END handle queuing
-
-		# always walk the whole graph, we want to find consumed tasks
-		return True

 	def _prepare_channel_read(self, task, count):
 		"""Process the tasks which depend on the given one to be sure the input
@@ -201,7 +149,98 @@ def _prepare_channel_read(self, task, count):

 		Tasks which are not done will be put onto the queue for processing, which
 		is fine as we walked them depth-first."""
-		self._tasks.visit_input_inclusive_depth_first(task, lambda n: self._queue_feeder_visitor(n, count))
+		dfirst_tasks = list()
+		# for the walk, we must make sure the ordering does not change
+		# Note: the result of this could be cached
+		self._tasks.visit_input_inclusive_depth_first(task, lambda n: dfirst_tasks.append(n))
+
+		# check the min count on all involved tasks, and be sure that we don't
+		# have any task which produces less than the maximum min-count of all tasks
+		# The actual_count is used when chunking tasks up for the queue, whereas
+		# the count is usued to determine whether we still have enough output
+		# on the queue, checking qsize ( ->revise )
+		# ABTRACT: If T depends on T-1, and the client wants 1 item, T produces
+		# at least 10, T-1 goes with 1, then T will block after 1 item, which
+		# is read by the client. On the next read of 1 item, we would find T's
+		# queue empty and put in another 10, which could put another thread into
+		# blocking state. T-1 produces one more item, which is consumed right away
+		# by the two threads running T. Although this works in the end, it leaves
+		# many threads blocking and waiting for input, which is not desired.
+		# Setting the min-count to the max of the mincount of all tasks assures
+		# we have enough items for all.
+		# Addition: in serial mode, we would enter a deadlock if one task would
+		# ever wait for items !
+		actual_count = count
+		min_counts = (((t.min_count is not None and t.min_count) or count) for t in dfirst_tasks)
+		min_count = reduce(lambda m1, m2: max(m1, m2), min_counts)
+		if 0 < count < min_count:
+			actual_count = min_count
+		# END set actual count
+
+		# the list includes our tasks - the first one to evaluate first, the
+		# requested one last
+		for task in dfirst_tasks:
+			if task.error() or task.is_done():
+				self._consumed_tasks.put(task)
+				continue
+			# END skip processing
+
+			# if the task does not have the required output on its queue, schedule
+			# it for processing. If we should process all, we don't care about the
+			# amount as it should process until its all done.
+			# NOTE: revise this for multi-tasking - checking qsize doesnt work there !
+			if count < 1 or task._out_wc.size() < count:
+				# but we continue to use the actual count to produce the output
+				numchunks = 1
+				chunksize = actual_count
+				remainder = 0
+
+				# we need the count set for this - can't chunk up unlimited items
+				# In serial mode we could do this by checking for empty input channels,
+				# but in dispatch mode its impossible ( == not easily possible )
+				# Only try it if we have enough demand
+				if task.max_chunksize and actual_count > task.max_chunksize:
+					numchunks = actual_count / task.max_chunksize
+					chunksize = task.max_chunksize
+					remainder = actual_count - (numchunks * chunksize)
+				# END handle chunking
+
+				# the following loops are kind of unrolled - code duplication
+				# should make things execute faster. Putting the if statements
+				# into the loop would be less code, but ... slower
+				print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
+				if self._workers:
+					# respect the chunk size, and split the task up if we want
+					# to process too much. This can be defined per task
+					queue = self._queue
+					if numchunks > 1:
+						for i in xrange(numchunks):
+							queue.put((task.process, chunksize))
+						# END for each chunk to put
+					else:
+						queue.put((task.process, chunksize))
+					# END try efficient looping
+
+					if remainder:
+						queue.put((task.process, remainder))
+					# END handle chunksize
+				else:
+					# no workers, so we have to do the work ourselves
+					if numchunks > 1:
+						for i in xrange(numchunks):
+							task.process(chunksize)
+						# END for each chunk to put
+					else:
+						task.process(chunksize)
+					# END try efficient looping
+
+					if remainder:
+						task.process(remainder)
+					# END handle chunksize
+				# END handle serial mode
+			# END handle queuing
+		# END for each task to process
+

 	def _post_channel_read(self, task):
 		"""Called after we processed a read to cleanup"""
@@ -250,7 +289,7 @@ def set_size(self, size=0):
 		cur_count = len(self._workers)
 		if cur_count < size:
 			for i in range(size - cur_count):
-				worker = WorkerThread(self._queue)
+				worker = self.WorkerCls(self._queue)
 				worker.start()
 				self._workers.append(worker)
 			# END for each new worker to create
@@ -291,7 +330,12 @@ def del_task(self, task):
 		# keep its input nodes as we check whether they were orphaned
 		in_tasks = task.in_nodes
 		task.set_done()
-		self._tasks.del_node(task)
+		self._taskgraph_lock.acquire()
+		try:
+			self._tasks.del_node(task)
+		finally:
+			self._taskgraph_lock.release()
+		# END locked deletion

 		for t in in_tasks:
 			self._del_task_if_orphaned(t)
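
An aside on the locking added above: the commit guards graph mutation with explicit acquire()/try/finally release() calls. Because LockCls is bound to threading.Lock in the ThreadPool subclass at the end of this diff, the same protection could be written with a with-statement; a minimal sketch, where the plain dict stands in for the pool's task graph:

from threading import Lock

taskgraph_lock = Lock()
graph = {}                      # stand-in for the pool's Graph of tasks

def del_node_locked(task):
    # equivalent to taskgraph_lock.acquire(); try: ... finally: release()
    with taskgraph_lock:
        graph.pop(task, None)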
@@ -314,16 +358,33 @@ def add_task(self, task):
 			task._pool_ref = weakref.ref(self)
 		# END init input channel task

-		self._tasks.add_node(task)
+		self._taskgraph_lock.acquire()
+		try:
+			self._tasks.add_node(task)
+		finally:
+			self._taskgraph_lock.release()
+		# END sync task addition

 		# If the input channel is one of our read channels, we add the relation
 		if has_input_channel:
 			ic = task.in_rc
 			if isinstance(ic, RPoolChannel) and ic._pool is self:
-				self._tasks.add_edge(ic._task, task)
+				self._taskgraph_lock.acquire()
+				try:
+					self._tasks.add_edge(ic._task, task)
+				finally:
+					self._taskgraph_lock.release()
+				# END handle edge-adding
 			# END add task relation
 		# END handle input channels for connections

 		return rc

 	#} END interface
+
+
+class ThreadPool(Pool):
+	"""A pool using threads as worker"""
+	WorkerCls = WorkerThread
+	LockCls = Lock
+	TaskQueueCls = Queue
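
With the new class-attribute configuration, the base Pool never names a concrete worker, lock, or queue type; a subclass supplies them. Below is a minimal, self-contained sketch of that pattern, using invented names (BasePool, DemoWorker, DemoThreadPool) and Python 3 module spellings rather than the repository's Python 2 imports:

from queue import Queue
from threading import Lock, Thread


class BasePool(object):
    # class-level configuration, mirroring WorkerCls / LockCls / TaskQueueCls
    WorkerCls = None            # Thread-like type taking the task queue as only argument
    LockCls = None              # must provide the threading.Lock interface
    TaskQueueCls = None         # must provide the Queue interface

    def __init__(self, size=0):
        self._queue = self.TaskQueueCls()
        self._taskgraph_lock = self.LockCls()
        self._workers = []
        self.set_size(size)

    def set_size(self, size=0):
        # grow-only for brevity; the real pool can also shrink its worker set
        while len(self._workers) < size:
            worker = self.WorkerCls(self._queue)
            worker.start()
            self._workers.append(worker)


class DemoWorker(Thread):
    """Toy worker: pops (callable, argument) pairs until it sees a None sentinel."""
    def __init__(self, task_queue):
        super().__init__()
        self._task_queue = task_queue

    def run(self):
        while True:
            item = self._task_queue.get()
            if item is None:
                break
            func, arg = item
            func(arg)


class DemoThreadPool(BasePool):
    WorkerCls = DemoWorker
    LockCls = Lock
    TaskQueueCls = Queue


pool = DemoThreadPool(size=2)
pool._queue.put((print, "handled by a worker thread"))
for _ in pool._workers:
    pool._queue.put(None)       # ask each worker to stop
for worker in pool._workers:
    worker.join()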
