-
-
Notifications
You must be signed in to change notification settings - Fork 8.2k
extmod/modopenamp: Implement streaming protocol for endpoints. #14181
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
iabdalkader
wants to merge
1
commit into
micropython:master
from
iabdalkader:openamp_endpoint_streaming
Closed
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,6 +33,8 @@ | |
#include "py/nlr.h" | ||
#include "py/runtime.h" | ||
#include "py/mpprint.h" | ||
#include "py/stream.h" | ||
#include "py/mperrno.h" | ||
|
||
#include "metal/sys.h" | ||
#include "metal/alloc.h" | ||
|
@@ -44,6 +46,7 @@ | |
#include "openamp/open_amp.h" | ||
#include "openamp/remoteproc.h" | ||
#include "openamp/remoteproc_loader.h" | ||
#include "lib/rpmsg/rpmsg_internal.h" | ||
#include "modopenamp.h" | ||
|
||
#if !MICROPY_ENABLE_FINALISER | ||
|
@@ -126,19 +129,52 @@ static MP_DEFINE_CONST_OBJ_TYPE( | |
); | ||
|
||
// ###################### RPMsg Endpoint class ###################### | ||
// The number of RPMsg buffers to hold for an Endpoint. Note if this number matches | ||
// the number of buffers per VRING, a full ring buffer/queue will cause the other | ||
// side to block, because all buffers will be held by the application. If it's less | ||
// than the number of buffers in a VRING, messages will be dropped if this side is not | ||
// receiving messages. | ||
#define ENDPOINT_RPMSG_RING_SIZE VRING_NUM_BUFFS | ||
typedef struct _endpoint_obj_t { | ||
mp_obj_base_t base; | ||
mp_obj_t name; | ||
mp_obj_t callback; | ||
struct rpmsg_endpoint ep; | ||
volatile uint32_t head; | ||
volatile uint32_t tail; | ||
void *rpmsg_buf[ENDPOINT_RPMSG_RING_SIZE]; | ||
} endpoint_obj_t; | ||
|
||
static const mp_obj_type_t endpoint_type; | ||
|
||
static int endpoint_rpmsg_enqueue(endpoint_obj_t *ept, void *buf) { | ||
if (((ept->tail + 1) % ENDPOINT_RPMSG_RING_SIZE) != ept->head) { | ||
ept->rpmsg_buf[ept->tail] = buf; | ||
ept->tail = (ept->tail + 1) % ENDPOINT_RPMSG_RING_SIZE; | ||
rpmsg_hold_rx_buffer(&ept->ep, buf); | ||
return 0; | ||
} | ||
return -1; | ||
} | ||
|
||
static size_t endpoint_rpmsg_dequeue(endpoint_obj_t *ept, void *buf, size_t len) { | ||
size_t size = 0; | ||
if (ept->head != ept->tail) { | ||
void *rpmsg = ept->rpmsg_buf[ept->head]; | ||
size = MIN(RPMSG_LOCATE_HDR(rpmsg)->len, len); | ||
memcpy(buf, rpmsg, size); | ||
ept->head = (ept->head + 1) % ENDPOINT_RPMSG_RING_SIZE; | ||
rpmsg_release_rx_buffer(&ept->ep, rpmsg); | ||
} | ||
return size; | ||
} | ||
|
||
static int endpoint_recv_callback(struct rpmsg_endpoint *ept, void *data, size_t len, uint32_t src, void *priv) { | ||
metal_log(METAL_LOG_DEBUG, "endpoint_recv_callback() message received src: %lu msg len: %d\n", src, len); | ||
endpoint_obj_t *self = metal_container_of(ept, endpoint_obj_t, ep); | ||
if (self->callback != mp_const_none) { | ||
if (self->callback == mp_const_none) { | ||
endpoint_rpmsg_enqueue(self, data); | ||
} else { | ||
mp_call_function_2(self->callback, mp_obj_new_int(src), mp_obj_new_bytearray_by_ref(len, data)); | ||
} | ||
return 0; | ||
|
@@ -193,6 +229,40 @@ static mp_obj_t endpoint_send(uint n_args, const mp_obj_t *pos_args, mp_map_t *k | |
} | ||
static MP_DEFINE_CONST_FUN_OBJ_KW(endpoint_send_obj, 2, endpoint_send); | ||
|
||
static mp_obj_t endpoint_recv(uint n_args, const mp_obj_t *pos_args, mp_map_t *kw_args) { | ||
enum { ARG_timeout }; | ||
static const mp_arg_t allowed_args[] = { | ||
{ MP_QSTR_timeout, MP_ARG_INT | MP_ARG_KW_ONLY, {.u_int = 0 } }, | ||
}; | ||
|
||
// Parse args. | ||
mp_arg_val_t args[MP_ARRAY_SIZE(allowed_args)]; | ||
mp_arg_parse_all(n_args - 2, pos_args + 2, kw_args, MP_ARRAY_SIZE(allowed_args), allowed_args, args); | ||
|
||
endpoint_obj_t *self = MP_OBJ_TO_PTR(pos_args[0]); | ||
vstr_t vstr; | ||
size_t len = mp_obj_get_int(pos_args[1]); | ||
vstr_init_len(&vstr, len); | ||
|
||
mp_int_t timeout = args[ARG_timeout].u_int; | ||
for (mp_uint_t start = mp_hal_ticks_ms(); ;) { | ||
vstr.len = endpoint_rpmsg_dequeue(self, vstr.buf, len); | ||
if (vstr.len > 0) { | ||
break; | ||
} | ||
if (timeout == 0) { | ||
break; | ||
} | ||
if (timeout > 0 && (mp_hal_ticks_ms() - start > timeout)) { | ||
mp_raise_msg(&mp_type_OSError, MP_ERROR_TEXT("timeout waiting for message")); | ||
} | ||
MICROPY_EVENT_POLL_HOOK | ||
} | ||
|
||
return mp_obj_new_bytes_from_vstr(&vstr); | ||
} | ||
static MP_DEFINE_CONST_FUN_OBJ_KW(endpoint_recv_obj, 2, endpoint_recv); | ||
|
||
static mp_obj_t endpoint_is_ready(mp_obj_t self_in) { | ||
endpoint_obj_t *self = MP_OBJ_TO_PTR(self_in); | ||
return is_rpmsg_ept_ready(&self->ep) ? mp_const_true : mp_const_false; | ||
|
@@ -209,10 +279,10 @@ static MP_DEFINE_CONST_FUN_OBJ_1(endpoint_deinit_obj, endpoint_deinit); | |
static mp_obj_t endpoint_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *all_args) { | ||
enum { ARG_name, ARG_callback, ARG_src, ARG_dest }; | ||
static const mp_arg_t allowed_args[] = { | ||
{ MP_QSTR_name, MP_ARG_OBJ | MP_ARG_REQUIRED, {.u_rom_obj = MP_ROM_NONE } }, | ||
{ MP_QSTR_callback, MP_ARG_OBJ | MP_ARG_REQUIRED, {.u_rom_obj = MP_ROM_NONE } }, | ||
{ MP_QSTR_src, MP_ARG_INT | MP_ARG_KW_ONLY, {.u_int = RPMSG_ADDR_ANY } }, | ||
{ MP_QSTR_dest, MP_ARG_INT | MP_ARG_KW_ONLY, {.u_int = RPMSG_ADDR_ANY } }, | ||
{ MP_QSTR_name, MP_ARG_OBJ | MP_ARG_REQUIRED, {.u_rom_obj = MP_ROM_NONE } }, | ||
{ MP_QSTR_callback, MP_ARG_OBJ, {.u_rom_obj = MP_ROM_NONE } }, | ||
{ MP_QSTR_src, MP_ARG_INT | MP_ARG_KW_ONLY, {.u_int = RPMSG_ADDR_ANY } }, | ||
{ MP_QSTR_dest, MP_ARG_INT | MP_ARG_KW_ONLY, {.u_int = RPMSG_ADDR_ANY } }, | ||
}; | ||
|
||
// Parse args. | ||
|
@@ -222,6 +292,7 @@ static mp_obj_t endpoint_make_new(const mp_obj_type_t *type, size_t n_args, size | |
endpoint_obj_t *self = mp_obj_malloc_with_finaliser(endpoint_obj_t, &endpoint_type); | ||
self->name = args[ARG_name].u_obj; | ||
self->callback = args[ARG_callback].u_obj; | ||
self->head = self->tail = 0; | ||
|
||
if (MP_STATE_PORT(virtio_device) == NULL) { | ||
openamp_init(); | ||
|
@@ -239,15 +310,66 @@ static const mp_rom_map_elem_t endpoint_locals_dict_table[] = { | |
{ MP_ROM_QSTR(MP_QSTR___name__), MP_ROM_QSTR(MP_QSTR_Endpoint) }, | ||
{ MP_ROM_QSTR(MP_QSTR___del__), MP_ROM_PTR(&endpoint_deinit_obj) }, | ||
{ MP_ROM_QSTR(MP_QSTR_send), MP_ROM_PTR(&endpoint_send_obj) }, | ||
{ MP_ROM_QSTR(MP_QSTR_recv), MP_ROM_PTR(&endpoint_recv_obj) }, | ||
{ MP_ROM_QSTR(MP_QSTR_is_ready), MP_ROM_PTR(&endpoint_is_ready_obj) }, | ||
}; | ||
static MP_DEFINE_CONST_DICT(endpoint_locals_dict, endpoint_locals_dict_table); | ||
|
||
mp_uint_t endpoint_read(mp_obj_t self_in, void *buf, mp_uint_t size, int *errcode) { | ||
endpoint_obj_t *self = MP_OBJ_TO_PTR(self_in); | ||
if (!is_rpmsg_ept_ready(&self->ep)) { | ||
return MP_STREAM_ERROR; | ||
} | ||
return endpoint_rpmsg_dequeue(self, buf, size); | ||
} | ||
|
||
mp_uint_t endpoint_write(mp_obj_t self_in, const void *buf, mp_uint_t size, int *errcode) { | ||
endpoint_obj_t *self = MP_OBJ_TO_PTR(self_in); | ||
int ret = 0; | ||
if (!is_rpmsg_ept_ready(&self->ep)) { | ||
ret = MP_STREAM_ERROR; | ||
} else { | ||
ret = rpmsg_send(&self->ep, buf, size); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How does this compare to |
||
if (ret < 0) { | ||
ret = MP_STREAM_ERROR; | ||
} | ||
} | ||
return ret; | ||
} | ||
|
||
mp_uint_t endpoint_ioctl(mp_obj_t self_in, mp_uint_t request, uintptr_t arg, int *errcode) { | ||
endpoint_obj_t *self = MP_OBJ_TO_PTR(self_in); | ||
mp_uint_t ret = 0; | ||
if (request == MP_STREAM_CLOSE) { | ||
rpmsg_destroy_ept(&self->ep); | ||
} else if (request == MP_STREAM_POLL) { | ||
if ((arg & MP_STREAM_POLL_RD) && self->head != self->tail) { | ||
ret |= MP_STREAM_POLL_RD; | ||
} | ||
|
||
if ((arg & MP_STREAM_POLL_WR) && is_rpmsg_ept_ready(&self->ep)) { | ||
ret |= MP_STREAM_POLL_WR; | ||
} | ||
} else { | ||
*errcode = MP_EINVAL; | ||
ret = MP_STREAM_ERROR; | ||
} | ||
return ret; | ||
} | ||
|
||
static const mp_stream_p_t endpoint_stream_p = { | ||
.read = endpoint_read, | ||
.write = endpoint_write, | ||
.ioctl = endpoint_ioctl, | ||
.is_text = false, | ||
}; | ||
|
||
static MP_DEFINE_CONST_OBJ_TYPE( | ||
endpoint_type, | ||
MP_QSTR_Endpoint, | ||
MP_TYPE_FLAG_NONE, | ||
make_new, endpoint_make_new, | ||
protocol, &endpoint_stream_p, | ||
locals_dict, &endpoint_locals_dict | ||
); | ||
|
||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could possibly just use
mp_stream_read1_obj
. That should give equivalent behaviour, just without a timeout argument (it would be a non-blocking call which IMO is enough).