13
13
14
14
gc .collect ()
15
15
import usocket as socket
16
- from ucollections import deque
17
16
import uasyncio as asyncio
18
17
19
18
gc .collect ()
22
21
import utime
23
22
import machine
24
23
import uerrno as errno
25
- from . import gmid , isnew , SetByte # __init__.py
24
+ from . import gmid , isnew # __init__.py
26
25
from .primitives import launch
26
+ from .primitives .queue import Queue , QueueFull
27
27
gc .collect ()
28
28
from micropython import const
29
29
34
34
getmid = gmid ()
35
35
gc .collect ()
36
36
37
+ # Minimal implementation of set for integers in range 0-255
38
+ # Asynchronous version has efficient wait_empty and has_not methods
39
+ # based on Events rather than polling.
40
+
41
+
42
+ class ASetByte :
43
+ def __init__ (self ):
44
+ self ._ba = bytearray (32 )
45
+ self ._eve = asyncio .Event ()
46
+ self ._eve .set () # Empty event initially set
47
+ self ._evdis = asyncio .Event () # Discard event
48
+
49
+ def __bool__ (self ):
50
+ return any (self ._ba )
51
+
52
+ def __contains__ (self , i ):
53
+ return (self ._ba [i >> 3 ] & 1 << (i & 7 )) > 0
54
+
55
+ def add (self , i ):
56
+ self ._eve .clear ()
57
+ self ._ba [i >> 3 ] |= 1 << (i & 7 )
58
+
59
+ def discard (self , i ):
60
+ self ._ba [i >> 3 ] &= ~ (1 << (i & 7 ))
61
+ self ._evdis .set ()
62
+ if not any (self ._ba ):
63
+ self ._eve .set ()
64
+
65
+ async def wait_empty (self ): # Pause until empty
66
+ await self ._eve .wait ()
67
+
68
+ async def has_not (self , i ): # Pause until i not in set
69
+ while i in self :
70
+ await self ._evdis .wait () # Pause until something is discarded
71
+ self ._evdis .clear ()
72
+
37
73
38
74
class Client :
39
75
def __init__ (self , my_id , server , port = 8123 ,
@@ -46,7 +82,6 @@ def __init__(self, my_id, server, port=8123,
46
82
self ._pw = pw
47
83
self ._port = port
48
84
self ._to = timeout # Client and server timeout
49
- self ._tim_short = timeout // 10
50
85
self ._tim_ka = timeout // 4 # Keepalive interval
51
86
self ._concb = conn_cb
52
87
self ._concbargs = () if conn_cb_args is None else conn_cb_args
@@ -99,38 +134,35 @@ def inner(feed=WDT_CB):
99
134
gc .collect ()
100
135
if platform == 'esp8266' :
101
136
import esp
102
- esp .sleep_type (esp .SLEEP_NONE ) # Improve connection integrity at cost of power consumption.
137
+ # Improve connection integrity at cost of power consumption.
138
+ esp .sleep_type (esp .SLEEP_NONE )
103
139
104
- self ._evfail = asyncio .Event ()
140
+ self ._evfail = asyncio .Event () # Set by any comms failure
141
+ self ._evok = asyncio .Event () # Set by 1st successful read
105
142
self ._s_lock = asyncio .Lock () # For internal send conflict.
106
143
self ._last_wr = utime .ticks_ms ()
107
- self ._lineq = deque ((), 20 , True ) # 20 entries, throw on overflow
144
+ self ._lineq = Queue ( 20 ) # 20 entries
108
145
self .connects = 0 # Connect count for test purposes/app access
109
146
self ._sock = None
110
- self ._ok = False # Set after 1st successful read
111
- self ._acks_pend = SetByte () # ACKs which are expected to be received
147
+ self ._acks_pend = ASetByte () # ACKs which are expected to be received
112
148
gc .collect ()
113
149
asyncio .create_task (self ._run ())
114
150
115
151
# **** API ****
116
152
def __iter__ (self ): # Await a connection
117
- while not self ():
118
- yield from asyncio .sleep_ms (self ._tim_short ) # V3 note: this syntax works.
153
+ yield from self ._evok .wait () # V3 note: this syntax works.
119
154
120
155
def status (self ):
121
- return self ._ok
156
+ return self ._evok . is_set ()
122
157
123
158
__call__ = status
124
159
125
160
async def readline (self ):
126
- while not self ._lineq :
127
- await asyncio .sleep (0 )
128
- return self ._lineq .popleft ()
161
+ return await self ._lineq .get ()
129
162
130
163
async def write (self , buf , qos = True , wait = True ):
131
164
if qos and wait : # Disallow concurrent writes
132
- while self ._acks_pend :
133
- await asyncio .sleep_ms (50 )
165
+ await self ._acks_pend .wait_empty ()
134
166
# Prepend message ID to a copy of buf
135
167
fstr = '{:02x}{}' if buf .endswith ('\n ' ) else '{:02x}{}\n '
136
168
mid = next (getmid )
@@ -182,29 +214,28 @@ async def _write(self, line):
182
214
while True :
183
215
# After an outage wait until something is received from server
184
216
# before we send.
185
- await self
217
+ await self . _evok . wait ()
186
218
if await self ._send (line ):
187
219
return
188
220
189
- # send fail. _send has triggered _evfail. Await response .
190
- while self ():
191
- await asyncio . sleep_ms ( self ._tim_short )
221
+ # send fail. _send has triggered _evfail. .run clears _evok .
222
+ await asyncio . sleep_ms ( 0 ) # Ensure .run is scheduled
223
+ assert not self ._evok . is_set () # TEST
192
224
193
225
# Handle qos. Retransmit until matching ACK received.
194
226
# ACKs typically take 200-400ms to arrive.
195
227
async def _do_qos (self , mid , line ):
196
228
while True :
197
229
# Wait for any outage to clear
198
- await self
230
+ await self . _evok . wait ()
199
231
# Wait for the matching ACK.
200
- tstart = utime .ticks_ms ()
201
- while utime .ticks_diff (utime .ticks_ms (), tstart ) < self ._to :
202
- await asyncio .sleep_ms (self ._tim_short )
203
- if mid not in self ._acks_pend :
204
- return # ACK was received
205
- # ACK was not received. Re-send.
206
- await self ._write (line )
207
- self ._verbose and print ('Repeat' , line , 'to server app' )
232
+ try :
233
+ await asyncio .wait_for_ms (self ._acks_pend .has_not (mid ), self ._to )
234
+ except asyncio .TimeoutError : # Ack was not received - re-send
235
+ await self ._write (line )
236
+ self ._verbose and print ('Repeat' , line , 'to server app' )
237
+ else :
238
+ return # Got ack
208
239
209
240
# Make an attempt to connect to WiFi. May not succeed.
210
241
async def _connect (self , s ):
@@ -246,7 +277,8 @@ async def _run(self):
246
277
self ._sock = socket .socket ()
247
278
self ._evfail .clear ()
248
279
try :
249
- serv = socket .getaddrinfo (self ._server , self ._port )[0 ][- 1 ] # server read
280
+ serv = socket .getaddrinfo (self ._server , self ._port )[
281
+ 0 ][- 1 ] # server read
250
282
# If server is down OSError e.args[0] = 111 ECONNREFUSED
251
283
self ._sock .connect (serv )
252
284
except OSError as e :
@@ -266,7 +298,7 @@ async def _run(self):
266
298
# apps might need to know connection to the server acquired
267
299
launch (self ._concb , True , * self ._concbargs )
268
300
await self ._evfail .wait () # Pause until something goes wrong
269
- self ._ok = False
301
+ self ._evok . clear ()
270
302
tsk_reader .cancel ()
271
303
tsk_ka .cancel ()
272
304
await asyncio .sleep_ms (0 ) # wait for cancellation
@@ -281,7 +313,8 @@ async def _run(self):
281
313
self ._close () # Close socket but not wdt
282
314
s .disconnect ()
283
315
self ._feed (0 )
284
- await asyncio .sleep_ms (self ._to * 2 ) # Ensure server detects outage
316
+ # Ensure server detects outage
317
+ await asyncio .sleep_ms (self ._to * 2 )
285
318
while s .isconnected ():
286
319
await asyncio .sleep (1 )
287
320
@@ -308,8 +341,8 @@ async def _reader(self): # Entry point is after a (re) connect.
308
341
isnew (- 1 ) # Clear down rx message record
309
342
if isnew (mid ):
310
343
try :
311
- self ._lineq .append (line [2 :].decode ())
312
- except IndexError :
344
+ self ._lineq .put_nowait (line [2 :].decode ())
345
+ except QueueFull :
313
346
self ._verbose and print ('_reader fail. Overflow.' )
314
347
self ._evfail .set ()
315
348
return
@@ -321,7 +354,8 @@ async def _sendack(self, mid):
321
354
322
355
async def _keepalive (self ):
323
356
while True :
324
- due = self ._tim_ka - utime .ticks_diff (utime .ticks_ms (), self ._last_wr )
357
+ due = self ._tim_ka - \
358
+ utime .ticks_diff (utime .ticks_ms (), self ._last_wr )
325
359
if due <= 0 :
326
360
# error sets ._evfail, .run cancels this coro
327
361
await self ._send (b'\n ' )
@@ -337,7 +371,7 @@ async def _readline(self, to):
337
371
start = utime .ticks_ms ()
338
372
while True :
339
373
if line .endswith (b'\n ' ):
340
- self ._ok = True # Got at least 1 packet after an outage.
374
+ self ._evok . set () # Got at least 1 packet after an outage.
341
375
if len (line ) > 1 :
342
376
return line
343
377
# Got a keepalive: discard, reset timers, toggle LED.
0 commit comments