Skip to content

Commit 1873db4

Browse files
committed
Improved shutdown handling - although its impossible to prevent some stderr printing thanks to the underlying threading implementation, we can at least make sure that the interpreter doesn't block during shutdown. Now it appears to be running smoothly
1 parent f606937 commit 1873db4

File tree

3 files changed

+49
-13
lines changed

3 files changed

+49
-13
lines changed

lib/git/async/__init__.py

+29
Original file line numberDiff line numberDiff line change
@@ -1 +1,30 @@
11
"""Initialize the multi-processing package"""
2+
3+
#{ Initialization
4+
def _init_atexit():
5+
"""Setup an at-exit job to be sure our workers are shutdown correctly before
6+
the interpreter quits"""
7+
import atexit
8+
import thread
9+
atexit.register(thread.do_terminate_threads)
10+
11+
def _init_signals():
12+
"""Assure we shutdown our threads correctly when being interrupted"""
13+
import signal
14+
import thread
15+
16+
prev_handler = signal.getsignal(signal.SIGINT)
17+
def thread_interrupt_handler(signum, frame):
18+
thread.do_terminate_threads()
19+
if callable(prev_handler):
20+
prev_handler(signum, frame)
21+
raise KeyboardInterrupt()
22+
# END call previous handler
23+
# END signal handler
24+
signal.signal(signal.SIGINT, thread_interrupt_handler)
25+
26+
27+
#} END init
28+
29+
_init_atexit()
30+
_init_signals()

lib/git/async/thread.py

+17-8
Original file line numberDiff line numberDiff line change
@@ -136,16 +136,21 @@ def stop(cls, *args):
136136

137137
def run(self):
138138
"""Process input tasks until we receive the quit signal"""
139-
print self.name, "starts processing" # DEBUG
140-
141139
gettask = self.inq.get
142140
while True:
143141
if self._should_terminate():
144142
break
145143
# END check for stop request
146144

147-
# we wait and block - to terminate, send the 'stop' method
145+
# note: during shutdown, this turns None in the middle of waiting
146+
# for an item to be put onto it - we can't du anything about it -
147+
# even if we catch everything and break gracefully, the parent
148+
# call will think we failed with an empty exception.
149+
# Hence we just don't do anything about it. Alternatively
150+
# we could override the start method to get our own bootstrapping,
151+
# which would mean repeating plenty of code in of the threading module.
148152
tasktuple = gettask()
153+
149154
# needing exactly one function, and one arg
150155
routine, arg = tasktuple
151156

@@ -161,7 +166,7 @@ def run(self):
161166
rval = routine(arg)
162167
else:
163168
# ignore unknown items
164-
print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
169+
sys.stderr.write("%s: task %s was not understood - terminating\n" % (self.getName(), str(tasktuple)))
165170
break
166171
# END make routine call
167172
finally:
@@ -171,18 +176,22 @@ def run(self):
171176
del(routine)
172177
del(tasktuple)
173178
except StopProcessing:
174-
print self.name, "stops processing" # DEBUG
175179
break
176180
except Exception,e:
177-
print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e))
181+
sys.stderr.write("%s: Task %s raised unhandled exception: %s - this really shouldn't happen !\n" % (self.getName(), str(tasktuple), str(e)))
178182
continue # just continue
179183
# END routine exception handling
180184

181185
# END handle routine release
182186
# END endless loop
183187

184188
def stop_and_join(self):
185-
"""Send stop message to ourselves"""
189+
"""Send stop message to ourselves - we don't block, the thread will terminate
190+
once it has finished processing its input queue to receive our termination
191+
event"""
192+
# DONT call superclass as it will try to join - join's don't work for
193+
# some reason, as python apparently doesn't switch threads (so often)
194+
# while waiting ... I don't know, but the threads respond properly,
195+
# but only if dear python switches to them
186196
self.inq.put((self.stop, None))
187-
super(WorkerThread, self).stop_and_join()
188197
#} END classes

test/git/async/test_pool.py

+3-5
Original file line numberDiff line numberDiff line change
@@ -373,10 +373,7 @@ def _assert_async_dependent_tasks(self, pool):
373373

374374

375375

376-
377-
# for some reason, sometimes it has multiple workerthreads already when he
378-
# enters the method ... dunno yet, pools should clean up themselvess
379-
#@terminate_threads
376+
@terminate_threads
380377
def test_base(self):
381378
assert len(threading.enumerate()) == 1
382379

@@ -463,10 +460,11 @@ def test_base(self):
463460
# threads per core
464461
p.set_size(4)
465462
self._assert_single_task(p, True)
463+
464+
466465
# DEPENDENT TASK ASYNC MODE
467466
###########################
468467
self._assert_async_dependent_tasks(p)
469468

470469
print >> sys.stderr, "Done with everything"
471470

472-
# TODO: test multi-pool connections

0 commit comments

Comments
 (0)