diff --git a/lib/msgpackrpc/README.md b/lib/msgpackrpc/README.md new file mode 100644 index 0000000..f2a45a7 --- /dev/null +++ b/lib/msgpackrpc/README.md @@ -0,0 +1,2 @@ +MessagePack RPC protocol implementation for MicroPython. +See https://github.com/msgpack-rpc/msgpack-rpc/blob/master/spec.md diff --git a/lib/msgpackrpc/example.py b/lib/msgpackrpc/example.py new file mode 100644 index 0000000..1b32e3a --- /dev/null +++ b/lib/msgpackrpc/example.py @@ -0,0 +1,60 @@ +# This file is part of the msgpack-rpc module. +# Any copyright is dedicated to the Public Domain. +# https://creativecommons.org/publicdomain/zero/1.0/ + +import time +import logging +import msgpackrpc +import gc + + +class Adder: + def __init__(self): + pass + + def add(self, a, b): + logging.info(f"add({a}, {b}) is called") + return a + b + + +def sub(a, b): + logging.info(f"sub({a}, {b}) is called") + return a - b + + +if __name__ == "__main__": + # Configure the logger. + # All message equal to or higher than the logger level are printed. + logging.basicConfig( + datefmt="%H:%M:%S", + format="%(asctime)s.%(msecs)03d %(message)s", + level=logging.INFO, + ) + + # Create an RPC object + rpc = msgpackrpc.MsgPackRPC() + + # Register objects or functions to be called by the remote processor. + rpc.bind(Adder()) + rpc.bind(sub) + + # Start the remote processor and wait for it to be ready to communicate. + rpc.start(firmware=0x08180000, timeout=1000, num_channels=2) + + while True: + alloc = gc.mem_alloc() + # Async calls + f1 = rpc.call_async("add", 0, 1) + f2 = rpc.call_async("add", 0, 2) + f3 = rpc.call_async("add", 0, 3) + + # Can join async call in any order + logging.info("async add(0, 3) => %d", f3.join()) + + # Sync call + res = rpc.call("add", 2, 2) + logging.info("sync add(2, 2) => %d" % res) + logging.info("async add(0, 1) => %d" % f1.join()) + logging.info("async add(0, 2) => %d" % f2.join()) + logging.info("memory per iteration %d" % (gc.mem_alloc() - alloc)) + time.sleep_ms(250) diff --git a/lib/msgpackrpc/manifest.py b/lib/msgpackrpc/manifest.py new file mode 100644 index 0000000..5c62cfc --- /dev/null +++ b/lib/msgpackrpc/manifest.py @@ -0,0 +1,7 @@ +metadata( + description="MessagePack RPC protocol implementation for MicroPython.", + version="0.0.1", +) + +module("msgpackrpc.py") +require("msgpack", library="arduino-lib") diff --git a/lib/msgpackrpc/md5sum.py b/lib/msgpackrpc/md5sum.py new file mode 100644 index 0000000..af0409b --- /dev/null +++ b/lib/msgpackrpc/md5sum.py @@ -0,0 +1,63 @@ +# This file is part of the msgpack-rpc module. +# Any copyright is dedicated to the Public Domain. +# https://creativecommons.org/publicdomain/zero/1.0/ + +import logging +import msgpackrpc +import hashlib +import random +import binascii +from time import ticks_ms +from time import ticks_diff + + +hash_in = 0 +hash_out = 0 +DATA_BUF_SIZE = 256 + + +def md5hash(buf): + global hash_out + hash_out += 1 + return hashlib.md5(buf).digest() + + +def randbytes(size): + return bytes(random.getrandbits(8) for x in range(size)) + + +if __name__ == "__main__": + # Configure the logger. + # All message equal to or higher than the logger level are printed. + logging.basicConfig( + datefmt="%H:%M:%S", + format="%(asctime)s.%(msecs)03d %(message)s", + level=logging.INFO, + ) + + # Create an RPC object + rpc = msgpackrpc.MsgPackRPC() + # Register objects or functions to be called by the remote processor. + rpc.bind(md5hash) + # Start the remote processor and wait for it to be ready to communicate. + rpc.start(firmware=0x08180000, timeout=3000, num_channels=2) + + # Set data buffer size + rpc.call("set_buffer_size", DATA_BUF_SIZE) + + data = randbytes(DATA_BUF_SIZE) + ticks_start = ticks_ms() + while True: + f = rpc.call_async("md5hash", data) + # While waiting for the remote processor we can generate the checksum + # of the data that was just sent, and generate new random data block. + digest = hashlib.md5(data).digest() + data = randbytes(DATA_BUF_SIZE) + + # Read back the checksum of the first block + digest_ret = f.join() + if digest != digest_ret: + raise Exception(f"MD5 checksum doesn't match {binascii.hexlify(digest)} {binascii.hexlify(digest_ret)}") + khs = (hash_in + hash_out) / ticks_diff(ticks_ms(), ticks_start) + logging.info(f"hashes generated: {hash_out} hashes received: {hash_in} size: {DATA_BUF_SIZE} bytes KH/S: {khs}") + hash_in += 1 diff --git a/lib/msgpackrpc/msgpackrpc.py b/lib/msgpackrpc/msgpackrpc.py new file mode 100644 index 0000000..279de88 --- /dev/null +++ b/lib/msgpackrpc/msgpackrpc.py @@ -0,0 +1,201 @@ +# This file is part of the msgpack-rpc module. +# Copyright (c) 2023 Arduino SA +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. +# +# MessagePack RPC protocol implementation for MicroPython. +# https://github.com/msgpack-rpc/msgpack-rpc/blob/master/spec.md + +import logging +import openamp +import msgpack +from micropython import const +from io import BytesIO +from time import sleep_ms, ticks_ms, ticks_diff + +_MSG_TYPE_REQUEST = const(0) +_MSG_TYPE_RESPONSE = const(1) +_MSG_TYPE_NOTIFY = const(2) + + +def log_level_enabled(level): + return logging.getLogger().isEnabledFor(level) + + +class Future: + def __init__(self, msgid, msgbuf, fname, fargs): + self.msgid = msgid + self.msgbuf = msgbuf + self.fname = fname + self.fargs = fargs + + def join(self, timeout=0): + if log_level_enabled(logging.DEBUG): + logging.debug(f"join {self.fname}()") + + if timeout > 0: + t = ticks_ms() + + while self.msgid not in self.msgbuf: + if timeout > 0 and ticks_diff(ticks_ms(), t) > timeout: + raise OSError(f"Timeout joining function {self.fname}") + sleep_ms(100) + + obj = self.msgbuf.pop(self.msgid) + if obj[2] is not None: + raise (OSError(obj[2])) + + if log_level_enabled(logging.DEBUG): + logging.debug(f"call {self.fname}({self.fargs}) => {obj}") + return obj[3] + + +class MsgPackIO: + def __init__(self): + self.stream = BytesIO() + + def feed(self, data): + offset = self.stream.tell() + self.stream.write(data) + self.stream.seek(offset) + + def readable(self): + if self.stream.read(1): + offset = self.stream.tell() + self.stream.seek(offset - 1) + return True + return False + + def truncate(self): + if self.readable(): + offset = self.stream.tell() + self.stream = BytesIO(self.stream.getvalue()[offset:]) + + def __iter__(self): + return self + + def __next__(self): + offset = self.stream.tell() + try: + obj = msgpack.unpack(self.stream) + self.truncate() + return obj + except Exception: + self.stream.seek(offset) + raise StopIteration + + +class MsgPackRPC: + def __init__(self, streaming=False): + """ + Create a MsgPack RPC object. + streaming: If True, messages can span multiple buffers, otherwise a buffer contains + exactly one full message. Note streaming mode is slower, so it should be disabled + if it's not needed. + """ + self.epts = {} + self.msgid = 0 + self.msgbuf = {} + self.msgio = MsgPackIO() if streaming else None + self.servers = [] + + def _bind_callback(self, src, name): + if log_level_enabled(logging.INFO): + logging.info(f'New service announcement src: {src} name: "{name}"') + self.epts[name] = openamp.Endpoint(name, self._recv_callback, dest=src) + self.epts[name].send(b"\x00") + + def _recv_callback(self, src, data): + if log_level_enabled(logging.DEBUG): + logging.debug(f"Received message on endpoint: {src} data: {bytes(data)}") + + if self.msgio is None: + obj = msgpack.unpackb(data) + self._process_unpacked_obj(obj) + else: + self.msgio.feed(data) + for obj in self.msgio: + self._process_unpacked_obj(obj) + + def _process_unpacked_obj(self, obj): + if obj[0] == _MSG_TYPE_RESPONSE: + self.msgbuf[obj[1]] = obj + elif obj[0] == _MSG_TYPE_REQUEST: + self._dispatch(obj[1], obj[2], obj[-1]) + if log_level_enabled(logging.DEBUG): + logging.debug(f"Unpacked {type(obj)} val: {obj}") + + def _send_msg(self, msgid, msgtype, fname, fargs, **kwargs): + timeout = kwargs.pop("timeout", 1000) + endpoint = kwargs.pop("endpoint", "rpc") + self.epts[endpoint].send(msgpack.packb([msgtype, msgid, fname, fargs]), timeout=timeout) + if msgtype == _MSG_TYPE_REQUEST: + self.msgid += 1 + return Future(msgid, self.msgbuf, fname, fargs) + + def _dispatch(self, msgid, fname, fargs): + func = None + retobj = None + error = None + for obj in self.servers: + if callable(obj) and obj.__name__ == fname: + func = obj + elif hasattr(obj, fname): + func = getattr(obj, fname) + if func is not None: + break + + if func is not None: + retobj = func(*fargs) + else: + error = "Unbound function called %s" % (fname) + + self._send_msg(msgid, _MSG_TYPE_RESPONSE, error, retobj) + + def bind(self, obj): + """ + Register an object or a function to be called by the remote processor. + obj: An object whose methods can be called by remote processors, or a function. + """ + self.servers.append(obj) + + def start(self, firmware=None, num_channels=2, timeout=3000): + """ + Initializes OpenAMP, loads the remote processor's firmware and starts. + firmware: A path to an elf file stored in the filesystem, or an address to an entry point in flash. + num_channels: The number of channels to wait for the remote processor to + create before starting to communicate with it. + timeout: How long to wait for the remote processor to start, 0 means forever. + """ + # Initialize OpenAMP and set the New Service callback. + openamp.new_service_callback(self._bind_callback) + + # Keep a reference to the remote processor object, to stop the GC from collecting + # it, which would call the finaliser and shut down the remote processor while it's + # still being used. + self.rproc = openamp.RemoteProc(firmware) + self.rproc.start() + + # Wait for remote processor to announce the end points. + t = ticks_ms() + while len(self.epts) != num_channels: + if timeout > 0 and ticks_diff(ticks_ms(), t) > timeout: + raise OSError("timeout waiting for the remote processor to start") + sleep_ms(10) + + # Introduce a brief delay to allow the M4 sufficient time + # to bind remote functions before invoking them. + sleep_ms(100) + + def call(self, fname, *args, **kwargs): + """ + Synchronous call. The client is blocked until the RPC is finished. + """ + return self.call_async(fname, *args, *kwargs).join() + + def call_async(self, fname, *args, **kwargs): + """ + Asynchronous call. The client returns a Future object immediately. + """ + return self._send_msg(self.msgid, _MSG_TYPE_REQUEST, fname, list(args), *kwargs)