Skip to content

Commit f606937

Browse files
committed
Merge branch 'taskdep' into async
2 parents 257a8a9 + 18e3252 commit f606937

File tree

10 files changed

+672
-359
lines changed

10 files changed

+672
-359
lines changed

lib/git/async/channel.py

+47-55
Original file line numberDiff line numberDiff line change
@@ -21,61 +21,57 @@ class Channel(object):
2121
If the channel is closed, any read operation will result in an exception
2222
2323
This base class is not instantiated directly, but instead serves as constructor
24-
for RWChannel pairs.
24+
for Rwriter pairs.
2525
2626
Create a new channel """
27-
__slots__ = tuple()
28-
29-
30-
class WChannel(Channel):
31-
"""The write end of a channel - it is thread-safe"""
32-
__slots__ = ('_queue')
27+
__slots__ = 'queue'
3328

3429
# The queue to use to store the actual data
3530
QueueCls = AsyncQueue
3631

3732
def __init__(self):
38-
"""initialize this instance, able to hold max_items at once
39-
Write calls will block if the channel is full, until someone reads from it"""
40-
self._queue = self.QueueCls()
41-
42-
#{ Interface
43-
def write(self, item, block=True, timeout=None):
44-
"""Send an item into the channel, it can be read from the read end of the
45-
channel accordingly
46-
:param item: Item to send
47-
:param block: If True, the call will block until there is free space in the
48-
channel
49-
:param timeout: timeout in seconds for blocking calls.
50-
:raise ReadOnly: when writing into closed channel"""
51-
# let the queue handle the 'closed' attribute, we write much more often
52-
# to an open channel than to a closed one, saving a few cycles
53-
self._queue.put(item, block, timeout)
54-
33+
"""initialize this instance with a queue holding the channel contents"""
34+
self.queue = self.QueueCls()
35+
36+
37+
class SerialChannel(Channel):
38+
"""A slightly faster version of a Channel, which sacrificed thead-safety for performance"""
39+
QueueCls = SyncQueue
40+
41+
42+
class Writer(object):
43+
"""The write end of a channel, a file-like interface for a channel"""
44+
__slots__ = ('write', 'channel')
45+
46+
def __init__(self, channel):
47+
"""Initialize the writer to use the given channel"""
48+
self.channel = channel
49+
self.write = channel.queue.put
50+
51+
#{ Interface
5552
def size(self):
56-
""":return: approximate number of items that could be read from the read-ends
57-
of this channel"""
58-
return self._queue.qsize()
53+
return self.channel.queue.qsize()
5954

6055
def close(self):
6156
"""Close the channel. Multiple close calls on a closed channel are no
6257
an error"""
63-
self._queue.set_writable(False)
58+
self.channel.queue.set_writable(False)
6459

6560
def closed(self):
6661
""":return: True if the channel was closed"""
67-
return not self._queue.writable()
62+
return not self.channel.queue.writable()
6863
#} END interface
6964

7065

71-
class CallbackWChannel(WChannel):
66+
class CallbackWriter(Writer):
7267
"""The write end of a channel which allows you to setup a callback to be
7368
called after an item was written to the channel"""
7469
__slots__ = ('_pre_cb')
7570

76-
def __init__(self):
77-
WChannel.__init__(self)
71+
def __init__(self, channel):
72+
Writer.__init__(self, channel)
7873
self._pre_cb = None
74+
self.write = self._write
7975

8076
def set_pre_cb(self, fun = lambda item: item):
8177
"""Install a callback to be called before the given item is written.
@@ -88,25 +84,19 @@ def set_pre_cb(self, fun = lambda item: item):
8884
self._pre_cb = fun
8985
return prev
9086

91-
def write(self, item, block=True, timeout=None):
87+
def _write(self, item, block=True, timeout=None):
9288
if self._pre_cb:
9389
item = self._pre_cb(item)
94-
WChannel.write(self, item, block, timeout)
90+
self.channel.queue.put(item, block, timeout)
9591

96-
97-
class SerialWChannel(WChannel):
98-
"""A slightly faster version of a WChannel, which sacrificed thead-safety for
99-
performance"""
100-
QueueCls = SyncQueue
101-
10292

103-
class RChannel(Channel):
104-
"""The read-end of a corresponding write channel"""
105-
__slots__ = '_wc'
93+
class Reader(object):
94+
"""Allows reading from a channel"""
95+
__slots__ = 'channel'
10696

107-
def __init__(self, wchannel):
97+
def __init__(self, channel):
10898
"""Initialize this instance from its parent write channel"""
109-
self._wc = wchannel
99+
self.channel = channel
110100

111101

112102
#{ Interface
@@ -135,7 +125,7 @@ def read(self, count=0, block=True, timeout=None):
135125

136126
# in non-blocking mode, its all not a problem
137127
out = list()
138-
queue = self._wc._queue
128+
queue = self.channel.queue
139129
if not block:
140130
# be as fast as possible in non-blocking mode, hence
141131
# its a bit 'unrolled'
@@ -198,12 +188,12 @@ def read(self, count=0, block=True, timeout=None):
198188

199189
#} END interface
200190

201-
class CallbackRChannel(RChannel):
191+
class CallbackReader(Reader):
202192
"""A channel which sends a callback before items are read from the channel"""
203193
__slots__ = "_pre_cb"
204194

205-
def __init__(self, wc):
206-
RChannel.__init__(self, wc)
195+
def __init__(self, channel):
196+
Reader.__init__(self, channel)
207197
self._pre_cb = None
208198

209199
def set_pre_cb(self, fun = lambda count: None):
@@ -220,18 +210,20 @@ def set_pre_cb(self, fun = lambda count: None):
220210
def read(self, count=0, block=True, timeout=None):
221211
if self._pre_cb:
222212
self._pre_cb(count)
223-
return RChannel.read(self, count, block, timeout)
213+
return Reader.read(self, count, block, timeout)
224214

225215

226216
#} END classes
227217

228218
#{ Constructors
229-
def mkchannel(wctype = WChannel, rctype = RChannel):
230-
"""Create a channel, which consists of one write end and one read end
231-
:return: tuple(write_channel, read_channel)
219+
def mkchannel(ctype = Channel, wtype = Writer, rtype = Reader):
220+
"""Create a channel, with a reader and a writer
221+
:return: tuple(reader, writer)
222+
:param ctype: Channel to instantiate
232223
:param wctype: The type of the write channel to instantiate
233224
:param rctype: The type of the read channel to instantiate"""
234-
wc = wctype()
235-
rc = rctype(wc)
225+
c = ctype()
226+
wc = wtype(c)
227+
rc = rtype(c)
236228
return wc, rc
237229
#} END constructors

lib/git/async/graph.py

+25-12
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,24 @@ class Graph(object):
2525

2626
def __init__(self):
2727
self.nodes = list()
28+
29+
def __del__(self):
30+
"""Deletes bidericational dependencies"""
31+
for node in self.nodes:
32+
node.in_nodes = None
33+
node.out_nodes = None
34+
# END cleanup nodes
35+
36+
# otherwise the nodes would keep floating around
2837

38+
2939
def add_node(self, node):
3040
"""Add a new node to the graph
3141
:return: the newly added node"""
3242
self.nodes.append(node)
3343
return node
3444

35-
def del_node(self, node):
45+
def remove_node(self, node):
3646
"""Delete a node from the graph
3747
:return: self"""
3848
try:
@@ -46,6 +56,8 @@ def del_node(self, node):
4656
del(outn.in_nodes[outn.in_nodes.index(node)])
4757
for inn in node.in_nodes:
4858
del(inn.out_nodes[inn.out_nodes.index(node)])
59+
node.out_nodes = list()
60+
node.in_nodes = list()
4961
return self
5062

5163
def add_edge(self, u, v):
@@ -87,25 +99,26 @@ def add_edge(self, u, v):
8799

88100
return self
89101

90-
def visit_input_inclusive_depth_first(self, node, visitor=lambda n: True ):
91-
"""Visit all input nodes of the given node, depth first, calling visitor
92-
for each node on our way. If the function returns False, the traversal
93-
will not go any deeper, but continue at the next branch
94-
It will return the actual input node in the end !"""
95-
nodes = node.in_nodes[:]
102+
def input_inclusive_dfirst_reversed(self, node):
103+
"""Return all input nodes of the given node, depth first,
104+
It will return the actual input node last, as it is required
105+
like that by the pool"""
106+
stack = [node]
96107
seen = set()
97108

98109
# depth first
99-
while nodes:
100-
n = nodes.pop()
110+
out = list()
111+
while stack:
112+
n = stack.pop()
101113
if n in seen:
102114
continue
103115
seen.add(n)
116+
out.append(n)
104117

105118
# only proceed in that direction if visitor is fine with it
106-
if visitor(n):
107-
nodes.extend(n.in_nodes)
119+
stack.extend(n.in_nodes)
108120
# END call visitor
109121
# END while walking
110-
visitor(node)
122+
out.reverse()
123+
return out
111124

0 commit comments

Comments
 (0)