1
1
from graph import Node
2
+ from channel import WChannel
2
3
from util import ReadOnly
3
4
4
5
import threading
@@ -11,8 +12,8 @@ class OutputChannelTask(Node):
11
12
"""Abstracts a named task as part of a set of interdependent tasks, which contains
12
13
additional information on how the task should be queued and processed.
13
14
14
- Results of the item processing are sent to an output channel, which is to be
15
- set by the creator
15
+ Results of the item processing are sent to a write channel, which is to be
16
+ set by the creator using the ``set_wchannel`` method.
16
17
17
18
* **min_count** assures that not less than min_count items will be processed per call.
18
19
* **max_chunksize** assures that multi-threading is happening in smaller chunks. If
@@ -29,10 +30,11 @@ class OutputChannelTask(Node):
29
30
'apply_single' # apply single items even if multiple where read
30
31
)
31
32
32
- def __init__ (self , id , fun , apply_single = True , min_count = None , max_chunksize = 0 ):
33
+ def __init__ (self , id , fun , apply_single = True , min_count = None , max_chunksize = 0 ,
34
+ wchannel = None ):
33
35
Node .__init__ (self , id )
34
36
self ._read = None # to be set by subclasss
35
- self ._out_wc = None # to be set later
37
+ self ._out_wc = wchannel # to be set later
36
38
self ._exc = None
37
39
self ._done = False
38
40
self .fun = fun
@@ -48,13 +50,21 @@ def set_done(self):
48
50
"""Set ourselves to being done, has we have completed the processing"""
49
51
self ._done = True
50
52
51
- def set_wc (self , wc ):
52
- """Set the write channel to the given one
53
- :note: resets it done state in order to allow proper queue handling"""
54
- self ._done = False # TODO : fix this, this is a side-effect
55
- self ._scheduled_items = 0
53
+ def set_wchannel (self , wc ):
54
+ """Set the write channel to the given one"""
56
55
self ._out_wc = wc
57
56
57
+ def wchannel (self ):
58
+ """:return: a proxy to our write channel or None if non is set
59
+ :note: you must not hold a reference to our write channel when the
60
+ task is being processed. This would cause the write channel never
61
+ to be closed as the task will think there is still another instance
62
+ being processed which can close the channel once it is done.
63
+ In the worst case, this will block your reads."""
64
+ if self ._out_wc is None :
65
+ return None
66
+ return self ._out_wc
67
+
58
68
def close (self ):
59
69
"""A closed task will close its channel to assure the readers will wake up
60
70
:note: its safe to call this method multiple times"""
@@ -128,8 +138,10 @@ def process(self, count=0):
128
138
# END handle done state
129
139
130
140
# If we appear to be the only one left with our output channel, and are
131
- # closed ( this could have been set in another thread as well ), make
141
+ # done ( this could have been set in another thread as well ), make
132
142
# sure to close the output channel.
143
+ # Waiting with this to be the last one helps to keep the
144
+ # write-channel writable longer
133
145
# The count is: 1 = wc itself, 2 = first reader channel, + x for every
134
146
# thread having its copy on the stack
135
147
# + 1 for the instance we provide to refcount
@@ -196,10 +208,5 @@ def __init__(self, in_rc, *args, **kwargs):
196
208
OutputChannelTask .__init__ (self , * args , ** kwargs )
197
209
self ._read = in_rc .read
198
210
199
- def process (self , count = 1 ):
200
- # for now, just blindly read our input, could trigger a pool, even
201
- # ours, but why not ? It should be able to handle this
202
- # TODO: remove this method
203
- super (InputChannelTask , self ).process (count )
204
211
#{ Configuration
205
212
0 commit comments