Skip to content

Commit 62d2bc4

Browse files
committed
client.py Use Event and Queue to replace busy-wait.
1 parent b1124c6 commit 62d2bc4

File tree

4 files changed

+142
-53
lines changed

4 files changed

+142
-53
lines changed

iot/__init__.py

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,3 @@ def isnew(mid, lst=bytearray(32)):
2727
lst[idx] |= bit
2828
lst[(idx + 16 & 0x1f)] = 0
2929
return res
30-
31-
# Minimal implementation of set for integers in range 0-255
32-
class SetByte:
33-
def __init__(self):
34-
self._ba = bytearray(32)
35-
36-
def __bool__(self):
37-
return any(self._ba)
38-
39-
def __contains__(self, i):
40-
return (self._ba[i >> 3] & 1 << (i & 7)) > 0
41-
42-
def discard(self, i):
43-
self._ba[i >> 3] &= ~(1 << (i &7))
44-
45-
def add(self, i):
46-
self._ba[i >> 3] |= 1 << (i & 7)

iot/client.mpy

445 Bytes
Binary file not shown.

iot/client.py

Lines changed: 70 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
gc.collect()
1515
import usocket as socket
16-
from ucollections import deque
1716
import uasyncio as asyncio
1817

1918
gc.collect()
@@ -22,8 +21,9 @@
2221
import utime
2322
import machine
2423
import uerrno as errno
25-
from . import gmid, isnew, SetByte # __init__.py
24+
from . import gmid, isnew # __init__.py
2625
from .primitives import launch
26+
from .primitives.queue import Queue, QueueFull
2727
gc.collect()
2828
from micropython import const
2929

@@ -34,6 +34,42 @@
3434
getmid = gmid()
3535
gc.collect()
3636

37+
# Minimal implementation of set for integers in range 0-255
38+
# Asynchronous version has efficient wait_empty and has_not methods
39+
# based on Events rather than polling.
40+
41+
42+
class ASetByte:
43+
def __init__(self):
44+
self._ba = bytearray(32)
45+
self._eve = asyncio.Event()
46+
self._eve.set() # Empty event initially set
47+
self._evdis = asyncio.Event() # Discard event
48+
49+
def __bool__(self):
50+
return any(self._ba)
51+
52+
def __contains__(self, i):
53+
return (self._ba[i >> 3] & 1 << (i & 7)) > 0
54+
55+
def add(self, i):
56+
self._eve.clear()
57+
self._ba[i >> 3] |= 1 << (i & 7)
58+
59+
def discard(self, i):
60+
self._ba[i >> 3] &= ~(1 << (i & 7))
61+
self._evdis.set()
62+
if not any(self._ba):
63+
self._eve.set()
64+
65+
async def wait_empty(self): # Pause until empty
66+
await self._eve.wait()
67+
68+
async def has_not(self, i): # Pause until i not in set
69+
while i in self:
70+
await self._evdis.wait() # Pause until something is discarded
71+
self._evdis.clear()
72+
3773

3874
class Client:
3975
def __init__(self, my_id, server, port=8123,
@@ -46,7 +82,6 @@ def __init__(self, my_id, server, port=8123,
4682
self._pw = pw
4783
self._port = port
4884
self._to = timeout # Client and server timeout
49-
self._tim_short = timeout // 10
5085
self._tim_ka = timeout // 4 # Keepalive interval
5186
self._concb = conn_cb
5287
self._concbargs = () if conn_cb_args is None else conn_cb_args
@@ -99,38 +134,35 @@ def inner(feed=WDT_CB):
99134
gc.collect()
100135
if platform == 'esp8266':
101136
import esp
102-
esp.sleep_type(esp.SLEEP_NONE) # Improve connection integrity at cost of power consumption.
137+
# Improve connection integrity at cost of power consumption.
138+
esp.sleep_type(esp.SLEEP_NONE)
103139

104-
self._evfail = asyncio.Event()
140+
self._evfail = asyncio.Event() # Set by any comms failure
141+
self._evok = asyncio.Event() # Set by 1st successful read
105142
self._s_lock = asyncio.Lock() # For internal send conflict.
106143
self._last_wr = utime.ticks_ms()
107-
self._lineq = deque((), 20, True) # 20 entries, throw on overflow
144+
self._lineq = Queue(20) # 20 entries
108145
self.connects = 0 # Connect count for test purposes/app access
109146
self._sock = None
110-
self._ok = False # Set after 1st successful read
111-
self._acks_pend = SetByte() # ACKs which are expected to be received
147+
self._acks_pend = ASetByte() # ACKs which are expected to be received
112148
gc.collect()
113149
asyncio.create_task(self._run())
114150

115151
# **** API ****
116152
def __iter__(self): # Await a connection
117-
while not self():
118-
yield from asyncio.sleep_ms(self._tim_short) # V3 note: this syntax works.
153+
yield from self._evok.wait() # V3 note: this syntax works.
119154

120155
def status(self):
121-
return self._ok
156+
return self._evok.is_set()
122157

123158
__call__ = status
124159

125160
async def readline(self):
126-
while not self._lineq:
127-
await asyncio.sleep(0)
128-
return self._lineq.popleft()
161+
return await self._lineq.get()
129162

130163
async def write(self, buf, qos=True, wait=True):
131164
if qos and wait: # Disallow concurrent writes
132-
while self._acks_pend:
133-
await asyncio.sleep_ms(50)
165+
await self._acks_pend.wait_empty()
134166
# Prepend message ID to a copy of buf
135167
fstr = '{:02x}{}' if buf.endswith('\n') else '{:02x}{}\n'
136168
mid = next(getmid)
@@ -182,29 +214,28 @@ async def _write(self, line):
182214
while True:
183215
# After an outage wait until something is received from server
184216
# before we send.
185-
await self
217+
await self._evok.wait()
186218
if await self._send(line):
187219
return
188220

189-
# send fail. _send has triggered _evfail. Await response.
190-
while self():
191-
await asyncio.sleep_ms(self._tim_short)
221+
# send fail. _send has triggered _evfail. .run clears _evok.
222+
await asyncio.sleep_ms(0) # Ensure .run is scheduled
223+
assert not self._evok.is_set() # TEST
192224

193225
# Handle qos. Retransmit until matching ACK received.
194226
# ACKs typically take 200-400ms to arrive.
195227
async def _do_qos(self, mid, line):
196228
while True:
197229
# Wait for any outage to clear
198-
await self
230+
await self._evok.wait()
199231
# Wait for the matching ACK.
200-
tstart = utime.ticks_ms()
201-
while utime.ticks_diff(utime.ticks_ms(), tstart) < self._to:
202-
await asyncio.sleep_ms(self._tim_short)
203-
if mid not in self._acks_pend:
204-
return # ACK was received
205-
# ACK was not received. Re-send.
206-
await self._write(line)
207-
self._verbose and print('Repeat', line, 'to server app')
232+
try:
233+
await asyncio.wait_for_ms(self._acks_pend.has_not(mid), self._to)
234+
except asyncio.TimeoutError: # Ack was not received - re-send
235+
await self._write(line)
236+
self._verbose and print('Repeat', line, 'to server app')
237+
else:
238+
return # Got ack
208239

209240
# Make an attempt to connect to WiFi. May not succeed.
210241
async def _connect(self, s):
@@ -246,7 +277,8 @@ async def _run(self):
246277
self._sock = socket.socket()
247278
self._evfail.clear()
248279
try:
249-
serv = socket.getaddrinfo(self._server, self._port)[0][-1] # server read
280+
serv = socket.getaddrinfo(self._server, self._port)[
281+
0][-1] # server read
250282
# If server is down OSError e.args[0] = 111 ECONNREFUSED
251283
self._sock.connect(serv)
252284
except OSError as e:
@@ -266,7 +298,7 @@ async def _run(self):
266298
# apps might need to know connection to the server acquired
267299
launch(self._concb, True, *self._concbargs)
268300
await self._evfail.wait() # Pause until something goes wrong
269-
self._ok = False
301+
self._evok.clear()
270302
tsk_reader.cancel()
271303
tsk_ka.cancel()
272304
await asyncio.sleep_ms(0) # wait for cancellation
@@ -281,7 +313,8 @@ async def _run(self):
281313
self._close() # Close socket but not wdt
282314
s.disconnect()
283315
self._feed(0)
284-
await asyncio.sleep_ms(self._to * 2) # Ensure server detects outage
316+
# Ensure server detects outage
317+
await asyncio.sleep_ms(self._to * 2)
285318
while s.isconnected():
286319
await asyncio.sleep(1)
287320

@@ -308,8 +341,8 @@ async def _reader(self): # Entry point is after a (re) connect.
308341
isnew(-1) # Clear down rx message record
309342
if isnew(mid):
310343
try:
311-
self._lineq.append(line[2:].decode())
312-
except IndexError:
344+
self._lineq.put_nowait(line[2:].decode())
345+
except QueueFull:
313346
self._verbose and print('_reader fail. Overflow.')
314347
self._evfail.set()
315348
return
@@ -321,7 +354,8 @@ async def _sendack(self, mid):
321354

322355
async def _keepalive(self):
323356
while True:
324-
due = self._tim_ka - utime.ticks_diff(utime.ticks_ms(), self._last_wr)
357+
due = self._tim_ka - \
358+
utime.ticks_diff(utime.ticks_ms(), self._last_wr)
325359
if due <= 0:
326360
# error sets ._evfail, .run cancels this coro
327361
await self._send(b'\n')
@@ -337,7 +371,7 @@ async def _readline(self, to):
337371
start = utime.ticks_ms()
338372
while True:
339373
if line.endswith(b'\n'):
340-
self._ok = True # Got at least 1 packet after an outage.
374+
self._evok.set() # Got at least 1 packet after an outage.
341375
if len(line) > 1:
342376
return line
343377
# Got a keepalive: discard, reset timers, toggle LED.

iot/primitives/queue.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# queue.py: adapted from uasyncio V2
2+
3+
# Copyright (c) 2018-2020 Peter Hinch
4+
# Released under the MIT License (MIT) - see LICENSE file
5+
6+
# Code is based on Paul Sokolovsky's work.
7+
# This is a temporary solution until uasyncio V3 gets an efficient official version
8+
9+
import uasyncio as asyncio
10+
11+
12+
# Exception raised by get_nowait().
13+
class QueueEmpty(Exception):
14+
pass
15+
16+
17+
# Exception raised by put_nowait().
18+
class QueueFull(Exception):
19+
pass
20+
21+
class Queue:
22+
23+
def __init__(self, maxsize=0):
24+
self.maxsize = maxsize
25+
self._queue = []
26+
self._evput = asyncio.Event() # Triggered by put, tested by get
27+
self._evget = asyncio.Event() # Triggered by get, tested by put
28+
29+
def _get(self):
30+
self._evget.set()
31+
return self._queue.pop(0)
32+
33+
async def get(self): # Usage: item = await queue.get()
34+
if self.empty():
35+
# Queue is empty, put the calling Task on the waiting queue
36+
self._evput.clear()
37+
await self._evput.wait()
38+
return self._get()
39+
40+
def get_nowait(self): # Remove and return an item from the queue.
41+
# Return an item if one is immediately available, else raise QueueEmpty.
42+
if self.empty():
43+
raise QueueEmpty()
44+
return self._get()
45+
46+
def _put(self, val):
47+
self._evput.set()
48+
self._queue.append(val)
49+
50+
async def put(self, val): # Usage: await queue.put(item)
51+
if self.full():
52+
# Queue full
53+
self._evget.clear()
54+
await self._evget.wait()
55+
# Task(s) waiting to get from queue, schedule first Task
56+
self._put(val)
57+
58+
def put_nowait(self, val): # Put an item into the queue without blocking.
59+
if self.full():
60+
raise QueueFull()
61+
self._put(val)
62+
63+
def qsize(self): # Number of items in the queue.
64+
return len(self._queue)
65+
66+
def empty(self): # Return True if the queue is empty, False otherwise.
67+
return len(self._queue) == 0
68+
69+
def full(self): # Return True if there are maxsize items in the queue.
70+
# Note: if the Queue was initialized with maxsize=0 (the default) or
71+
# any negative number, then full() is never True.
72+
return self.maxsize > 0 and self.qsize() >= self.maxsize

0 commit comments

Comments
 (0)