-
-
Notifications
You must be signed in to change notification settings - Fork 8.2k
extmod/uasyncio: Clear Stream buffer before yield in drain(). #8817
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov Report
@@ Coverage Diff @@
## master #8817 +/- ##
=======================================
Coverage 98.31% 98.31%
=======================================
Files 157 157
Lines 20342 20342
=======================================
Hits 19999 19999
Misses 343 343 Continue to review full report at Codecov.
|
I don't think this addresses the case where multiple coroutines await
|
This has actually previously been discussed here #6621. |
@peterhinch Yes you are right about that. I've changed the code to use a |
The code looks OK to me. If a coroutine is draining the stream, A way to handle the case of multiple coros writing to the same stream is to have each one call async def write_out(stream):
while True:
await stream.drain()
await asyncio.sleep(0) There is a more efficient and simpler solution. The solution I advocated in #6621 has the merit of sidestepping any possible concurrency issues and is absurdly simple. Also my implementation of Queue uses |
@peterhinch I really feel uncomfortable about my PR right now as it is just adding unnecessary complexity to
Anyway, I think I will wait until @dpgeorge comes and shares his opinion and decision about this with us . |
I'm not entirely sure this needs to be fixed, it would be good to see some real-world code where this problem occurs. Aside from that, this PR will need an associated test, which show the bug and fails if the fix is not applied. |
@dpgeorge I agree with you. In many cases developers call |
Before we do anything, are you able to provide a minimal test case that shows the bug? |
@dpgeorge @peterhinch I've got wired results while trying to reproduce the bug. I've reproduced the bug with Micropython before this commit but after this commit the bug is gone. |
Can you post the test that reproduces the bug before that commit? |
Here is the code that reproduces the bug before that commit : from struct import pack, unpack
from uasyncio import create_task, get_event_loop, start_server, open_connection
async def send_packet(sw, i):
sw.write(pack(">I", i))
await sw.drain()
async def handle_con(sr, sw):
print(await sr.readexactly(1))
while True:
print("RECV", unpack(">I", await sr.readexactly(4))[0])
async def main(multiple_coroutines):
await start_server(handle_con, "127.0.0.1", "2025")
sr, sw = await open_connection("127.0.0.1", "2025")
sw.out_buf = b"." # This will make #c21452a ineffective
for i in range(3):
coro = send_packet(sw, i)
if multiple_coroutines:
create_task(coro)
else:
await coro
loop = get_event_loop()
loop.run_until_complete(main(True))
loop.run_forever() If you call
But with
I've tried to find out what exactly is going on but I have no idea except that this line causes some coroutines ( which we created them as tasks ) end just there so they will not continue farther than this line. As the result they will not send repetitive data and only the last coroutine writes data into the stream. micropython/extmod/uasyncio/stream.py Line 83 in e22b7fb
What happens here is exactly the same bug in #6621 :
|
Can you do it without explicitly writing to the |
There's no need to micropython/extmod/uasyncio/stream.py Lines 58 to 59 in ba21f76
|
Another strange thing that I've faced is that this one doesn't produce any problems : from struct import pack, unpack
from uasyncio import create_task, get_event_loop, start_server, open_connection
async def send_packet(sw, i):
sw.write(pack(">I", i))
await sw.drain()
async def handle_con(sr, sw):
i = 0
print(await sr.readexactly(1))
while True:
print("RECV", unpack(">I", await sr.readexactly(4))[0])
i += 1
async def main():
await start_server(handle_con, "127.0.0.1", "2025")
sr, sw = await open_connection("127.0.0.1", "2025")
sw.out_buf = b"." # This makes #c21452a ineffective
for i in range(3):
await send_packet(sw, i)
for i in range(3):
create_task(send_packet(sw, i))
loop = get_event_loop()
loop.run_until_complete(main())
loop.run_forever() The result of snippet will be :
It seems the bug appears If you've not called |
I'm puzzled by this discussion. Looking at the code, it seems evident that I can repro this consistently on a Pyboard 1.1 with pins X1 and X2 linked. Commented out code further demonstrates the cause as a import uasyncio as asyncio
from machine import UART
uart = UART(4, 1200)
#lock = asyncio.Lock()
async def sender(n):
swriter = asyncio.StreamWriter(uart, {})
while True:
swriter.write('Hello uart\n')
#async with lock:
await swriter.drain()
await asyncio.sleep(0)
print('write', n)
async def receiver():
sreader = asyncio.StreamReader(uart)
while True:
res = await sreader.readline()
await asyncio.sleep(0)
print('Received', res)
async def main():
asyncio.create_task(sender(1))
asyncio.create_task(sender(2))
asyncio.create_task(receiver())
while True:
await asyncio.sleep(1)
asyncio.run(main()) |
Thank you @peterhinch. Creating multiple |
I see exactly the same effect with two coros sharing the same import uasyncio as asyncio
from machine import UART
uart = UART(4, 1200)
#lock = asyncio.Lock()
async def sender(n, swriter):
while True:
swriter.write('Hello uart\n')
#async with lock:
await swriter.drain()
await asyncio.sleep(0)
print('write', n)
async def receiver():
sreader = asyncio.StreamReader(uart)
while True:
res = await sreader.readline()
await asyncio.sleep(0)
print('Received', res)
async def main():
swriter = asyncio.StreamWriter(uart, {})
asyncio.create_task(sender(1, swriter))
asyncio.create_task(sender(2, swriter))
asyncio.create_task(receiver())
while True:
await asyncio.sleep(1)
asyncio.run(main()) |
@peterhinch Thanks for providing that snippet. I've tested it and I confirm that messages are not receiving correctly on |
Assume a program awaits for
StreamWriter.drain()
and also callsStreamWriter.write()
from another coroutine in the meantime. If the buffer doesn't get cleared before yielding to event loop, whatever program writes inStreamWriter
might gets cleared and lost. This PR fixes this bug without using synchronization primitives such asuasyncio.Lock()
by clearing buffer ofStreamWriter
before yielding to event loop.