Skip to content

Commit 583cd88

Browse files
committed
Moved pool utilities into util module, fixed critical issue that caused havoc - let's call this a safe-state
1 parent edd9e23 commit 583cd88

File tree

4 files changed

+176
-126
lines changed

4 files changed

+176
-126
lines changed

lib/git/async/pool.py

Lines changed: 36 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -1,121 +1,28 @@
11
"""Implementation of a thread-pool working with channels"""
22
from thread import WorkerThread
3+
from threading import Lock
34

4-
from threading import (
5-
Lock,
6-
_Condition,
7-
_sleep,
8-
_time,
5+
from util import (
6+
SyncQueue,
7+
AsyncQueue,
98
)
109

1110
from task import InputChannelTask
12-
from Queue import Queue, Empty
13-
from collections import deque
14-
15-
from graph import (
16-
Graph,
11+
from Queue import (
12+
Queue,
13+
Empty
1714
)
1815

16+
from graph import Graph
1917
from channel import (
2018
Channel,
2119
WChannel,
2220
RChannel
2321
)
2422

25-
import weakref
2623
import sys
2724

2825

29-
#{ Utilities
30-
31-
class SyncQueue(deque):
32-
"""Adapter to allow using a deque like a queue, without locking"""
33-
def get(self, block=True, timeout=None):
34-
try:
35-
return self.pop()
36-
except IndexError:
37-
raise Empty
38-
# END raise empty
39-
40-
def empty(self):
41-
return len(self) == 0
42-
43-
put = deque.append
44-
45-
46-
class HSCondition(_Condition):
47-
"""An attempt to make conditions less blocking, which gains performance
48-
in return by sleeping less"""
49-
delay = 0.00002 # reduces wait times, but increases overhead
50-
51-
def wait(self, timeout=None):
52-
waiter = Lock()
53-
waiter.acquire()
54-
self.__dict__['_Condition__waiters'].append(waiter)
55-
saved_state = self._release_save()
56-
try: # restore state no matter what (e.g., KeyboardInterrupt)
57-
if timeout is None:
58-
waiter.acquire()
59-
else:
60-
# Balancing act: We can't afford a pure busy loop, so we
61-
# have to sleep; but if we sleep the whole timeout time,
62-
# we'll be unresponsive. The scheme here sleeps very
63-
# little at first, longer as time goes on, but never longer
64-
# than 20 times per second (or the timeout time remaining).
65-
endtime = _time() + timeout
66-
delay = self.delay
67-
acquire = waiter.acquire
68-
while True:
69-
gotit = acquire(0)
70-
if gotit:
71-
break
72-
remaining = endtime - _time()
73-
if remaining <= 0:
74-
break
75-
delay = min(delay * 2, remaining, .05)
76-
_sleep(delay)
77-
# END endless loop
78-
if not gotit:
79-
try:
80-
self.__dict__['_Condition__waiters'].remove(waiter)
81-
except ValueError:
82-
pass
83-
# END didn't ever get it
84-
finally:
85-
self._acquire_restore(saved_state)
86-
87-
def notify(self, n=1):
88-
__waiters = self.__dict__['_Condition__waiters']
89-
if not __waiters:
90-
return
91-
if n == 1:
92-
__waiters[0].release()
93-
try:
94-
__waiters.pop(0)
95-
except IndexError:
96-
pass
97-
else:
98-
waiters = __waiters[:n]
99-
for waiter in waiters:
100-
waiter.release()
101-
try:
102-
__waiters.remove(waiter)
103-
except ValueError:
104-
pass
105-
# END handle n = 1 case faster
106-
107-
class PerfQueue(Queue):
108-
"""A queue using different condition objects to gain multithreading performance"""
109-
def __init__(self, maxsize=0):
110-
Queue.__init__(self, maxsize)
111-
112-
self.not_empty = HSCondition(self.mutex)
113-
self.not_full = HSCondition(self.mutex)
114-
self.all_tasks_done = HSCondition(self.mutex)
115-
116-
117-
#} END utilities
118-
11926
class RPoolChannel(RChannel):
12027
""" A read-only pool channel may not be wrapped or derived from, but it provides slots to call
12128
before and after an item is to be read.
@@ -237,7 +144,7 @@ def __init__(self, size=0):
237144
self._tasks = Graph()
238145
self._consumed_tasks = None
239146
self._workers = list()
240-
self._queue = SyncQueue() # start with a sync queue
147+
self._queue = self.TaskQueueCls()
241148
self._taskgraph_lock = self.LockCls()
242149
self._taskorder_cache = dict()
243150
self.set_size(size)
@@ -375,7 +282,10 @@ def _post_channel_read(self, task):
375282
self._consumed_tasks.put(task)
376283
# END handle consumption
377284

378-
# delete consumed tasks to cleanup
285+
self._handle_consumed_tasks()
286+
287+
def _handle_consumed_tasks(self):
288+
"""Remove all consumed tasks from our queue by deleting them"""
379289
try:
380290
while True:
381291
ct = self._consumed_tasks.get(False)
@@ -384,7 +294,7 @@ def _post_channel_read(self, task):
384294
except Empty:
385295
pass
386296
# END pop queue empty
387-
297+
388298
def _del_task_if_orphaned(self, task):
389299
"""Check the task, and delete it if it is orphaned"""
390300
if sys.getrefcount(task._out_wc) < 3:
@@ -415,11 +325,7 @@ def set_size(self, size=0):
415325
cur_count = len(self._workers)
416326
if cur_count < size:
417327
# make sure we have a real queue, and can store our consumed tasks properly
418-
if not isinstance(self._queue, self.TaskQueueCls):
419-
if self._queue is not None and not self._queue.empty():
420-
raise AssertionError("Expected empty queue when switching the queue type")
421-
# END safety check
422-
self._queue = self.TaskQueueCls()
328+
if not isinstance(self._consumed_tasks, self.TaskQueueCls):
423329
self._consumed_tasks = Queue()
424330
# END init queue
425331

@@ -445,13 +351,8 @@ def set_size(self, size=0):
445351
continue
446352
# END while there are tasks on the queue
447353

448-
# use a serial queue, its faster
449-
if not isinstance(self._queue, SyncQueue):
450-
self._queue = SyncQueue()
451-
# END handle queue type
452-
453354
if self._consumed_tasks and not self._consumed_tasks.empty():
454-
self._post_channel_read(self._consumed_tasks.pop())
355+
self._handle_consumed_tasks()
455356
# END assure consumed tasks are empty
456357
self._consumed_tasks = SyncQueue()
457358
# END process queue
@@ -467,6 +368,8 @@ def del_task(self, task):
467368
output channel is only held by themselves, so no one will ever consume
468369
its items.
469370
371+
This method blocks until all tasks to be removed have been processed, if
372+
they are currently being processed.
470373
:return: self"""
471374
# now delete our actual node - must set it done so it closes its channels.
472375
# Otherwise further reads of output tasks will block.
@@ -478,6 +381,21 @@ def del_task(self, task):
478381
self._taskgraph_lock.acquire()
479382
try:
480383
self._taskorder_cache.clear()
384+
# before we can delete the task, make sure its write channel
385+
# is closed, otherwise people might still be waiting for its result.
386+
# If a channel is not closed, this could also mean its not yet fully
387+
# processed, but more importantly, there must be no task being processed
388+
# right now.
389+
# TODO: figure this out
390+
for worker in self._workers:
391+
r = worker.routine()
392+
if r and r.im_self is task:
393+
raise NotImplementedError("todo")
394+
# END handle running task
395+
# END check for in-progress routine
396+
397+
# its done, close the channel for writing
398+
task.close()
481399
self._tasks.del_node(task)
482400
finally:
483401
self._taskgraph_lock.release()
@@ -497,11 +415,11 @@ def add_task(self, task):
497415
# create a write channel for it
498416
wc, rc = Channel()
499417
rc = RPoolChannel(wc, task, self)
500-
task._out_wc = wc
418+
task.set_wc(wc)
501419

502420
has_input_channel = isinstance(task, InputChannelTask)
503421
if has_input_channel:
504-
task._pool_ref = weakref.ref(self)
422+
task.set_pool(self)
505423
# END init input channel task
506424

507425
self._taskgraph_lock.acquire()
@@ -534,4 +452,4 @@ class ThreadPool(Pool):
534452
"""A pool using threads as worker"""
535453
WorkerCls = WorkerThread
536454
LockCls = Lock
537-
TaskQueueCls = PerfQueue
455+
TaskQueueCls = AsyncQueue

lib/git/async/task.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from graph import Node
2+
23
import threading
4+
import weakref
35
import new
46

57
class OutputChannelTask(Node):
@@ -17,6 +19,7 @@ class OutputChannelTask(Node):
1719
__slots__ = ( '_read', # method to yield items to process
1820
'_out_wc', # output write channel
1921
'_exc', # exception caught
22+
'_done', # True if we are done
2023
'fun', # function to call with items read
2124
'min_count', # minimum amount of items to produce, None means no override
2225
'max_chunksize', # maximum amount of items to process per process call
@@ -28,19 +31,36 @@ def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0):
2831
self._read = None # to be set by subclass
2932
self._out_wc = None # to be set later
3033
self._exc = None
34+
self._done = False
3135
self.fun = fun
3236
self.min_count = None
3337
self.max_chunksize = 0 # not set
3438
self.apply_single = apply_single
3539

3640
def is_done(self):
3741
""":return: True if we are finished processing"""
38-
return self._out_wc.closed
42+
return self._done
3943

4044
def set_done(self):
4145
"""Set ourselves to being done, as we have completed the processing"""
46+
self._done = True
47+
self.close()
48+
49+
def set_wc(self, wc):
50+
"""Set the write channel to the given one
51+
:note: resets it done state in order to allow proper queue handling"""
52+
self._done = False
53+
self._out_wc = wc
54+
55+
def close(self):
56+
"""A closed task will close its channel to assure the readers will wake up
57+
:note: it's safe to call this method multiple times"""
4258
self._out_wc.close()
4359

60+
def is_closed(self):
61+
""":return: True if the task's write channel is closed"""
62+
return self._out_wc.closed
63+
4464
def error(self):
4565
""":return: Exception caught during last processing or None"""
4666
return self._exc
@@ -148,5 +168,9 @@ def process(self, count=1):
148168

149169
# and call it
150170
return OutputChannelTask.process(self, count)
171+
172+
def set_pool(self, pool):
173+
"""Set our pool to the given one, it will be weakref'd"""
174+
self._pool_ref = weakref.ref(pool)
151175
#{ Configuration
152176

lib/git/async/thread.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ class WorkerThread(TerminatableThread):
110110
t[1] = optional, tuple or list of arguments to pass to the routine
111111
t[2] = optional, dictionary of keyword arguments to pass to the routine
112112
"""
113-
__slots__ = ('inq', 'outq')
113+
__slots__ = ('inq', '_current_routine')
114114

115115

116116
# define how often we should check for a shutdown request in case our
@@ -120,10 +120,12 @@ class WorkerThread(TerminatableThread):
120120
def __init__(self, inq = None):
121121
super(WorkerThread, self).__init__()
122122
self.inq = inq or Queue.Queue()
123+
self._current_routine = None # routine we execute right now
123124

124125
def run(self):
125126
"""Process input tasks until we receive the quit signal"""
126127
while True:
128+
self._current_routine = None
127129
if self._should_terminate():
128130
break
129131
# END check for stop request
@@ -138,8 +140,9 @@ def run(self):
138140
# needing exactly one function, and one arg
139141
assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need"
140142
routine, arg = tasktuple
141-
# DEBUG
142-
# print "%s: picked up: %s(%s)" % (self.name, routine, arg)
143+
144+
self._current_routine = routine
145+
143146
try:
144147
rval = None
145148
if inspect.ismethod(routine):
@@ -154,16 +157,15 @@ def run(self):
154157
print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
155158
break
156159
# END make routine call
157-
except StopIteration:
158-
break
159160
except Exception,e:
160161
print "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e))
161162
break # abort ...
162163
# END routine exception handling
163164
# END endless loop
164165

165-
def quit(self):
166-
raise StopIteration
166+
def routine(self):
167+
""":return: routine we are currently executing, or None if we have no task"""
168+
return self._current_routine
167169

168170

169171
#} END classes

0 commit comments

Comments
 (0)