Skip to content

Commit 365fb14

Browse files
committed
task: redesigned write channel access to allow the task creator to set their own write channels, possibly some with callbacks installed, etc. Pool.add_task will respect the user's choice now, but provides defaults which are optimized for performance
1 parent ea81f14 commit 365fb14

File tree

2 files changed

+30
-20
lines changed

2 files changed

+30
-20
lines changed

lib/git/async/pool.py

+8-5
Original file line numberDiff line numberDiff line change
@@ -388,18 +388,21 @@ def add_task(self, task):
388388
self._taskorder_cache.clear()
389389
self._tasks.add_node(task)
390390

391-
# fix locks - in serial mode, the task does not need real locks
392-
# Additionally, use a non-threadsafe queue
391+
# Use a non-threadsafe queue
393392
# This brings about 15% more performance, but sacrifices thread-safety
394393
# when reading from multiple threads.
395394
if self.size() == 0:
396395
wctype = SerialWChannel
397396
# END improve locks
398397

399-
# setup the tasks channel
400-
wc = wctype()
398+
# set up the task's channel - respect the task creator's choice though
399+
# if it is set.
400+
wc = task.wchannel()
401+
if wc is None:
402+
wc = wctype()
403+
# END create write channel if unset
401404
rc = RPoolChannel(wc, task, self)
402-
task.set_wc(wc)
405+
task.set_wchannel(wc)
403406
finally:
404407
self._taskgraph_lock.release()
405408
# END sync task addition

lib/git/async/task.py

+22-15
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from graph import Node
2+
from channel import WChannel
23
from util import ReadOnly
34

45
import threading
@@ -11,8 +12,8 @@ class OutputChannelTask(Node):
1112
"""Abstracts a named task as part of a set of interdependent tasks, which contains
1213
additional information on how the task should be queued and processed.
1314
14-
Results of the item processing are sent to an output channel, which is to be
15-
set by the creator
15+
Results of the item processing are sent to a write channel, which is to be
16+
set by the creator using the ``set_wchannel`` method.
1617
1718
* **min_count** assures that not less than min_count items will be processed per call.
1819
* **max_chunksize** assures that multi-threading is happening in smaller chunks. If
@@ -29,10 +30,11 @@ class OutputChannelTask(Node):
2930
'apply_single' # apply single items even if multiple were read
3031
)
3132

32-
def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0):
33+
def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0,
34+
wchannel=None):
3335
Node.__init__(self, id)
3436
self._read = None # to be set by subclass
35-
self._out_wc = None # to be set later
37+
self._out_wc = wchannel # if None, to be set later via set_wchannel
3638
self._exc = None
3739
self._done = False
3840
self.fun = fun
@@ -48,13 +50,21 @@ def set_done(self):
4850
"""Set ourselves to being done, has we have completed the processing"""
4951
self._done = True
5052

51-
def set_wc(self, wc):
52-
"""Set the write channel to the given one
53-
:note: resets it done state in order to allow proper queue handling"""
54-
self._done = False # TODO : fix this, this is a side-effect
55-
self._scheduled_items = 0
53+
def set_wchannel(self, wc):
54+
"""Set the write channel to the given one"""
5655
self._out_wc = wc
5756

57+
def wchannel(self):
58+
""":return: a proxy to our write channel or None if none is set
59+
:note: you must not hold a reference to our write channel when the
60+
task is being processed. This would cause the write channel never
61+
to be closed as the task will think there is still another instance
62+
being processed which can close the channel once it is done.
63+
In the worst case, this will block your reads."""
64+
if self._out_wc is None:
65+
return None
66+
return self._out_wc
67+
5868
def close(self):
5969
"""A closed task will close its channel to assure the readers will wake up
6070
:note: it's safe to call this method multiple times"""
@@ -128,8 +138,10 @@ def process(self, count=0):
128138
# END handle done state
129139

130140
# If we appear to be the only one left with our output channel, and are
131-
# closed ( this could have been set in another thread as well ), make
141+
# done ( this could have been set in another thread as well ), make
132142
# sure to close the output channel.
143+
# Waiting with this to be the last one helps to keep the
144+
# write-channel writable longer
133145
# The count is: 1 = wc itself, 2 = first reader channel, + x for every
134146
# thread having its copy on the stack
135147
# + 1 for the instance we provide to refcount
@@ -196,10 +208,5 @@ def __init__(self, in_rc, *args, **kwargs):
196208
OutputChannelTask.__init__(self, *args, **kwargs)
197209
self._read = in_rc.read
198210

199-
def process(self, count=1):
200-
# for now, just blindly read our input, could trigger a pool, even
201-
# ours, but why not ? It should be able to handle this
202-
# TODO: remove this method
203-
super(InputChannelTask, self).process(count)
204211
#{ Configuration
205212

0 commit comments

Comments
 (0)