Skip to content

extmod/modopenamp: Implement streaming protocol for endpoints. #14181

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 127 additions & 5 deletions extmod/modopenamp.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#include "py/nlr.h"
#include "py/runtime.h"
#include "py/mpprint.h"
#include "py/stream.h"
#include "py/mperrno.h"

#include "metal/sys.h"
#include "metal/alloc.h"
Expand All @@ -44,6 +46,7 @@
#include "openamp/open_amp.h"
#include "openamp/remoteproc.h"
#include "openamp/remoteproc_loader.h"
#include "lib/rpmsg/rpmsg_internal.h"
#include "modopenamp.h"

#if !MICROPY_ENABLE_FINALISER
Expand Down Expand Up @@ -126,19 +129,52 @@ static MP_DEFINE_CONST_OBJ_TYPE(
);

// ###################### RPMsg Endpoint class ######################
// The number of RPMsg buffers to hold for an Endpoint. Note if this number matches
// the number of buffers per VRING, a full ring buffer/queue will cause the other
// side to block, because all buffers will be held by the application. If it's less
// than the number of buffers in a VRING, messages will be dropped if this side is not
// receiving messages.
#define ENDPOINT_RPMSG_RING_SIZE VRING_NUM_BUFFS
typedef struct _endpoint_obj_t {
mp_obj_base_t base;
mp_obj_t name;
mp_obj_t callback;
struct rpmsg_endpoint ep;
volatile uint32_t head;
volatile uint32_t tail;
void *rpmsg_buf[ENDPOINT_RPMSG_RING_SIZE];
} endpoint_obj_t;

static const mp_obj_type_t endpoint_type;

static int endpoint_rpmsg_enqueue(endpoint_obj_t *ept, void *buf) {
if (((ept->tail + 1) % ENDPOINT_RPMSG_RING_SIZE) != ept->head) {
ept->rpmsg_buf[ept->tail] = buf;
ept->tail = (ept->tail + 1) % ENDPOINT_RPMSG_RING_SIZE;
rpmsg_hold_rx_buffer(&ept->ep, buf);
return 0;
}
return -1;
}

static size_t endpoint_rpmsg_dequeue(endpoint_obj_t *ept, void *buf, size_t len) {
size_t size = 0;
if (ept->head != ept->tail) {
void *rpmsg = ept->rpmsg_buf[ept->head];
size = MIN(RPMSG_LOCATE_HDR(rpmsg)->len, len);
memcpy(buf, rpmsg, size);
ept->head = (ept->head + 1) % ENDPOINT_RPMSG_RING_SIZE;
rpmsg_release_rx_buffer(&ept->ep, rpmsg);
}
return size;
}

static int endpoint_recv_callback(struct rpmsg_endpoint *ept, void *data, size_t len, uint32_t src, void *priv) {
metal_log(METAL_LOG_DEBUG, "endpoint_recv_callback() message received src: %lu msg len: %d\n", src, len);
endpoint_obj_t *self = metal_container_of(ept, endpoint_obj_t, ep);
if (self->callback != mp_const_none) {
if (self->callback == mp_const_none) {
endpoint_rpmsg_enqueue(self, data);
} else {
mp_call_function_2(self->callback, mp_obj_new_int(src), mp_obj_new_bytearray_by_ref(len, data));
}
return 0;
Expand Down Expand Up @@ -193,6 +229,40 @@ static mp_obj_t endpoint_send(uint n_args, const mp_obj_t *pos_args, mp_map_t *k
}
static MP_DEFINE_CONST_FUN_OBJ_KW(endpoint_send_obj, 2, endpoint_send);

static mp_obj_t endpoint_recv(uint n_args, const mp_obj_t *pos_args, mp_map_t *kw_args) {
enum { ARG_timeout };
static const mp_arg_t allowed_args[] = {
{ MP_QSTR_timeout, MP_ARG_INT | MP_ARG_KW_ONLY, {.u_int = 0 } },
};

// Parse args.
mp_arg_val_t args[MP_ARRAY_SIZE(allowed_args)];
mp_arg_parse_all(n_args - 2, pos_args + 2, kw_args, MP_ARRAY_SIZE(allowed_args), allowed_args, args);

endpoint_obj_t *self = MP_OBJ_TO_PTR(pos_args[0]);
vstr_t vstr;
size_t len = mp_obj_get_int(pos_args[1]);
vstr_init_len(&vstr, len);

mp_int_t timeout = args[ARG_timeout].u_int;
for (mp_uint_t start = mp_hal_ticks_ms(); ;) {
vstr.len = endpoint_rpmsg_dequeue(self, vstr.buf, len);
if (vstr.len > 0) {
break;
}
if (timeout == 0) {
break;
}
if (timeout > 0 && (mp_hal_ticks_ms() - start > timeout)) {
mp_raise_msg(&mp_type_OSError, MP_ERROR_TEXT("timeout waiting for message"));
}
MICROPY_EVENT_POLL_HOOK
}

return mp_obj_new_bytes_from_vstr(&vstr);
}
static MP_DEFINE_CONST_FUN_OBJ_KW(endpoint_recv_obj, 2, endpoint_recv);

static mp_obj_t endpoint_is_ready(mp_obj_t self_in) {
endpoint_obj_t *self = MP_OBJ_TO_PTR(self_in);
return is_rpmsg_ept_ready(&self->ep) ? mp_const_true : mp_const_false;
Expand All @@ -209,10 +279,10 @@ static MP_DEFINE_CONST_FUN_OBJ_1(endpoint_deinit_obj, endpoint_deinit);
static mp_obj_t endpoint_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *all_args) {
enum { ARG_name, ARG_callback, ARG_src, ARG_dest };
static const mp_arg_t allowed_args[] = {
{ MP_QSTR_name, MP_ARG_OBJ | MP_ARG_REQUIRED, {.u_rom_obj = MP_ROM_NONE } },
{ MP_QSTR_callback, MP_ARG_OBJ | MP_ARG_REQUIRED, {.u_rom_obj = MP_ROM_NONE } },
{ MP_QSTR_src, MP_ARG_INT | MP_ARG_KW_ONLY, {.u_int = RPMSG_ADDR_ANY } },
{ MP_QSTR_dest, MP_ARG_INT | MP_ARG_KW_ONLY, {.u_int = RPMSG_ADDR_ANY } },
{ MP_QSTR_name, MP_ARG_OBJ | MP_ARG_REQUIRED, {.u_rom_obj = MP_ROM_NONE } },
{ MP_QSTR_callback, MP_ARG_OBJ, {.u_rom_obj = MP_ROM_NONE } },
{ MP_QSTR_src, MP_ARG_INT | MP_ARG_KW_ONLY, {.u_int = RPMSG_ADDR_ANY } },
{ MP_QSTR_dest, MP_ARG_INT | MP_ARG_KW_ONLY, {.u_int = RPMSG_ADDR_ANY } },
};

// Parse args.
Expand All @@ -222,6 +292,7 @@ static mp_obj_t endpoint_make_new(const mp_obj_type_t *type, size_t n_args, size
endpoint_obj_t *self = mp_obj_malloc_with_finaliser(endpoint_obj_t, &endpoint_type);
self->name = args[ARG_name].u_obj;
self->callback = args[ARG_callback].u_obj;
self->head = self->tail = 0;

if (MP_STATE_PORT(virtio_device) == NULL) {
openamp_init();
Expand All @@ -239,15 +310,66 @@ static const mp_rom_map_elem_t endpoint_locals_dict_table[] = {
{ MP_ROM_QSTR(MP_QSTR___name__), MP_ROM_QSTR(MP_QSTR_Endpoint) },
{ MP_ROM_QSTR(MP_QSTR___del__), MP_ROM_PTR(&endpoint_deinit_obj) },
{ MP_ROM_QSTR(MP_QSTR_send), MP_ROM_PTR(&endpoint_send_obj) },
{ MP_ROM_QSTR(MP_QSTR_recv), MP_ROM_PTR(&endpoint_recv_obj) },
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could possibly just use mp_stream_read1_obj. That should give equivalent behaviour, just without a timeout argument (it would be a non-blocking call which IMO is enough).

{ MP_ROM_QSTR(MP_QSTR_is_ready), MP_ROM_PTR(&endpoint_is_ready_obj) },
};
static MP_DEFINE_CONST_DICT(endpoint_locals_dict, endpoint_locals_dict_table);

mp_uint_t endpoint_read(mp_obj_t self_in, void *buf, mp_uint_t size, int *errcode) {
endpoint_obj_t *self = MP_OBJ_TO_PTR(self_in);
if (!is_rpmsg_ept_ready(&self->ep)) {
return MP_STREAM_ERROR;
}
return endpoint_rpmsg_dequeue(self, buf, size);
}

mp_uint_t endpoint_write(mp_obj_t self_in, const void *buf, mp_uint_t size, int *errcode) {
endpoint_obj_t *self = MP_OBJ_TO_PTR(self_in);
int ret = 0;
if (!is_rpmsg_ept_ready(&self->ep)) {
ret = MP_STREAM_ERROR;
} else {
ret = rpmsg_send(&self->ep, buf, size);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this compare to .send() and calling of rpmsg_send_offchannel_raw()? In the case here, what are src and dest, and why don't you need to specify them?

if (ret < 0) {
ret = MP_STREAM_ERROR;
}
}
return ret;
}

mp_uint_t endpoint_ioctl(mp_obj_t self_in, mp_uint_t request, uintptr_t arg, int *errcode) {
endpoint_obj_t *self = MP_OBJ_TO_PTR(self_in);
mp_uint_t ret = 0;
if (request == MP_STREAM_CLOSE) {
rpmsg_destroy_ept(&self->ep);
} else if (request == MP_STREAM_POLL) {
if ((arg & MP_STREAM_POLL_RD) && self->head != self->tail) {
ret |= MP_STREAM_POLL_RD;
}

if ((arg & MP_STREAM_POLL_WR) && is_rpmsg_ept_ready(&self->ep)) {
ret |= MP_STREAM_POLL_WR;
}
} else {
*errcode = MP_EINVAL;
ret = MP_STREAM_ERROR;
}
return ret;
}

static const mp_stream_p_t endpoint_stream_p = {
.read = endpoint_read,
.write = endpoint_write,
.ioctl = endpoint_ioctl,
.is_text = false,
};

static MP_DEFINE_CONST_OBJ_TYPE(
endpoint_type,
MP_QSTR_Endpoint,
MP_TYPE_FLAG_NONE,
make_new, endpoint_make_new,
protocol, &endpoint_stream_p,
locals_dict, &endpoint_locals_dict
);

Expand Down
Loading