extmod/uasyncio: Clear Stream buffer before yield in drain(). #8817

Closed · wants to merge 2 commits

Conversation

AmirHmZz
Contributor

Assume a program awaits StreamWriter.drain() while another coroutine calls StreamWriter.write() in the meantime. If the buffer is not cleared before yielding to the event loop, whatever the program writes to the StreamWriter may be cleared and lost. This PR fixes the bug without using synchronization primitives such as uasyncio.Lock(), by clearing the StreamWriter's buffer before yielding to the event loop.
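The race described above can be modelled in plain CPython asyncio (an illustrative toy, not MicroPython's actual Stream code; BuggyWriter and FixedWriter are hypothetical names). The buggy variant clears out_buf only after yielding, so a write() performed during the yield is discarded; the fixed variant clears the buffer before yielding, which is what this PR does:

```python
import asyncio

class BuggyWriter:
    """Clears out_buf only AFTER yielding, like the code before this PR."""
    def __init__(self):
        self.out_buf = b""
        self.sent = b""               # stands in for bytes reaching the stream

    def write(self, buf):
        self.out_buf += buf

    async def drain(self):
        buf = self.out_buf
        await asyncio.sleep(0)        # yield to the event loop first ...
        self.sent += buf
        self.out_buf = b""            # ... then clear: concurrent writes are lost

class FixedWriter(BuggyWriter):
    async def drain(self):
        buf = self.out_buf
        self.out_buf = b""            # clear BEFORE yielding (this PR's fix)
        await asyncio.sleep(0)
        self.sent += buf

async def scenario(writer):
    writer.write(b"A")
    task = asyncio.ensure_future(writer.drain())
    await asyncio.sleep(0)            # let drain() start and yield
    writer.write(b"B")                # concurrent write during the drain
    await task
    await writer.drain()              # flush anything still buffered
    return writer.sent

buggy = asyncio.run(scenario(BuggyWriter()))
fixed = asyncio.run(scenario(FixedWriter()))
```

Here the buggy run delivers only b"A" (the concurrent b"B" is cleared and lost), while the fixed run delivers both writes.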

@codecov-commenter

Codecov Report

Merging #8817 (ee58418) into master (e22b7fb) will not change coverage.
The diff coverage is n/a.

@@           Coverage Diff           @@
##           master    #8817   +/-   ##
=======================================
  Coverage   98.31%   98.31%           
=======================================
  Files         157      157           
  Lines       20342    20342           
=======================================
  Hits        19999    19999           
  Misses        343      343           


@peterhinch
Contributor

I don't think this addresses the case where multiple coroutines await .drain. As far as I can see there are two options:

  1. Use a Lock at application level. I have now tested this: there is a problem, and a Lock fixes it.
  2. Modify the Stream class so that the current pointer into the output buffer is a bound variable rather than being local to the .drain method. That way multiple instances of .drain share the same current position.
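Option 2 might look something like the following sketch (plain CPython asyncio; SharedCurrencyWriter and its sink callback are hypothetical names, not MicroPython code). Because the offset is an instance attribute, a drain that resumes after another drain has already flushed the buffer sees the shared state and writes nothing extra:

```python
import asyncio

class SharedCurrencyWriter:
    def __init__(self, sink):
        self.sink = sink              # stand-in for writing to the real stream
        self.out_buf = b""
        self.off = 0                  # bound variable: shared by every drain()

    def write(self, buf):
        self.out_buf += buf

    async def drain(self):
        while self.off < len(self.out_buf):
            await asyncio.sleep(0)    # models yielding until the stream is writable
            if self.off < len(self.out_buf):   # another drain may have flushed already
                self.sink(self.out_buf[self.off:])
                self.off = len(self.out_buf)
        if self.out_buf:
            self.out_buf = b""
            self.off = 0

async def send_packet(w, data):
    w.write(data)
    await w.drain()

async def demo():
    sent = []
    w = SharedCurrencyWriter(sent.append)
    await asyncio.gather(*(send_packet(w, str(i).encode()) for i in range(3)))
    return sent

result = asyncio.run(demo())
```

With three concurrent drains, each byte reaches the sink exactly once instead of being duplicated.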

@peterhinch
Contributor

This has actually been discussed previously in #6621.
tl;dr
Running .drain concurrently is potentially hazardous regardless of the implementation. Use of a Lock or Queue is mandatory to avoid possible data corruption.

@AmirHmZz
Contributor Author

@peterhinch Yes, you are right about that. I've changed the code to use a uasyncio.Lock in StreamWriter.drain(), but I think we still have another problem. Since c21452a, StreamWriter.write() can write directly to the underlying stream. write() is not a coroutine, so we cannot use that Lock there. So assume one coroutine is draining the stream while another coroutine calls write() and writes part of its data to the stream's buffer. The only way to handle this with a lock is to check if not lock.locked() in write() before writing directly to the stream.
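A rough sketch of that idea (plain CPython asyncio; LockedWriter and sink are hypothetical names, not the PR's actual code): the synchronous write() checks lock.locked() and only writes straight through when no drain holds the lock and nothing is already buffered:

```python
import asyncio

class LockedWriter:
    def __init__(self, sink):
        self.sink = sink              # stand-in for the underlying stream's write
        self.out_buf = b""
        self.lock = asyncio.Lock()

    def write(self, buf):
        # Write straight through only when no drain is running and nothing
        # is already buffered (otherwise ordering would be violated).
        if not self.lock.locked() and not self.out_buf:
            self.sink(bytes(buf))
        else:
            self.out_buf += buf

    async def drain(self):
        async with self.lock:
            buf, self.out_buf = self.out_buf, b""
            await asyncio.sleep(0)    # models waiting for the stream to accept data
            if buf:
                self.sink(bytes(buf))

async def demo():
    sent = []
    w = LockedWriter(sent.append)
    w.write(b"direct")                    # no drain active: goes straight through
    t = asyncio.ensure_future(w.drain())
    await asyncio.sleep(0)                # the drain acquires the lock and yields
    w.write(b"buffered")                  # lock is held: data must be buffered
    await t
    await w.drain()                       # flush what was buffered meanwhile
    return sent

result = asyncio.run(demo())
```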

@peterhinch
Contributor

The code looks OK to me. If a coroutine is draining the stream, .out_buf will not be empty so .write will not write to the hardware.

A way to handle the case of multiple coros writing to the same stream is to have each one call .write() as required. This causes messages effectively to be queued. Then a single coro does something like

async def write_out(stream):
    while True:
        await stream.drain()
        await asyncio.sleep(0)

There is a more efficient and simpler solution.

The solution I advocated in #6621 has the merit of sidestepping any possible concurrency issues and is absurdly simple. Also, my implementation of Queue uses Event: when the queue is empty the coroutine is shelved by the scheduler, and no processing is done by that task until something is put on the queue.
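The Queue-based design could be sketched like this (using CPython's asyncio.Queue in place of the uasyncio primitives; names are illustrative, not the #6621 code). Producers only ever put messages on the queue, and a single writer task is the sole caller of the stream's write path, so drain() can never run concurrently:

```python
import asyncio

async def writer_task(queue, write_out):
    # The only task that ever touches the stream's write path, so drain()
    # is never called concurrently.
    while True:
        msg = await queue.get()       # parked by the scheduler while queue is empty
        write_out(msg)                # stands in for swriter.write() + await drain()
        queue.task_done()

async def demo():
    out = []
    queue = asyncio.Queue()
    task = asyncio.ensure_future(writer_task(queue, out.append))
    for i in range(3):                # producers just put messages on the queue
        await queue.put("packet %d" % i)
    await queue.join()                # wait until everything has been written
    task.cancel()
    return out

result = asyncio.run(demo())
```

Messages are effectively serialized in arrival order, with no Lock needed in the stream code itself.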

@AmirHmZz
Contributor Author

AmirHmZz commented Jun 25, 2022

@peterhinch I really feel uncomfortable about my PR right now, as it just adds unnecessary complexity to StreamWriter. I would prefer to implement this using an asynchronous queue, which is really simple, but I believe MicroPython's current situation with this problem is not desirable. I think there are two options to solve this for good:

  1. Solve the problem at the MicroPython level: add an async queue instead of out_buf and change drain() to write the queue's data to the stream by itself, without needing to be called by the developer. But AFAIK @dpgeorge's feedback on this will be:

It's a nice solution but the problem is that it's not compatible with CPython.

  2. Mention this in the documentation to warn developers about calling drain() from multiple coroutines, and leave MicroPython as it is.

Anyway, I think I will wait until @dpgeorge comes and shares his opinion and decision with us.

@dpgeorge added the extmod label (Relates to extmod/ directory in source) on Jun 26, 2022
@dpgeorge
Member

I'm not entirely sure this needs to be fixed; it would be good to see some real-world code where this problem occurs.

Aside from that, this PR will need an associated test, which shows the bug and fails if the fix is not applied.

@AmirHmZz
Contributor Author

@dpgeorge I agree with you. In many cases developers call drain() from only one coroutine, and this Lock just adds overhead for that case. In my opinion it would be better to add a hint in the drain() documentation to warn developers about this. What do you think?

@dpgeorge
Member

Before we do anything, are you able to provide a minimal test case that shows the bug?

@AmirHmZz
Contributor Author

@dpgeorge @peterhinch I've got weird results while trying to reproduce the bug. I can reproduce it with MicroPython builds before this commit, but after this commit the bug is gone.

@dpgeorge
Member

Can you post the test that reproduces the bug before that commit?

@AmirHmZz
Contributor Author

AmirHmZz commented Jun 27, 2022

Here is the code that reproduces the bug before that commit:

from struct import pack, unpack

from uasyncio import create_task, get_event_loop, start_server, open_connection


async def send_packet(sw, i):
    sw.write(pack(">I", i))
    await sw.drain()


async def handle_con(sr, sw):
    print(await sr.readexactly(1))
    while True:
        print("RECV", unpack(">I", await sr.readexactly(4))[0])


async def main(multiple_coroutines):
    await start_server(handle_con, "127.0.0.1", "2025")
    sr, sw = await open_connection("127.0.0.1", "2025")
    sw.out_buf = b"."  # This will make #c21452a ineffective
    for i in range(3):
        coro = send_packet(sw, i)
        if multiple_coroutines:
            create_task(coro)
        else:
            await coro


loop = get_event_loop()
loop.run_until_complete(main(True))
loop.run_forever()

If you call main() with multiple_coroutines=False, the result will be:

b'.'
RECV 0
RECV 1
RECV 2

But with multiple_coroutines=True, the result will be:

b'.'
RECV 0
RECV 771751936
RECV 0
RECV 19791872
RECV 0
RECV 65536

I've tried to find out what exactly is going on, but I have no idea except that this line causes some of the coroutines (the ones we created as tasks) to end right there, so they never continue past this line. As a result they do not send repeated data, and only the last coroutine writes data to the stream.

yield core._io_queue.queue_write(self.s)

What happens here is exactly the same bug as in #6621:

but instead will likely write the first packet N times, the second packet N-1 times, ... the last packet one time (not in that order).
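The failure mode quoted from #6621 can be modelled in plain CPython asyncio (an illustrative toy, not the actual uasyncio code): if each drain() snapshots the shared buffer at entry and writes that snapshot after yielding, three concurrent drains re-send the first packet three times, the second twice, and the last once:

```python
import asyncio

class SnapshotDrainWriter:
    def __init__(self):
        self.out_buf = b""
        self.sent = b""               # stands in for bytes reaching the stream

    def write(self, buf):
        self.out_buf += buf

    async def drain(self):
        buf = self.out_buf            # each drain snapshots the buffer at entry
        while buf:
            await asyncio.sleep(0)    # models yielding until the stream is writable
            self.sent += buf          # the stale snapshot is written out
            buf = b""
        self.out_buf = b""

async def send_packet(w, data):
    w.write(data)
    await w.drain()

async def demo():
    w = SnapshotDrainWriter()
    await asyncio.gather(*(send_packet(w, bytes([i])) for i in range(3)))
    return w.sent

result = asyncio.run(demo())
```

The output is packet 0, then packets 0-1, then packets 0-1-2: exactly the "first packet N times, second packet N-1 times" pattern described above.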

@dpgeorge
Member

Can you do it without explicitly writing to the out_buf variable, i.e. without sw.out_buf = b"."?

@AmirHmZz
Contributor Author

AmirHmZz commented Jun 27, 2022

There's no need for sw.out_buf = b"." if you run the snippet on MicroPython 1.18, so if you are testing on a build before that commit you can omit that line. In fact that line does not really do anything; you can think of sw.out_buf = b"." as standing in for a call to StreamWriter.write(), which used to look like this:

def write(self, buf):
    self.out_buf += buf

@AmirHmZz
Contributor Author

AmirHmZz commented Jun 27, 2022

Another strange thing I've noticed is that this snippet doesn't produce any problems:

from struct import pack, unpack

from uasyncio import create_task, get_event_loop, start_server, open_connection


async def send_packet(sw, i):
    sw.write(pack(">I", i))
    await sw.drain()


async def handle_con(sr, sw):
    i = 0
    print(await sr.readexactly(1))
    while True:
        print("RECV", unpack(">I", await sr.readexactly(4))[0])
        i += 1


async def main():
    await start_server(handle_con, "127.0.0.1", "2025")
    sr, sw = await open_connection("127.0.0.1", "2025")
    sw.out_buf = b"."  # This makes #c21452a ineffective
    for i in range(3):
        await send_packet(sw, i)
    for i in range(3):
        create_task(send_packet(sw, i))


loop = get_event_loop()
loop.run_until_complete(main())
loop.run_forever()

The result of the snippet will be:

b'.'
RECV 0
RECV 1
RECV 2
RECV 0
RECV 1
RECV 2

It seems the bug appears if you have not called self.s.write() at least once before calling drain() from multiple coroutines. Or maybe I'm wrong about this and something else causes this behavior.

@peterhinch
Contributor

I'm puzzled by this discussion. Looking at the code, it seems evident that .drain doesn't support concurrent calling, because each instance maintains its own independent buffer position. My view is that this is best documented rather than fixed, as the workaround is both simple and efficient.

I can reproduce this consistently on a Pyboard 1.1 with pins X1 and X2 linked. The commented-out code further demonstrates the cause, as a Lock prevents the fault from occurring.

import uasyncio as asyncio
from machine import UART
uart = UART(4, 1200)

#lock = asyncio.Lock()

async def sender(n):
    swriter = asyncio.StreamWriter(uart, {})
    while True:
        swriter.write('Hello uart\n')
        #async with lock:
        await swriter.drain()
        await asyncio.sleep(0)
        print('write', n)

async def receiver():
    sreader = asyncio.StreamReader(uart)
    while True:
        res = await sreader.readline()
        await asyncio.sleep(0)
        print('Received', res)

async def main():
    asyncio.create_task(sender(1))
    asyncio.create_task(sender(2))
    asyncio.create_task(receiver())
    while True:
        await asyncio.sleep(1)

asyncio.run(main())

@AmirHmZz
Contributor Author

AmirHmZz commented Jun 27, 2022

Thank you @peterhinch. Creating multiple StreamWriters on the same stream can cause problems, no doubt. IMHO it is better to focus on the case where the same StreamWriter is shared between multiple coroutines, because if we fix that, we can simply tell developers to create only one StreamWriter per stream. Even if creating multiple StreamWriters on the same stream caused no trouble, it would still be discouraged, because we care about controlling data flow, which drain() does for us. Having multiple StreamWriters creates two separate worlds, and coro_A doesn't care if coro_B has filled out_buf.

@peterhinch
Contributor

I see exactly the same effect with two coros sharing the same StreamWriter:

import uasyncio as asyncio
from machine import UART
uart = UART(4, 1200)

#lock = asyncio.Lock()

async def sender(n, swriter):
    while True:
        swriter.write('Hello uart\n')
        #async with lock:
        await swriter.drain()
        await asyncio.sleep(0)
        print('write', n)

async def receiver():
    sreader = asyncio.StreamReader(uart)
    while True:
        res = await sreader.readline()
        await asyncio.sleep(0)
        print('Received', res)

async def main():
    swriter = asyncio.StreamWriter(uart, {})
    asyncio.create_task(sender(1, swriter))
    asyncio.create_task(sender(2, swriter))
    asyncio.create_task(receiver())
    while True:
        await asyncio.sleep(1)

asyncio.run(main())

@AmirHmZz
Contributor Author

AmirHmZz commented Jun 27, 2022

@peterhinch Thanks for providing that snippet. I've tested it and I confirm that messages are not received correctly on an ESP32-WROOM-32 running MicroPython 1.19.1 without the Lock. After adding the Lock the bug is gone.

AmirHmZz closed this by deleting the head repository on Jun 17, 2023