21
21
mkchannel ,
22
22
WChannel ,
23
23
SerialWChannel ,
24
- RChannel
24
+ CallbackRChannel
25
25
)
26
26
27
27
import sys
28
28
from time import sleep
29
29
30
30
31
- class RPoolChannel (RChannel ):
31
+ class RPoolChannel (CallbackRChannel ):
32
32
""" A read-only pool channel may not be wrapped or derived from, but it provides slots to call
33
33
before and after an item is to be read.
34
34
35
35
It acts like a handle to the underlying task in the pool."""
36
- __slots__ = ('_task' , '_pool' , '_pre_cb' , '_post_cb' )
36
+ __slots__ = ('_task' , '_pool' )
37
37
38
38
def __init__ (self , wchannel , task , pool ):
39
- RChannel .__init__ (self , wchannel )
39
+ CallbackRChannel .__init__ (self , wchannel )
40
40
self ._task = task
41
41
self ._pool = pool
42
- self ._pre_cb = None
43
- self ._post_cb = None
44
42
45
43
def __del__ (self ):
46
44
"""Assures that our task will be deleted if we were the last reader"""
@@ -56,30 +54,10 @@ def __del__(self):
56
54
self ._pool .remove_task (self ._task )
57
55
# END handle refcount based removal of task
58
56
59
- def set_pre_cb (self , fun = lambda count : None ):
60
- """Install a callback to call with the item count to be read before any
61
- item is actually read from the channel. The call must be threadsafe if
62
- the channel is passed to more than one tasks.
63
- If it fails, the read will fail with an IOError
64
- If a function is not provided, the call is effectively uninstalled."""
65
- self ._pre_cb = fun
66
-
67
- def set_post_cb (self , fun = lambda item : item ):
68
- """Install a callback to call after the items were read. The function
69
- returns a possibly changed item list.The call must be threadsafe if
70
- the channel is passed to more than one tasks.
71
- If it raises, the exception will be propagated.
72
- If a function is not provided, the call is effectively uninstalled."""
73
- self ._post_cb = fun
74
-
75
57
def read (self , count = 0 , block = True , timeout = None ):
76
58
"""Read an item that was processed by one of our threads
77
59
:note: Triggers task dependency handling needed to provide the necessary
78
60
input"""
79
- if self ._pre_cb :
80
- self ._pre_cb ()
81
- # END pre callback
82
-
83
61
# NOTE: we always queue the operation that would give us count items
84
62
# as tracking the scheduled items or testing the channels size
85
63
# is in herently unsafe depending on the design of the task network
@@ -90,7 +68,7 @@ def read(self, count=0, block=True, timeout=None):
90
68
91
69
# NOTE: TODO: that case is only possible if one Task could be connected
92
70
# to multiple input channels in a manner known by the system. Currently
93
- # this is not possible, but should be implemented at some point
71
+ # this is not possible, but should be implemented at some point.
94
72
95
73
# if the user tries to use us to read from a done task, we will never
96
74
# compute as all produced items are already in the channel
@@ -105,25 +83,12 @@ def read(self, count=0, block=True, timeout=None):
105
83
####### read data ########
106
84
##########################
107
85
# read actual items, tasks were setup to put their output into our channel ( as well )
108
- items = RChannel .read (self , count , block , timeout )
86
+ items = CallbackRChannel .read (self , count , block , timeout )
109
87
##########################
110
88
111
- if self ._post_cb :
112
- items = self ._post_cb (items )
113
-
114
-
115
- ####### Finalize ########
116
- self ._pool ._post_channel_read (self ._task )
117
89
118
90
return items
119
91
120
- #{ Internal
121
- def _read (self , count = 0 , block = False , timeout = None ):
122
- """Calls the underlying channel's read directly, without triggering
123
- the pool"""
124
- return RChannel .read (self , count , block , timeout )
125
-
126
- #} END internal
127
92
128
93
129
94
class Pool (object ):
@@ -296,10 +261,6 @@ def _prepare_channel_read(self, task, count):
296
261
# END for each task to process
297
262
298
263
299
- def _post_channel_read (self , task ):
300
- """Called after we processed a read to cleanup"""
301
- pass
302
-
303
264
def _remove_task_if_orphaned (self , task ):
304
265
"""Check the task, and delete it if it is orphaned"""
305
266
# 1 as its stored on the task, 1 for the getrefcount call
0 commit comments