Skip to content

Commit 65c9fe0

Browse files
committed
A code donation: Donating a worker thread implementation inclduding tests to Git-Python. I have the feeling it can do much good here :)
1 parent c69b6b9 commit 65c9fe0

File tree

3 files changed

+251
-0
lines changed

3 files changed

+251
-0
lines changed

lib/git/odb/channel.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ def __new__(cls, *args):
3030
# END constructor mode
3131
return object.__new__(cls)
3232

33+
3334
class WChannel(Channel):
3435
"""The write end of a channel"""
3536
__slots__ = ('_closed', '_queue')

lib/git/odb/thread.py

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
# -*- coding: utf-8 -*-
2+
"""Module with threading utilities"""
3+
__docformat__ = "restructuredtext"
4+
import threading
5+
import inspect
6+
import Queue
7+
8+
#{ Decorators
9+
10+
def do_terminate_threads(whitelist=list()):
11+
"""Simple function which terminates all of our threads
12+
:param whitelist: If whitelist is given, only the given threads will be terminated"""
13+
for t in threading.enumerate():
14+
if not isinstance(t, TerminatableThread):
15+
continue
16+
if whitelist and t not in whitelist:
17+
continue
18+
if isinstance(t, WorkerThread):
19+
t.inq.put(t.quit)
20+
# END worker special handling
21+
t.stop_and_join()
22+
# END for each thread
23+
24+
def terminate_threads( func ):
25+
"""Kills all worker threads the method has created by sending the quit signal.
26+
This takes over in case of an error in the main function"""
27+
def wrapper(*args, **kwargs):
28+
cur_threads = set(threading.enumerate())
29+
try:
30+
return func(*args, **kwargs)
31+
finally:
32+
do_terminate_threads(set(threading.enumerate()) - cur_threads)
33+
# END finally shutdown threads
34+
# END wrapper
35+
wrapper.__name__ = func.__name__
36+
return wrapper
37+
38+
#} END decorators
39+
40+
#{ Classes
41+
42+
class TerminatableThread(threading.Thread):
43+
"""A simple thread able to terminate itself on behalf of the user.
44+
45+
Terminate a thread as follows:
46+
47+
t.stop_and_join()
48+
49+
Derived classes call _should_terminate() to determine whether they should
50+
abort gracefully
51+
"""
52+
__slots__ = '_terminate'
53+
54+
def __init__(self):
55+
super(TerminatableThread, self).__init__()
56+
self._terminate = False
57+
58+
59+
#{ Subclass Interface
60+
def _should_terminate(self):
61+
""":return: True if this thread should terminate its operation immediately"""
62+
return self._terminate
63+
64+
def _terminated(self):
65+
"""Called once the thread terminated. Its called in the main thread
66+
and may perform cleanup operations"""
67+
pass
68+
69+
def start(self):
70+
"""Start the thread and return self"""
71+
super(TerminatableThread, self).start()
72+
return self
73+
74+
#} END subclass interface
75+
76+
#{ Interface
77+
78+
def stop_and_join(self):
79+
"""Ask the thread to stop its operation and wait for it to terminate
80+
:note: Depending on the implenetation, this might block a moment"""
81+
self._terminate = True
82+
self.join()
83+
self._terminated()
84+
#} END interface
85+
86+
87+
class WorkerThread(TerminatableThread):
88+
"""
89+
This base allows to call functions on class instances natively and retrieve
90+
their results asynchronously using a queue.
91+
The thread runs forever unless it receives the terminate signal using
92+
its task queue.
93+
94+
Tasks could be anything, but should usually be class methods and arguments to
95+
allow the following:
96+
97+
inq = Queue()
98+
outq = Queue()
99+
w = WorkerThread(inq, outq)
100+
w.start()
101+
inq.put((WorkerThread.<method>, args, kwargs))
102+
res = outq.get()
103+
104+
finally we call quit to terminate asap.
105+
106+
alternatively, you can make a call more intuitively - the output is the output queue
107+
allowing you to get the result right away or later
108+
w.call(arg, kwarg='value').get()
109+
110+
inq.put(WorkerThread.quit)
111+
w.join()
112+
113+
You may provide the following tuples as task:
114+
t[0] = class method, function or instance method
115+
t[1] = optional, tuple or list of arguments to pass to the routine
116+
t[2] = optional, dictionary of keyword arguments to pass to the routine
117+
"""
118+
__slots__ = ('inq', 'outq')
119+
120+
class InvalidRoutineError(Exception):
121+
"""Class sent as return value in case of an error"""
122+
123+
def __init__(self, inq = None, outq = None):
124+
super(WorkerThread, self).__init__()
125+
self.inq = inq or Queue.Queue()
126+
self.outq = outq or Queue.Queue()
127+
128+
def call(self, function, *args, **kwargs):
129+
"""Method that makes the call to the worker using the input queue,
130+
returning our output queue
131+
132+
:param funciton: can be a standalone function unrelated to this class,
133+
a class method of this class or any instance method.
134+
If it is a string, it will be considered a function residing on this instance
135+
:param args: arguments to pass to function
136+
:parma **kwargs: kwargs to pass to function"""
137+
self.inq.put((function, args, kwargs))
138+
return self.outq
139+
140+
def wait_until_idle(self):
141+
"""wait until the input queue is empty, in the meanwhile, take all
142+
results off the output queue."""
143+
while not self.inq.empty():
144+
try:
145+
self.outq.get(False)
146+
except Queue.Empty:
147+
continue
148+
# END while there are tasks on the queue
149+
150+
def run(self):
151+
"""Process input tasks until we receive the quit signal"""
152+
while True:
153+
if self._should_terminate():
154+
break
155+
# END check for stop request
156+
routine = self.__class__.quit
157+
args = tuple()
158+
kwargs = dict()
159+
tasktuple = self.inq.get()
160+
161+
if isinstance(tasktuple, (tuple, list)):
162+
if len(tasktuple) == 3:
163+
routine, args, kwargs = tasktuple
164+
elif len(tasktuple) == 2:
165+
routine, args = tasktuple
166+
elif len(tasktuple) == 1:
167+
routine = tasktuple[0]
168+
# END tasktuple length check
169+
elif inspect.isroutine(tasktuple):
170+
routine = tasktuple
171+
# END tasktuple handling
172+
173+
try:
174+
rval = None
175+
if inspect.ismethod(routine):
176+
if routine.im_self is None:
177+
rval = routine(self, *args, **kwargs)
178+
else:
179+
rval = routine(*args, **kwargs)
180+
elif inspect.isroutine(routine):
181+
rval = routine(*args, **kwargs)
182+
elif isinstance(routine, basestring) and hasattr(self, routine):
183+
rval = getattr(self, routine)(*args, **kwargs)
184+
else:
185+
# ignore unknown items
186+
print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
187+
self.outq.put(self.InvalidRoutineError(routine))
188+
break
189+
# END make routine call
190+
self.outq.put(rval)
191+
except StopIteration:
192+
break
193+
except Exception,e:
194+
print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e))
195+
self.outq.put(e)
196+
# END routine exception handling
197+
# END endless loop
198+
199+
def quit(self):
200+
raise StopIteration
201+
202+
203+
#} END classes

test/git/odb/test_thread.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# -*- coding: utf-8 -*-
2+
""" Test thead classes and functions"""
3+
from test.testlib import *
4+
from git.odb.thread import *
5+
from Queue import Queue
6+
7+
class TestWorker(WorkerThread):
8+
def __init__(self, *args, **kwargs):
9+
super(TestWorker, self).__init__(*args, **kwargs)
10+
self.reset()
11+
12+
def fun(self, *args, **kwargs):
13+
self.called = True
14+
self.args = args
15+
self.kwargs = kwargs
16+
return True
17+
18+
def make_assertion(self):
19+
assert self.called
20+
assert self.args
21+
assert self.kwargs
22+
self.reset()
23+
24+
def reset(self):
25+
self.called = False
26+
self.args = None
27+
self.kwargs = None
28+
29+
30+
class TestCase( TestCase ):
31+
32+
@terminate_threads
33+
def test_worker_thread(self):
34+
worker = TestWorker()
35+
assert isinstance(worker.start(), WorkerThread)
36+
37+
# test different method types
38+
standalone_func = lambda *args, **kwargs: worker.fun(*args, **kwargs)
39+
for function in ("fun", TestWorker.fun, worker.fun, standalone_func):
40+
rval = worker.call(function, 1, this='that')
41+
assert isinstance(rval, Queue)
42+
assert rval.get() is True
43+
worker.make_assertion()
44+
# END for each function type
45+
46+
worker.call('quit')
47+

0 commit comments

Comments
 (0)