Skip to content

extmod/modopenamp: Implement streaming protocol for endpoints. #14181

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed

Conversation

iabdalkader
Copy link
Contributor

@iabdalkader iabdalkader commented Mar 26, 2024

This patch implements a streaming protocol for Endpoints, to allow them to be used with select/poll or asyncio. It also implements Endpoint.recv() function, whose behavior matches that of UDP sockets' recv(). Calling Endpoint.recv() returns a single message, truncated to fit the requested number of bytes. If two messages are pending on the endpoint, the code must do two separate reads to get them both. Received RPMsg messages are saved by holding the buffers in the endpoint receive callback, and releasing them when they are no longer needed (i.e., when Endpoint.recv is called). This avoids the copying buffers twice, saves memory, but also allows the host to block the remote if it's not receiving messages (assuming the queue size is the same as the number of buffers in the VRING). Note that the queuing of RPMsg messages is only enabled if callback=None (i.e., if the application is not processing messages asynchronously).

Alternative Python implementation:

class EndpointIO(io.IOBase):
    def __init__(self, ept):
        self.ept = ept
        
    def write(self, buf):
        if buf != b"\r\n":
            self.ept.send(buf, timeout=0)
        
    def ioctl(self, op, arg):
        if op == _MP_STREAM_POLL and self.ept.is_ready():
            return _MP_STREAM_POLL_WR
        return 0

@iabdalkader iabdalkader marked this pull request as ready for review March 26, 2024 07:09
@iabdalkader iabdalkader force-pushed the openamp_endpoint_streaming branch from d2afd88 to 286cf50 Compare March 26, 2024 07:22
Copy link

codecov bot commented Mar 26, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 98.39%. Comparing base (87d821a) to head (286cf50).
Report is 5 commits behind head on master.

❗ Current head 286cf50 differs from pull request most recent head 065ef5e. Consider uploading reports for the commit 065ef5e to get more accurate results

Additional details and impacted files
@@           Coverage Diff           @@
##           master   #14181   +/-   ##
=======================================
  Coverage   98.39%   98.39%           
=======================================
  Files         161      161           
  Lines       21200    21200           
=======================================
  Hits        20860    20860           
  Misses        340      340           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link

Code size report:

   bare-arm:    +0 +0.000% 
minimal x86:    +0 +0.000% 
   unix x64:    +0 +0.000% standard
      stm32:    +0 +0.000% PYBV10
     mimxrt:    +0 +0.000% TEENSY40
        rp2:    +0 +0.000% RPI_PICO
       samd:    +0 +0.000% ADAFRUIT_ITSYBITSY_M4_EXPRESS

This patch implements a streaming protocol for Endpoints, to allow them to
be used with select/poll or asyncio. It also implements `Endpoint.recv()`
function, whose behavior matches that of UDP sockets' recv(). Calling
`Endpoint.recv()` returns a single message, truncated to fit the requested
number of bytes. If two messages are pending on the endpoint, the code must
do two separate reads to get them both. Received RPMsg messages are saved
by holding the buffers in the endpoint receive callback, and releasing them
when they are no longer needed (i.e., when `Endpoint.recv` is called). This
avoids the copying buffers twice, saves memory, but also allows the host to
block the remote if it's not receiving messages (assuming the queue size is
the same as the number of buffers in the VRING). Note that the queuing of
RPMsg messages is only enabled if `callback=None` (i.e., if the application
is not processing messages asynchronously).

Signed-off-by: iabdalkader <i.abdalkader@gmail.com>
@iabdalkader iabdalkader force-pushed the openamp_endpoint_streaming branch from 286cf50 to 065ef5e Compare March 29, 2024 08:58
@dpgeorge dpgeorge added the extmod Relates to extmod/ directory in source label Apr 19, 2024
Copy link
Member

@dpgeorge dpgeorge left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you actively using this patch? Has your implementation of it changed since you made this PR?

It's nice to see endpoints behaving as streams. But it's kind of strange that while it's a stream, it doesn't have read/write methods like other streams do. That means that C code can read/write the endpoint like it's a stream, but Python code cannot.

@@ -239,15 +310,66 @@ static const mp_rom_map_elem_t endpoint_locals_dict_table[] = {
{ MP_ROM_QSTR(MP_QSTR___name__), MP_ROM_QSTR(MP_QSTR_Endpoint) },
{ MP_ROM_QSTR(MP_QSTR___del__), MP_ROM_PTR(&endpoint_deinit_obj) },
{ MP_ROM_QSTR(MP_QSTR_send), MP_ROM_PTR(&endpoint_send_obj) },
{ MP_ROM_QSTR(MP_QSTR_recv), MP_ROM_PTR(&endpoint_recv_obj) },
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could possibly just use mp_stream_read1_obj. That should give equivalent behaviour, just without a timeout argument (it would be a non-blocking call which IMO is enough).

if (!is_rpmsg_ept_ready(&self->ep)) {
ret = MP_STREAM_ERROR;
} else {
ret = rpmsg_send(&self->ep, buf, size);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this compare to .send() and calling of rpmsg_send_offchannel_raw()? In the case here, what are src and dest, and why don't you need to specify them?

@iabdalkader
Copy link
Contributor Author

iabdalkader commented Oct 10, 2024

Are you actively using this patch? Has your implementation of it changed since you made this PR?

No, and I'm not sure anymore it's useful. I managed to make an Endpoint stream like so:

class EndpointIO(io.IOBase):
    def __init__(self, ept):
        self.ept = ept
        
    def write(self, buf):
        if buf != b"\r\n":
            self.ept.send(buf, timeout=0)
        
    def ioctl(self, op, arg):
        if op == _MP_STREAM_POLL and self.ept.is_ready():
            return _MP_STREAM_POLL_WR
        return 0

So I think this PR's only value is having streaming supported out of the box. If that's something desirable, I can address issues and update it.

@dpgeorge Actually, I think we should make the module extensible and implement streaming support in an extension just like the above code does. It's easier and keeps the C code minimal.

@iabdalkader
Copy link
Contributor Author

Closing this because the Python implementation works fine and keeps the C code minimal. A future improvement would be to make the module extensible and add streaming support in an extension.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
extmod Relates to extmod/ directory in source
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants