@@ -21,61 +21,57 @@ class Channel(object):
21
21
If the channel is closed, any read operation will result in an exception
22
22
23
23
This base class is not instantiated directly, but instead serves as constructor
24
- for RWChannel pairs.
24
+ for Rwriter pairs.
25
25
26
26
Create a new channel """
27
- __slots__ = tuple ()
28
-
29
-
30
- class WChannel (Channel ):
31
- """The write end of a channel - it is thread-safe"""
32
- __slots__ = ('_queue' )
27
+ __slots__ = 'queue'
33
28
34
29
# The queue to use to store the actual data
35
30
QueueCls = AsyncQueue
36
31
37
32
def __init__ (self ):
38
- """initialize this instance, able to hold max_items at once
39
- Write calls will block if the channel is full, until someone reads from it"""
40
- self ._queue = self .QueueCls ()
41
-
42
- #{ Interface
43
- def write (self , item , block = True , timeout = None ):
44
- """Send an item into the channel, it can be read from the read end of the
45
- channel accordingly
46
- :param item: Item to send
47
- :param block: If True, the call will block until there is free space in the
48
- channel
49
- :param timeout: timeout in seconds for blocking calls.
50
- :raise ReadOnly: when writing into closed channel"""
51
- # let the queue handle the 'closed' attribute, we write much more often
52
- # to an open channel than to a closed one, saving a few cycles
53
- self ._queue .put (item , block , timeout )
54
-
33
+ """initialize this instance with a queue holding the channel contents"""
34
+ self .queue = self .QueueCls ()
35
+
36
+
37
+ class SerialChannel (Channel ):
38
+ """A slightly faster version of a Channel, which sacrificed thead-safety for performance"""
39
+ QueueCls = SyncQueue
40
+
41
+
42
+ class Writer (object ):
43
+ """The write end of a channel, a file-like interface for a channel"""
44
+ __slots__ = ('write' , 'channel' )
45
+
46
+ def __init__ (self , channel ):
47
+ """Initialize the writer to use the given channel"""
48
+ self .channel = channel
49
+ self .write = channel .queue .put
50
+
51
+ #{ Interface
55
52
def size (self ):
56
- """:return: approximate number of items that could be read from the read-ends
57
- of this channel"""
58
- return self ._queue .qsize ()
53
+ return self .channel .queue .qsize ()
59
54
60
55
def close (self ):
61
56
"""Close the channel. Multiple close calls on a closed channel are no
62
57
an error"""
63
- self ._queue .set_writable (False )
58
+ self .channel . queue .set_writable (False )
64
59
65
60
def closed (self ):
66
61
""":return: True if the channel was closed"""
67
- return not self ._queue .writable ()
62
+ return not self .channel . queue .writable ()
68
63
#} END interface
69
64
70
65
71
- class CallbackWChannel ( WChannel ):
66
+ class CallbackWriter ( Writer ):
72
67
"""The write end of a channel which allows you to setup a callback to be
73
68
called after an item was written to the channel"""
74
69
__slots__ = ('_pre_cb' )
75
70
76
- def __init__ (self ):
77
- WChannel .__init__ (self )
71
+ def __init__ (self , channel ):
72
+ Writer .__init__ (self , channel )
78
73
self ._pre_cb = None
74
+ self .write = self ._write
79
75
80
76
def set_pre_cb (self , fun = lambda item : item ):
81
77
"""Install a callback to be called before the given item is written.
@@ -88,25 +84,19 @@ def set_pre_cb(self, fun = lambda item: item):
88
84
self ._pre_cb = fun
89
85
return prev
90
86
91
- def write (self , item , block = True , timeout = None ):
87
+ def _write (self , item , block = True , timeout = None ):
92
88
if self ._pre_cb :
93
89
item = self ._pre_cb (item )
94
- WChannel . write ( self , item , block , timeout )
90
+ self . channel . queue . put ( item , block , timeout )
95
91
96
-
97
- class SerialWChannel (WChannel ):
98
- """A slightly faster version of a WChannel, which sacrificed thead-safety for
99
- performance"""
100
- QueueCls = SyncQueue
101
-
102
92
103
- class RChannel ( Channel ):
104
- """The read-end of a corresponding write channel"""
105
- __slots__ = '_wc '
93
+ class Reader ( object ):
94
+ """Allows reading from a channel"""
95
+ __slots__ = 'channel '
106
96
107
- def __init__ (self , wchannel ):
97
+ def __init__ (self , channel ):
108
98
"""Initialize this instance from its parent write channel"""
109
- self ._wc = wchannel
99
+ self .channel = channel
110
100
111
101
112
102
#{ Interface
@@ -135,7 +125,7 @@ def read(self, count=0, block=True, timeout=None):
135
125
136
126
# in non-blocking mode, its all not a problem
137
127
out = list ()
138
- queue = self ._wc . _queue
128
+ queue = self .channel . queue
139
129
if not block :
140
130
# be as fast as possible in non-blocking mode, hence
141
131
# its a bit 'unrolled'
@@ -198,12 +188,12 @@ def read(self, count=0, block=True, timeout=None):
198
188
199
189
#} END interface
200
190
201
- class CallbackRChannel ( RChannel ):
191
+ class CallbackReader ( Reader ):
202
192
"""A channel which sends a callback before items are read from the channel"""
203
193
__slots__ = "_pre_cb"
204
194
205
- def __init__ (self , wc ):
206
- RChannel .__init__ (self , wc )
195
+ def __init__ (self , channel ):
196
+ Reader .__init__ (self , channel )
207
197
self ._pre_cb = None
208
198
209
199
def set_pre_cb (self , fun = lambda count : None ):
@@ -220,18 +210,20 @@ def set_pre_cb(self, fun = lambda count: None):
220
210
def read (self , count = 0 , block = True , timeout = None ):
221
211
if self ._pre_cb :
222
212
self ._pre_cb (count )
223
- return RChannel .read (self , count , block , timeout )
213
+ return Reader .read (self , count , block , timeout )
224
214
225
215
226
216
#} END classes
227
217
228
218
#{ Constructors
229
- def mkchannel (wctype = WChannel , rctype = RChannel ):
230
- """Create a channel, which consists of one write end and one read end
231
- :return: tuple(write_channel, read_channel)
219
+ def mkchannel (ctype = Channel , wtype = Writer , rtype = Reader ):
220
+ """Create a channel, with a reader and a writer
221
+ :return: tuple(reader, writer)
222
+ :param ctype: Channel to instantiate
232
223
:param wctype: The type of the write channel to instantiate
233
224
:param rctype: The type of the read channel to instantiate"""
234
- wc = wctype ()
235
- rc = rctype (wc )
225
+ c = ctype ()
226
+ wc = wtype (c )
227
+ rc = rtype (c )
236
228
return wc , rc
237
229
#} END constructors
0 commit comments