Skip to content

Commit fbe062b

Browse files
committed
Added dependency-task tests, and fixed plenty of ref-count related bugs, as well as concurrency issues. Now it works okay, but the thread-shutdown is still an issue, as it causes incorrect behaviour making the tests fail. Its good, as it hints at additional issues that need to be solved. There is just a little more left on the feature side, but its nearly there
1 parent c34343d commit fbe062b

File tree

6 files changed

+155
-59
lines changed

6 files changed

+155
-59
lines changed

lib/git/async/graph.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,24 @@ class Graph(object):
2525

2626
def __init__(self):
2727
self.nodes = list()
28+
29+
def __del__(self):
30+
"""Deletes bidericational dependencies"""
31+
for node in self.nodes:
32+
node.in_nodes = None
33+
node.out_nodes = None
34+
# END cleanup nodes
35+
36+
# otherwise the nodes would keep floating around
2837

38+
2939
def add_node(self, node):
3040
"""Add a new node to the graph
3141
:return: the newly added node"""
3242
self.nodes.append(node)
3343
return node
3444

35-
def del_node(self, node):
45+
def remove_node(self, node):
3646
"""Delete a node from the graph
3747
:return: self"""
3848
try:
@@ -46,6 +56,8 @@ def del_node(self, node):
4656
del(outn.in_nodes[outn.in_nodes.index(node)])
4757
for inn in node.in_nodes:
4858
del(inn.out_nodes[inn.out_nodes.index(node)])
59+
node.out_nodes = list()
60+
node.in_nodes = list()
4961
return self
5062

5163
def add_edge(self, u, v):

lib/git/async/pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ def remove_task(self, task, _from_destructor_ = False):
402402

403403
# keep its input nodes as we check whether they were orphaned
404404
in_tasks = task.in_nodes
405-
self._tasks.del_node(task)
405+
self._tasks.remove_node(task)
406406
self._taskorder_cache.clear()
407407
finally:
408408
self._taskgraph_lock.release()

lib/git/async/task.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ def error(self):
8282

8383
def process(self, count=0):
8484
"""Process count items and send the result individually to the output channel"""
85-
# first thing: increment the writer count
85+
# first thing: increment the writer count - other tasks must be able
86+
# to respond properly ( even if it turns out we don't need it later )
8687
self._wlock.acquire()
8788
self._num_writers += 1
8889
self._wlock.release()
@@ -191,7 +192,11 @@ def __init__(self, iterator, *args, **kwargs):
191192
raise ValueError("Iterator %r needs a next() function" % iterator)
192193
self._iterator = iterator
193194
self._lock = self.lock_type()
194-
self._read = self.__read
195+
196+
# this is necessary to prevent a cyclic ref, preventing us from
197+
# getting deleted ( and collected )
198+
weakself = weakref.ref(self)
199+
self._read = lambda count: weakself().__read(count)
195200
self._empty = False
196201

197202
def __read(self, count=0):
@@ -201,6 +206,7 @@ def __read(self, count=0):
201206
if self._empty:
202207
return list()
203208
# END early abort
209+
204210
self._lock.acquire()
205211
try:
206212
if count == 0:

lib/git/async/thread.py

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ class WorkerThread(TerminatableThread):
116116
t[1] = optional, tuple or list of arguments to pass to the routine
117117
t[2] = optional, dictionary of keyword arguments to pass to the routine
118118
"""
119-
__slots__ = ('inq', '_current_routine')
119+
__slots__ = ('inq')
120120

121121

122122
# define how often we should check for a shutdown request in case our
@@ -128,7 +128,6 @@ def __init__(self, inq = None):
128128
self.inq = inq
129129
if inq is None:
130130
self.inq = Queue.Queue()
131-
self._current_routine = None # routine we execute right now
132131

133132
@classmethod
134133
def stop(cls, *args):
@@ -141,7 +140,6 @@ def run(self):
141140

142141
gettask = self.inq.get
143142
while True:
144-
self._current_routine = None
145143
if self._should_terminate():
146144
break
147145
# END check for stop request
@@ -153,35 +151,38 @@ def run(self):
153151
assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need"
154152
routine, arg = tasktuple
155153

156-
self._current_routine = routine
157-
158154
try:
159-
rval = None
160-
if inspect.ismethod(routine):
161-
if routine.im_self is None:
162-
rval = routine(self, arg)
163-
else:
155+
try:
156+
rval = None
157+
if inspect.ismethod(routine):
158+
if routine.im_self is None:
159+
rval = routine(self, arg)
160+
else:
161+
rval = routine(arg)
162+
elif inspect.isroutine(routine):
164163
rval = routine(arg)
165-
elif inspect.isroutine(routine):
166-
rval = routine(arg)
167-
else:
168-
# ignore unknown items
169-
print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
170-
break
171-
# END make routine call
164+
else:
165+
# ignore unknown items
166+
print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
167+
break
168+
# END make routine call
169+
finally:
170+
# make sure we delete the routine to release the reference as soon
171+
# as possible. Otherwise objects might not be destroyed
172+
# while we are waiting
173+
del(routine)
174+
del(tasktuple)
172175
except StopProcessing:
173176
print self.name, "stops processing" # DEBUG
174177
break
175178
except Exception,e:
176179
print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e))
177180
continue # just continue
178181
# END routine exception handling
182+
183+
# END handle routine release
179184
# END endless loop
180185

181-
def routine(self):
182-
""":return: routine we are currently executing, or None if we have no task"""
183-
return self._current_routine
184-
185186
def stop_and_join(self):
186187
"""Send stop message to ourselves"""
187188
self.inq.put((self.stop, None))

test/git/async/test_graph.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from git.async.graph import *
44

55
import time
6+
import sys
67

78
class TestGraph(TestBase):
89

@@ -19,7 +20,7 @@ def test_base(self):
1920

2021
# delete unconnected nodes
2122
for n in g.nodes[:]:
22-
g.del_node(n)
23+
g.remove_node(n)
2324
# END del nodes
2425

2526
# add a chain of connected nodes
@@ -54,8 +55,8 @@ def test_base(self):
5455

5556
# deleting a connected node clears its neighbour connections
5657
assert n3.in_nodes[0] is n2
57-
assert g.del_node(n2) is g
58-
assert g.del_node(n2) is g # multi-deletion okay
58+
assert g.remove_node(n2) is g
59+
assert g.remove_node(n2) is g # multi-deletion okay
5960
assert len(g.nodes) == nn - 1
6061
assert len(n3.in_nodes) == 0
6162
assert len(n1.out_nodes) == 0
@@ -68,3 +69,12 @@ def test_base(self):
6869
assert dfirst_nodes[-1] == end and dfirst_nodes[-2].id == end.id-1
6970

7071

72+
# test cleanup
73+
# its at least kept by its graph
74+
assert sys.getrefcount(end) > 3
75+
del(g)
76+
del(n1); del(n2); del(n3)
77+
del(dfirst_nodes)
78+
del(last)
79+
del(n)
80+
assert sys.getrefcount(end) == 2

0 commit comments

Comments
 (0)