Skip to content

Commit ea81f14

Browse files
committed
Channel: Callbacks reviewed - they are now part of Subclasses of the default channel implementation, one of which is used as base by the Pool Read channel, releasing it of the duty to call these itself. The write channel with callback subclass allows the transformation of the item to be written
1 parent 07996a1 commit ea81f14

File tree

3 files changed

+84
-45
lines changed

3 files changed

+84
-45
lines changed

lib/git/async/channel.py

+51
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,32 @@ def closed(self):
6868
#} END interface
6969

7070

71+
class CallbackWChannel(WChannel):
72+
"""The write end of a channel which allows you to setup a callback to be
73+
called after an item was written to the channel"""
74+
__slots__ = ('_pre_cb')
75+
76+
def __init__(self):
77+
WChannel.__init__(self)
78+
self._pre_cb = None
79+
80+
def set_pre_cb(self, fun = lambda item: item):
81+
"""Install a callback to be called before the given item is written.
82+
It returns a possibly altered item which will be written to the channel
83+
instead, making it useful for pre-write item conversions.
84+
Providing None uninstalls the current method.
85+
:return: the previously installed function or None
86+
:note: Must be thread-safe if the channel is used in multiple threads"""
87+
prev = self._pre_cb
88+
self._pre_cb = fun
89+
return prev
90+
91+
def write(self, item, block=True, timeout=None):
92+
if self._pre_cb:
93+
item = self._pre_cb(item)
94+
WChannel.write(self, item, block, timeout)
95+
96+
7197
class SerialWChannel(WChannel):
7298
"""A slightly faster version of a WChannel, which sacrificed thead-safety for
7399
performance"""
@@ -171,7 +197,32 @@ def read(self, count=0, block=True, timeout=None):
171197
return out
172198

173199
#} END interface
200+
201+
class CallbackRChannel(RChannel):
202+
"""A channel which sends a callback before items are read from the channel"""
203+
__slots__ = "_pre_cb"
204+
205+
def __init__(self, wc):
206+
RChannel.__init__(self, wc)
207+
self._pre_cb = None
208+
209+
def set_pre_cb(self, fun = lambda count: None):
210+
"""Install a callback to call with the item count to be read before any
211+
item is actually read from the channel.
212+
Exceptions will be propagated.
213+
If a function is not provided, the call is effectively uninstalled.
214+
:return: the previously installed callback or None
215+
:note: The callback must be threadsafe if the channel is used by multiple threads."""
216+
prev = self._pre_cb
217+
self._pre_cb = fun
218+
return prev
174219

220+
def read(self, count=0, block=True, timeout=None):
221+
if self._pre_cb:
222+
self._pre_cb(count)
223+
return RChannel.read(self, count, block, timeout)
224+
225+
175226
#} END classes
176227

177228
#{ Constructors

lib/git/async/pool.py

+6-45
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,24 @@
2121
mkchannel,
2222
WChannel,
2323
SerialWChannel,
24-
RChannel
24+
CallbackRChannel
2525
)
2626

2727
import sys
2828
from time import sleep
2929

3030

31-
class RPoolChannel(RChannel):
31+
class RPoolChannel(CallbackRChannel):
3232
""" A read-only pool channel may not be wrapped or derived from, but it provides slots to call
3333
before and after an item is to be read.
3434
3535
It acts like a handle to the underlying task in the pool."""
36-
__slots__ = ('_task', '_pool', '_pre_cb', '_post_cb')
36+
__slots__ = ('_task', '_pool')
3737

3838
def __init__(self, wchannel, task, pool):
39-
RChannel.__init__(self, wchannel)
39+
CallbackRChannel.__init__(self, wchannel)
4040
self._task = task
4141
self._pool = pool
42-
self._pre_cb = None
43-
self._post_cb = None
4442

4543
def __del__(self):
4644
"""Assures that our task will be deleted if we were the last reader"""
@@ -56,30 +54,10 @@ def __del__(self):
5654
self._pool.remove_task(self._task)
5755
# END handle refcount based removal of task
5856

59-
def set_pre_cb(self, fun = lambda count: None):
60-
"""Install a callback to call with the item count to be read before any
61-
item is actually read from the channel. The call must be threadsafe if
62-
the channel is passed to more than one tasks.
63-
If it fails, the read will fail with an IOError
64-
If a function is not provided, the call is effectively uninstalled."""
65-
self._pre_cb = fun
66-
67-
def set_post_cb(self, fun = lambda item: item):
68-
"""Install a callback to call after the items were read. The function
69-
returns a possibly changed item list.The call must be threadsafe if
70-
the channel is passed to more than one tasks.
71-
If it raises, the exception will be propagated.
72-
If a function is not provided, the call is effectively uninstalled."""
73-
self._post_cb = fun
74-
7557
def read(self, count=0, block=True, timeout=None):
7658
"""Read an item that was processed by one of our threads
7759
:note: Triggers task dependency handling needed to provide the necessary
7860
input"""
79-
if self._pre_cb:
80-
self._pre_cb()
81-
# END pre callback
82-
8361
# NOTE: we always queue the operation that would give us count items
8462
# as tracking the scheduled items or testing the channels size
8563
# is in herently unsafe depending on the design of the task network
@@ -90,7 +68,7 @@ def read(self, count=0, block=True, timeout=None):
9068

9169
# NOTE: TODO: that case is only possible if one Task could be connected
9270
# to multiple input channels in a manner known by the system. Currently
93-
# this is not possible, but should be implemented at some point
71+
# this is not possible, but should be implemented at some point.
9472

9573
# if the user tries to use us to read from a done task, we will never
9674
# compute as all produced items are already in the channel
@@ -105,25 +83,12 @@ def read(self, count=0, block=True, timeout=None):
10583
####### read data ########
10684
##########################
10785
# read actual items, tasks were setup to put their output into our channel ( as well )
108-
items = RChannel.read(self, count, block, timeout)
86+
items = CallbackRChannel.read(self, count, block, timeout)
10987
##########################
11088

111-
if self._post_cb:
112-
items = self._post_cb(items)
113-
114-
115-
####### Finalize ########
116-
self._pool._post_channel_read(self._task)
11789

11890
return items
11991

120-
#{ Internal
121-
def _read(self, count=0, block=False, timeout=None):
122-
"""Calls the underlying channel's read directly, without triggering
123-
the pool"""
124-
return RChannel.read(self, count, block, timeout)
125-
126-
#} END internal
12792

12893

12994
class Pool(object):
@@ -296,10 +261,6 @@ def _prepare_channel_read(self, task, count):
296261
# END for each task to process
297262

298263

299-
def _post_channel_read(self, task):
300-
"""Called after we processed a read to cleanup"""
301-
pass
302-
303264
def _remove_task_if_orphaned(self, task):
304265
"""Check the task, and delete it if it is orphaned"""
305266
# 1 as its stored on the task, 1 for the getrefcount call

test/git/async/test_channel.py

+27
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,30 @@ def test_base(self):
4444
assert len(rc.read(5)) == 0
4545
assert len(rc.read(1)) == 0
4646

47+
48+
# test callback channels
49+
wc, rc = mkchannel(wctype = CallbackWChannel, rctype = CallbackRChannel)
50+
51+
cb = [0, 0] # set slots to one if called
52+
def pre_write(item):
53+
cb[0] = 1
54+
return item + 1
55+
def pre_read(count):
56+
cb[1] = 1
57+
58+
# set, verify it returns previous one
59+
assert wc.set_pre_cb(pre_write) is None
60+
assert rc.set_pre_cb(pre_read) is None
61+
assert wc.set_pre_cb(pre_write) is pre_write
62+
assert rc.set_pre_cb(pre_read) is pre_read
63+
64+
# writer transforms input
65+
val = 5
66+
wc.write(val)
67+
assert cb[0] == 1 and cb[1] == 0
68+
69+
rval = rc.read(1)[0] # read one item, must not block
70+
assert cb[0] == 1 and cb[1] == 1
71+
assert rval == val + 1
72+
73+

0 commit comments

Comments
 (0)