Skip to content

Commit 29eb123

Browse files
committed
Merge branch 'cleanup' into async
2 parents f606937 + e14e3f1 commit 29eb123

File tree

6 files changed

+49
-27
lines changed

6 files changed

+49
-27
lines changed

lib/git/async/__init__.py

+29
Original file line numberDiff line numberDiff line change
@@ -1 +1,30 @@
11
"""Initialize the multi-processing package"""
2+
3+
#{ Initialization
4+
def _init_atexit():
5+
"""Setup an at-exit job to be sure our workers are shutdown correctly before
6+
the interpreter quits"""
7+
import atexit
8+
import thread
9+
atexit.register(thread.do_terminate_threads)
10+
11+
def _init_signals():
12+
"""Assure we shutdown our threads correctly when being interrupted"""
13+
import signal
14+
import thread
15+
16+
prev_handler = signal.getsignal(signal.SIGINT)
17+
def thread_interrupt_handler(signum, frame):
18+
thread.do_terminate_threads()
19+
if callable(prev_handler):
20+
prev_handler(signum, frame)
21+
raise KeyboardInterrupt()
22+
# END call previous handler
23+
# END signal handler
24+
signal.signal(signal.SIGINT, thread_interrupt_handler)
25+
26+
27+
#} END init
28+
29+
_init_atexit()
30+
_init_signals()

lib/git/async/pool.py

-6
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,6 @@ def read(self, count=0, block=True, timeout=None):
9595
# provided enough - its better to have some possibly empty task runs
9696
# than having and empty queue that blocks.
9797

98-
# NOTE: TODO: that case is only possible if one Task could be connected
99-
# to multiple input channels in a manner known by the system. Currently
100-
# this is not possible, but should be implemented at some point.
101-
10298
# if the user tries to use us to read from a done task, we will never
10399
# compute as all produced items are already in the channel
104100
task = self._task_ref()
@@ -260,8 +256,6 @@ def _prepare_channel_read(self, task, count):
260256
# the following loops are kind of unrolled - code duplication
261257
# should make things execute faster. Putting the if statements
262258
# into the loop would be less code, but ... slower
263-
# DEBUG
264-
# print actual_count, numchunks, chunksize, remainder, task._out_writer.size()
265259
if self._num_workers:
266260
# respect the chunk size, and split the task up if we want
267261
# to process too much. This can be defined per task

lib/git/async/task.py

-6
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,7 @@ def process(self, count=0):
8888
self._num_writers += 1
8989
self._wlock.release()
9090

91-
#print "%r: reading %i" % (self.id, count)
92-
#if hasattr(self, 'reader'):
93-
# print "from", self.reader().channel
9491
items = self._read(count)
95-
#print "%r: done reading %i items" % (self.id, len(items))
9692

9793
try:
9894
try:
@@ -117,7 +113,6 @@ def process(self, count=0):
117113
self._wlock.release()
118114
# END handle writer count
119115
except Exception, e:
120-
print >> sys.stderr, "task %s error:" % self.id, type(e), str(e) # TODO: REMOVE DEBUG, or make it use logging
121116
# be sure our task is not scheduled again
122117
self.set_done()
123118

@@ -164,7 +159,6 @@ def process(self, count=0):
164159
self._wlock.acquire()
165160
try:
166161
if self._num_writers == 0:
167-
# print "Closing channel of %r" % self.id, len(self._out_writer.channel.queue), self._out_writer.channel
168162
self.close()
169163
# END handle writers
170164
finally:

lib/git/async/thread.py

+17-8
Original file line numberDiff line numberDiff line change
@@ -136,16 +136,21 @@ def stop(cls, *args):
136136

137137
def run(self):
138138
"""Process input tasks until we receive the quit signal"""
139-
print self.name, "starts processing" # DEBUG
140-
141139
gettask = self.inq.get
142140
while True:
143141
if self._should_terminate():
144142
break
145143
# END check for stop request
146144

147-
# we wait and block - to terminate, send the 'stop' method
145+
# note: during shutdown, this turns None in the middle of waiting
146+
# for an item to be put onto it - we can't du anything about it -
147+
# even if we catch everything and break gracefully, the parent
148+
# call will think we failed with an empty exception.
149+
# Hence we just don't do anything about it. Alternatively
150+
# we could override the start method to get our own bootstrapping,
151+
# which would mean repeating plenty of code in of the threading module.
148152
tasktuple = gettask()
153+
149154
# needing exactly one function, and one arg
150155
routine, arg = tasktuple
151156

@@ -161,7 +166,7 @@ def run(self):
161166
rval = routine(arg)
162167
else:
163168
# ignore unknown items
164-
print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
169+
sys.stderr.write("%s: task %s was not understood - terminating\n" % (self.getName(), str(tasktuple)))
165170
break
166171
# END make routine call
167172
finally:
@@ -171,18 +176,22 @@ def run(self):
171176
del(routine)
172177
del(tasktuple)
173178
except StopProcessing:
174-
print self.name, "stops processing" # DEBUG
175179
break
176180
except Exception,e:
177-
print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e))
181+
sys.stderr.write("%s: Task %s raised unhandled exception: %s - this really shouldn't happen !\n" % (self.getName(), str(tasktuple), str(e)))
178182
continue # just continue
179183
# END routine exception handling
180184

181185
# END handle routine release
182186
# END endless loop
183187

184188
def stop_and_join(self):
185-
"""Send stop message to ourselves"""
189+
"""Send stop message to ourselves - we don't block, the thread will terminate
190+
once it has finished processing its input queue to receive our termination
191+
event"""
192+
# DONT call superclass as it will try to join - join's don't work for
193+
# some reason, as python apparently doesn't switch threads (so often)
194+
# while waiting ... I don't know, but the threads respond properly,
195+
# but only if dear python switches to them
186196
self.inq.put((self.stop, None))
187-
super(WorkerThread, self).stop_and_join()
188197
#} END classes

lib/git/async/util.py

-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
from threading import (
44
Lock,
5-
current_thread,
65
_allocate_lock,
76
_Condition,
87
_sleep,

test/git/async/test_pool.py

+3-6
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ class TestThreadPool(TestBase):
1919

2020
def _assert_single_task(self, p, async=False):
2121
"""Performs testing in a synchronized environment"""
22-
# return # DEBUG TODO: Fixme deactivated it
2322
print >> sys.stderr, "Threadpool: Starting single task (async = %i) with %i threads" % (async, p.size())
2423
null_tasks = p.num_tasks() # in case we had some before
2524

@@ -373,10 +372,7 @@ def _assert_async_dependent_tasks(self, pool):
373372

374373

375374

376-
377-
# for some reason, sometimes it has multiple workerthreads already when he
378-
# enters the method ... dunno yet, pools should clean up themselvess
379-
#@terminate_threads
375+
@terminate_threads
380376
def test_base(self):
381377
assert len(threading.enumerate()) == 1
382378

@@ -463,10 +459,11 @@ def test_base(self):
463459
# threads per core
464460
p.set_size(4)
465461
self._assert_single_task(p, True)
462+
463+
466464
# DEPENDENT TASK ASYNC MODE
467465
###########################
468466
self._assert_async_dependent_tasks(p)
469467

470468
print >> sys.stderr, "Done with everything"
471469

472-
# TODO: test multi-pool connections

0 commit comments

Comments
 (0)