Skip to content

lib/msgpackrpc: Add msgpackrpc module. #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/msgpackrpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
MessagePack RPC protocol implementation for MicroPython.
See https://github.com/msgpack-rpc/msgpack-rpc/blob/master/spec.md
60 changes: 60 additions & 0 deletions lib/msgpackrpc/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# This file is part of the msgpack-rpc module.
# Any copyright is dedicated to the Public Domain.
# https://creativecommons.org/publicdomain/zero/1.0/

import time
import logging
import msgpackrpc
import gc


class Adder:
def __init__(self):
pass

def add(self, a, b):
logging.info(f"add({a}, {b}) is called")
return a + b


def sub(a, b):
logging.info(f"sub({a}, {b}) is called")
return a - b


if __name__ == "__main__":
# Configure the logger.
# All message equal to or higher than the logger level are printed.
logging.basicConfig(
datefmt="%H:%M:%S",
format="%(asctime)s.%(msecs)03d %(message)s",
level=logging.INFO,
)

# Create an RPC object
rpc = msgpackrpc.MsgPackRPC()

# Register objects or functions to be called by the remote processor.
rpc.bind(Adder())
rpc.bind(sub)

# Start the remote processor and wait for it to be ready to communicate.
rpc.start(firmware=0x08180000, timeout=1000, num_channels=2)

while True:
alloc = gc.mem_alloc()
# Async calls
f1 = rpc.call_async("add", 0, 1)
f2 = rpc.call_async("add", 0, 2)
f3 = rpc.call_async("add", 0, 3)

# Can join async call in any order
logging.info("async add(0, 3) => %d", f3.join())

# Sync call
res = rpc.call("add", 2, 2)
logging.info("sync add(2, 2) => %d" % res)
logging.info("async add(0, 1) => %d" % f1.join())
logging.info("async add(0, 2) => %d" % f2.join())
logging.info("memory per iteration %d" % (gc.mem_alloc() - alloc))
time.sleep_ms(250)
7 changes: 7 additions & 0 deletions lib/msgpackrpc/manifest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
metadata(
description="MessagePack RPC protocol implementation for MicroPython.",
version="0.0.1",
)

module("msgpackrpc.py")
require("msgpack", library="arduino-lib")
63 changes: 63 additions & 0 deletions lib/msgpackrpc/md5sum.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# This file is part of the msgpack-rpc module.
# Any copyright is dedicated to the Public Domain.
# https://creativecommons.org/publicdomain/zero/1.0/

import logging
import msgpackrpc
import hashlib
import random
import binascii
from time import ticks_ms
from time import ticks_diff


hash_in = 0
hash_out = 0
DATA_BUF_SIZE = 256


def md5hash(buf):
global hash_out
hash_out += 1
return hashlib.md5(buf).digest()


def randbytes(size):
return bytes(random.getrandbits(8) for x in range(size))


if __name__ == "__main__":
# Configure the logger.
# All message equal to or higher than the logger level are printed.
logging.basicConfig(
datefmt="%H:%M:%S",
format="%(asctime)s.%(msecs)03d %(message)s",
level=logging.INFO,
)

# Create an RPC object
rpc = msgpackrpc.MsgPackRPC()
# Register objects or functions to be called by the remote processor.
rpc.bind(md5hash)
# Start the remote processor and wait for it to be ready to communicate.
rpc.start(firmware=0x08180000, timeout=3000, num_channels=2)

# Set data buffer size
rpc.call("set_buffer_size", DATA_BUF_SIZE)

data = randbytes(DATA_BUF_SIZE)
ticks_start = ticks_ms()
while True:
f = rpc.call_async("md5hash", data)
# While waiting for the remote processor we can generate the checksum
# of the data that was just sent, and generate new random data block.
digest = hashlib.md5(data).digest()
data = randbytes(DATA_BUF_SIZE)

# Read back the checksum of the first block
digest_ret = f.join()
if digest != digest_ret:
raise Exception(f"MD5 checksum doesn't match {binascii.hexlify(digest)} {binascii.hexlify(digest_ret)}")
khs = (hash_in + hash_out) / ticks_diff(ticks_ms(), ticks_start)
logging.info(f"hashes generated: {hash_out} hashes received: {hash_in} size: {DATA_BUF_SIZE} bytes KH/S: {khs}")
hash_in += 1
201 changes: 201 additions & 0 deletions lib/msgpackrpc/msgpackrpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
# This file is part of the msgpack-rpc module.
# Copyright (c) 2023 Arduino SA
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
#
# MessagePack RPC protocol implementation for MicroPython.
# https://github.com/msgpack-rpc/msgpack-rpc/blob/master/spec.md

import logging
import openamp
import msgpack
from micropython import const
from io import BytesIO
from time import sleep_ms, ticks_ms, ticks_diff

_MSG_TYPE_REQUEST = const(0)
_MSG_TYPE_RESPONSE = const(1)
_MSG_TYPE_NOTIFY = const(2)


def log_level_enabled(level):
return logging.getLogger().isEnabledFor(level)


class Future:
def __init__(self, msgid, msgbuf, fname, fargs):
self.msgid = msgid
self.msgbuf = msgbuf
self.fname = fname
self.fargs = fargs

def join(self, timeout=0):
if log_level_enabled(logging.DEBUG):
logging.debug(f"join {self.fname}()")

if timeout > 0:
t = ticks_ms()

while self.msgid not in self.msgbuf:
if timeout > 0 and ticks_diff(ticks_ms(), t) > timeout:
raise OSError(f"Timeout joining function {self.fname}")
sleep_ms(100)

obj = self.msgbuf.pop(self.msgid)
if obj[2] is not None:
raise (OSError(obj[2]))

if log_level_enabled(logging.DEBUG):
logging.debug(f"call {self.fname}({self.fargs}) => {obj}")
return obj[3]


class MsgPackIO:
def __init__(self):
self.stream = BytesIO()

def feed(self, data):
offset = self.stream.tell()
self.stream.write(data)
self.stream.seek(offset)

def readable(self):
if self.stream.read(1):
offset = self.stream.tell()
self.stream.seek(offset - 1)
return True
return False

def truncate(self):
if self.readable():
offset = self.stream.tell()
self.stream = BytesIO(self.stream.getvalue()[offset:])

def __iter__(self):
return self

def __next__(self):
offset = self.stream.tell()
try:
obj = msgpack.unpack(self.stream)
self.truncate()
return obj
except Exception:
self.stream.seek(offset)
raise StopIteration


class MsgPackRPC:
def __init__(self, streaming=False):
"""
Create a MsgPack RPC object.
streaming: If True, messages can span multiple buffers, otherwise a buffer contains
exactly one full message. Note streaming mode is slower, so it should be disabled
if it's not needed.
"""
self.epts = {}
self.msgid = 0
self.msgbuf = {}
self.msgio = MsgPackIO() if streaming else None
self.servers = []

def _bind_callback(self, src, name):
if log_level_enabled(logging.INFO):
logging.info(f'New service announcement src: {src} name: "{name}"')
self.epts[name] = openamp.Endpoint(name, self._recv_callback, dest=src)
self.epts[name].send(b"\x00")

def _recv_callback(self, src, data):
if log_level_enabled(logging.DEBUG):
logging.debug(f"Received message on endpoint: {src} data: {bytes(data)}")

if self.msgio is None:
obj = msgpack.unpackb(data)
self._process_unpacked_obj(obj)
else:
self.msgio.feed(data)
for obj in self.msgio:
self._process_unpacked_obj(obj)

def _process_unpacked_obj(self, obj):
if obj[0] == _MSG_TYPE_RESPONSE:
self.msgbuf[obj[1]] = obj
elif obj[0] == _MSG_TYPE_REQUEST:
self._dispatch(obj[1], obj[2], obj[-1])
if log_level_enabled(logging.DEBUG):
logging.debug(f"Unpacked {type(obj)} val: {obj}")

def _send_msg(self, msgid, msgtype, fname, fargs, **kwargs):
timeout = kwargs.pop("timeout", 1000)
endpoint = kwargs.pop("endpoint", "rpc")
self.epts[endpoint].send(msgpack.packb([msgtype, msgid, fname, fargs]), timeout=timeout)
if msgtype == _MSG_TYPE_REQUEST:
self.msgid += 1
return Future(msgid, self.msgbuf, fname, fargs)

def _dispatch(self, msgid, fname, fargs):
func = None
retobj = None
error = None
for obj in self.servers:
if callable(obj) and obj.__name__ == fname:
func = obj
elif hasattr(obj, fname):
func = getattr(obj, fname)
if func is not None:
break

if func is not None:
retobj = func(*fargs)
else:
error = "Unbound function called %s" % (fname)

self._send_msg(msgid, _MSG_TYPE_RESPONSE, error, retobj)

def bind(self, obj):
"""
Register an object or a function to be called by the remote processor.
obj: An object whose methods can be called by remote processors, or a function.
"""
self.servers.append(obj)

def start(self, firmware=None, num_channels=2, timeout=3000):
"""
Initializes OpenAMP, loads the remote processor's firmware and starts.
firmware: A path to an elf file stored in the filesystem, or an address to an entry point in flash.
num_channels: The number of channels to wait for the remote processor to
create before starting to communicate with it.
timeout: How long to wait for the remote processor to start, 0 means forever.
"""
# Initialize OpenAMP and set the New Service callback.
openamp.new_service_callback(self._bind_callback)

# Keep a reference to the remote processor object, to stop the GC from collecting
# it, which would call the finaliser and shut down the remote processor while it's
# still being used.
self.rproc = openamp.RemoteProc(firmware)
self.rproc.start()

# Wait for remote processor to announce the end points.
t = ticks_ms()
while len(self.epts) != num_channels:
if timeout > 0 and ticks_diff(ticks_ms(), t) > timeout:
raise OSError("timeout waiting for the remote processor to start")
sleep_ms(10)

# Introduce a brief delay to allow the M4 sufficient time
# to bind remote functions before invoking them.
sleep_ms(100)

def call(self, fname, *args, **kwargs):
"""
Synchronous call. The client is blocked until the RPC is finished.
"""
return self.call_async(fname, *args, *kwargs).join()

def call_async(self, fname, *args, **kwargs):
"""
Asynchronous call. The client returns a Future object immediately.
"""
return self._send_msg(self.msgid, _MSG_TYPE_REQUEST, fname, list(args), *kwargs)