Skip to content

Commit edd9e23

Browse files
committed
added high-speed locking facilities, allowing our Queue to be faster, at least in tests, and with multiple threads. There is still a sync bug in regard to closed channels to be fixed, as the Task.set_done handling is incorrect
1 parent 8c3c271 commit edd9e23

File tree

3 files changed

+199
-65
lines changed

3 files changed

+199
-65
lines changed

lib/git/async/pool.py

Lines changed: 186 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,16 @@
11
"""Implementation of a thread-pool working with channels"""
22
from thread import WorkerThread
3-
from threading import Lock
3+
4+
from threading import (
5+
Lock,
6+
_Condition,
7+
_sleep,
8+
_time,
9+
)
10+
411
from task import InputChannelTask
512
from Queue import Queue, Empty
13+
from collections import deque
614

715
from graph import (
816
Graph,
@@ -18,6 +26,96 @@
1826
import sys
1927

2028

29+
#{ Utilities
30+
31+
class SyncQueue(deque):
	"""Adapter to allow using a deque like a Queue, without locking.

	Used by the pool while no worker threads exist, so it must mimic the
	Queue interface (``put``/``get``/``empty``) and its FIFO semantics.
	``block`` and ``timeout`` are accepted for interface compatibility but
	ignored - without threads there is nothing to wait for."""
	def get(self, block=True, timeout=None):
		try:
			# put() appends on the right, hence we must pop from the left
			# to preserve Queue's FIFO ordering (deque.pop() would be LIFO)
			return self.popleft()
		except IndexError:
			# translate into the exception Queue users expect
			raise Empty
		# END raise empty

	def empty(self):
		""":return: True if there are no items queued"""
		return len(self) == 0

	# right-append, paired with popleft() in get() for FIFO behaviour
	put = deque.append
44+
45+
46+
class HSCondition(_Condition):
	"""An attempt to make conditions less blocking, which gains performance
	in return by sleeping less.

	NOTE(review): this reaches into CPython's private ``_Condition`` internals
	(the name-mangled ``_Condition__waiters`` list) - fragile across Python
	versions; confirm against the interpreter in use."""
	# base for the exponential back-off sleep in wait(); smaller values
	# reduce wait times, but increase overhead
	delay = 0.00002

	def wait(self, timeout=None):
		# each waiter is represented by an already-acquired Lock; notify()
		# releases it to wake us up
		waiter = Lock()
		waiter.acquire()
		self.__dict__['_Condition__waiters'].append(waiter)
		# release the underlying mutex (possibly recursively held)
		saved_state = self._release_save()
		try:	# restore state no matter what (e.g., KeyboardInterrupt)
			if timeout is None:
				# block indefinitely until notify() releases our waiter lock
				waiter.acquire()
			else:
				# Balancing act: We can't afford a pure busy loop, so we
				# have to sleep; but if we sleep the whole timeout time,
				# we'll be unresponsive. The scheme here sleeps very
				# little at first, longer as time goes on, but never longer
				# than 20 times per second (or the timeout time remaining).
				endtime = _time() + timeout
				delay = self.delay
				# hoist the bound method lookup out of the polling loop
				acquire = waiter.acquire
				while True:
					# non-blocking acquire: succeeds once notify() released us
					gotit = acquire(0)
					if gotit:
						break
					remaining = endtime - _time()
					if remaining <= 0:
						break
					# exponential back-off, capped at 50ms and at the time left
					delay = min(delay * 2, remaining, .05)
					_sleep(delay)
				# END endless loop
				if not gotit:
					# timed out: withdraw ourselves from the waiter list;
					# a concurrent notify() may already have removed us
					try:
						self.__dict__['_Condition__waiters'].remove(waiter)
					except ValueError:
						pass
				# END didn't ever get it
		finally:
			# reacquire the mutex with the previously saved (recursion) state
			self._acquire_restore(saved_state)

	def notify(self, n=1):
		# NOTE(review): unlike threading.Condition.notify, this does not
		# verify the caller holds the mutex - callers are trusted to do so
		__waiters = self.__dict__['_Condition__waiters']
		if not __waiters:
			# no one to wake
			return
		if n == 1:
			# fast path: wake the first waiter by releasing its lock
			__waiters[0].release()
			try:
				__waiters.pop(0)
			except IndexError:
				pass
		else:
			# wake up to n waiters; copy the slice as we mutate the list
			waiters = __waiters[:n]
			for waiter in waiters:
				waiter.release()
				try:
					__waiters.remove(waiter)
				except ValueError:
					pass
		# END handle n = 1 case faster
106+
107+
class PerfQueue(Queue):
	"""A queue using different condition objects to gain multithreading performance"""
	def __init__(self, maxsize=0):
		Queue.__init__(self, maxsize)
		# replace the stock Condition instances with our high-speed variant,
		# all sharing the queue's own mutex as threading.Queue requires
		for cond_name in ('not_empty', 'not_full', 'all_tasks_done'):
			setattr(self, cond_name, HSCondition(self.mutex))
115+
116+
117+
#} END utilities
118+
21119
class RPoolChannel(RChannel):
22120
""" A read-only pool channel may not be wrapped or derived from, but it provides slots to call
23121
before and after an item is to be read.
@@ -49,7 +147,7 @@ def set_post_cb(self, fun = lambda item: item):
49147
returns a possibly changed item list. If it raises, the exception will be propagated.
50148
If a function is not provided, the call is effectively uninstalled."""
51149
self._post_cb = fun
52-
150+
53151
def read(self, count=0, block=True, timeout=None):
54152
"""Read an item that was processed by one of our threads
55153
:note: Triggers task dependency handling needed to provide the necessary
@@ -58,8 +156,18 @@ def read(self, count=0, block=True, timeout=None):
58156
self._pre_cb()
59157
# END pre callback
60158

159+
# if we have count items, don't do any queue preparation - if someone
160+
# depletes the queue in the meanwhile, the channel will close and
161+
# we will unblock naturally
162+
have_enough = False
163+
if count > 0:
164+
# explicitly > count, as we want a certain safe range
165+
have_enough = self._wc._queue.qsize() > count
166+
# END risky game
167+
61168
########## prepare ##############################
62-
self._pool._prepare_channel_read(self._task, count)
169+
if not have_enough:
170+
self._pool._prepare_channel_read(self._task, count)
63171

64172

65173
######### read data ######
@@ -127,9 +235,9 @@ class Pool(object):
127235

128236
def __init__(self, size=0):
	"""Initialize the pool with the given amount of worker threads.

	:param size: number of worker threads; 0 keeps the pool in serial mode"""
	# dependency graph of the tasks added to this pool
	self._tasks = Graph()
	# queue of tasks whose channels were depleted; created lazily in
	# set_size() once worker threads exist and thread-safety is needed
	self._consumed_tasks = None
	self._workers = list()
	self._queue = SyncQueue() # start with a sync queue
	# protects the task graph and order cache against concurrent mutation
	self._taskgraph_lock = self.LockCls()
	# caches topological task orderings to avoid recomputing them per read
	self._taskorder_cache = dict()
	# spawns workers (and switches queue types) as required
	self.set_size(size)
@@ -201,58 +309,60 @@ def _prepare_channel_read(self, task, count):
201309
# if the task does not have the required output on its queue, schedule
202310
# it for processing. If we should process all, we don't care about the
203311
# amount as it should process until its all done.
204-
# NOTE: revise this for multi-tasking - checking qsize doesnt work there !
205-
if count < 1 or task._out_wc.size() < count:
206-
# but we continue to use the actual count to produce the output
207-
numchunks = 1
208-
chunksize = actual_count
209-
remainder = 0
210-
211-
# we need the count set for this - can't chunk up unlimited items
212-
# In serial mode we could do this by checking for empty input channels,
213-
# but in dispatch mode its impossible ( == not easily possible )
214-
# Only try it if we have enough demand
215-
if task.max_chunksize and actual_count > task.max_chunksize:
216-
numchunks = actual_count / task.max_chunksize
217-
chunksize = task.max_chunksize
218-
remainder = actual_count - (numchunks * chunksize)
219-
# END handle chunking
220-
221-
# the following loops are kind of unrolled - code duplication
222-
# should make things execute faster. Putting the if statements
223-
# into the loop would be less code, but ... slower
224-
# DEBUG
225-
# print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
226-
if self._workers:
227-
# respect the chunk size, and split the task up if we want
228-
# to process too much. This can be defined per task
229-
queue = self._queue
230-
if numchunks > 1:
231-
for i in xrange(numchunks):
232-
queue.put((task.process, chunksize))
233-
# END for each chunk to put
234-
else:
312+
#if count > 1 and task._out_wc.size() >= count:
313+
# continue
314+
# END skip if we have enough
315+
316+
# but use the actual count to produce the output, we may produce
317+
# more than requested
318+
numchunks = 1
319+
chunksize = actual_count
320+
remainder = 0
321+
322+
# we need the count set for this - can't chunk up unlimited items
323+
# In serial mode we could do this by checking for empty input channels,
324+
# but in dispatch mode its impossible ( == not easily possible )
325+
# Only try it if we have enough demand
326+
if task.max_chunksize and actual_count > task.max_chunksize:
327+
numchunks = actual_count / task.max_chunksize
328+
chunksize = task.max_chunksize
329+
remainder = actual_count - (numchunks * chunksize)
330+
# END handle chunking
331+
332+
# the following loops are kind of unrolled - code duplication
333+
# should make things execute faster. Putting the if statements
334+
# into the loop would be less code, but ... slower
335+
# DEBUG
336+
# print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
337+
if self._workers:
338+
# respect the chunk size, and split the task up if we want
339+
# to process too much. This can be defined per task
340+
queue = self._queue
341+
if numchunks > 1:
342+
for i in xrange(numchunks):
235343
queue.put((task.process, chunksize))
236-
# END try efficient looping
237-
238-
if remainder:
239-
queue.put((task.process, remainder))
240-
# END handle chunksize
344+
# END for each chunk to put
241345
else:
242-
# no workers, so we have to do the work ourselves
243-
if numchunks > 1:
244-
for i in xrange(numchunks):
245-
task.process(chunksize)
246-
# END for each chunk to put
247-
else:
346+
queue.put((task.process, chunksize))
347+
# END try efficient looping
348+
349+
if remainder:
350+
queue.put((task.process, remainder))
351+
# END handle chunksize
352+
else:
353+
# no workers, so we have to do the work ourselves
354+
if numchunks > 1:
355+
for i in xrange(numchunks):
248356
task.process(chunksize)
249-
# END try efficient looping
250-
251-
if remainder:
252-
task.process(remainder)
253-
# END handle chunksize
254-
# END handle serial mode
255-
# END handle queuing
357+
# END for each chunk to put
358+
else:
359+
task.process(chunksize)
360+
# END try efficient looping
361+
362+
if remainder:
363+
task.process(remainder)
364+
# END handle chunksize
365+
# END handle serial mode
256366
# END for each task to process
257367

258368

@@ -297,11 +407,22 @@ def set_size(self, size=0):
297407
otherwise the work will be distributed among the given amount of threads
298408
299409
:note: currently NOT threadsafe !"""
410+
assert size > -1, "Size cannot be negative"
411+
300412
# either start new threads, or kill existing ones.
301413
# If we end up with no threads, we process the remaining chunks on the queue
302414
# ourselves
303415
cur_count = len(self._workers)
304416
if cur_count < size:
417+
# make sure we have a real queue, and can store our consumed tasks properly
418+
if not isinstance(self._queue, self.TaskQueueCls):
419+
if self._queue is not None and not self._queue.empty():
420+
raise AssertionError("Expected empty queue when switching the queue type")
421+
# END safety check
422+
self._queue = self.TaskQueueCls()
423+
self._consumed_tasks = Queue()
424+
# END init queue
425+
305426
for i in range(size - cur_count):
306427
worker = self.WorkerCls(self._queue)
307428
worker.start()
@@ -323,6 +444,16 @@ def set_size(self, size=0):
323444
except Queue.Empty:
324445
continue
325446
# END while there are tasks on the queue
447+
448+
# use a serial queue, its faster
449+
if not isinstance(self._queue, SyncQueue):
450+
self._queue = SyncQueue()
451+
# END handle queue type
452+
453+
if self._consumed_tasks and not self._consumed_tasks.empty():
454+
self._post_channel_read(self._consumed_tasks.pop())
455+
# END assure consumed tasks are empty
456+
self._consumed_tasks = SyncQueue()
326457
# END process queue
327458
return self
328459

class ThreadPool(Pool):
	"""A pool using threads as worker"""
	# worker implementation spawned per pool slot
	WorkerCls = WorkerThread
	# plain (non-reentrant) lock guarding the task graph
	LockCls = Lock
	# thread-safe queue variant used once workers exist; while the pool is
	# in serial mode a SyncQueue is used instead (see Pool.set_size)
	TaskQueueCls = PerfQueue

lib/git/async/thread.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@ def do_terminate_threads(whitelist=list()):
1515
continue
1616
if whitelist and t not in whitelist:
1717
continue
18-
if isinstance(t, WorkerThread):
19-
t.inq.put(t.quit)
20-
# END worker special handling
2118
t.stop_and_join()
2219
# END for each thread
2320

test/git/async/test_pool.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from test.testlib import *
33
from git.async.pool import *
44
from git.async.task import *
5+
from git.async.thread import terminate_threads
56
from git.async.util import cpu_count
67
import threading
78
import time
@@ -46,7 +47,7 @@ def _assert_single_task(self, p, async=False):
4647

4748
# add a simple task
4849
# it iterates n items
49-
ni = 1000
50+
ni = 500
5051
assert ni % 2 == 0, "ni needs to be dividable by 2"
5152
assert ni % 4 == 0, "ni needs to be dividable by 4"
5253

@@ -106,8 +107,9 @@ def make_iter():
106107
assert len(rc.read(1)) == 1 # processes nothing
107108
# rest - it has ni/2 - 2 on the queue, and pulls ni-2
108109
# It wants too much, so the task realizes its done. The task
109-
# doesn't care about the items in its output channel
110-
assert len(rc.read(ni-2)) == ni - 2
110+
# doesn't care about the items in its output channel
111+
items = rc.read(ni-2)
112+
assert len(items) == ni - 2
111113
assert p.num_tasks() == null_tasks
112114
task._assert(2, ni) # two chunks, 20 calls ( all items )
113115

@@ -125,7 +127,8 @@ def make_iter():
125127
assert len(rc.read(ni / 2 + 2)) == ni / 2 + 2
126128
# have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing
127129
# ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing
128-
assert len(rc.read(ni / 2 - 2)) == ni / 2 - 2
130+
items = rc.read(ni / 2 - 2)
131+
assert len(items) == ni / 2 - 2
129132

130133
task._assert( 5, ni)
131134
assert p.num_tasks() == null_tasks # depleted
@@ -158,9 +161,12 @@ def make_iter():
158161
task.min_count = ni / 4
159162
rc = p.add_task(task)
160163
for i in range(ni):
161-
assert rc.read(1)[0] == i
164+
if async:
165+
assert len(rc.read(1)) == 1
166+
else:
167+
assert rc.read(1)[0] == i
162168
# END for each item
163-
task._assert(ni / task.min_count, ni)
169+
task._assert(ni / task.min_count + 1, ni)
164170
del(rc)
165171
assert p.num_tasks() == null_tasks
166172

@@ -181,6 +187,7 @@ def _assert_async_dependent_tasks(self, p):
181187
# t1 -> x -> t3
182188
pass
183189

190+
@terminate_threads
184191
def test_base(self):
185192
p = ThreadPool()
186193

@@ -239,7 +246,6 @@ def test_base(self):
239246
p.set_size(2)
240247
self._assert_single_task(p, True)
241248

242-
243249
# DEPENDENT TASK ASYNC MODE
244250
###########################
245251
self._assert_async_dependent_tasks(p)

0 commit comments

Comments
 (0)