Skip to content

tools/test_serial.py: Add test for serial throughput. #15909

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
303 changes: 303 additions & 0 deletions tools/test_serial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
#!/usr/bin/env python
#
# Performance and reliability test for serial port communication.
#
# Usage:
# test_serial.py [cdc-device]
#
# The `cdc-device` will default to /dev/ttyACM0.

import sys
import time
import argparse
import serial


def drain_input(ser):
time.sleep(0.1)
while ser.inWaiting() > 0:
data = ser.read(ser.inWaiting())

Check failure on line 19 in tools/test_serial.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (F841)

tools/test_serial.py:19:9: F841 Local variable `data` is assigned to but never used
time.sleep(0.1)

def send_script(ser, script):
chunk_size = 32
for i in range(0, len(script), chunk_size):
ser.write(script[i : i + chunk_size])
time.sleep(0.01)
ser.write(b"\x04") # eof
ser.flush()
response = ser.read(2)
assert response == b"OK", response

read_test_script = """
vcp_id = %u
bin = True
led = None
try:
import pyb
pyb.LED(1).on()
led = pyb.LED(2)
assert pyb.USB_VCP(vcp_id).isconnected()
wr=pyb.USB_VCP(vcp_id).send
except:
import sys
if hasattr(sys.stdout,'buffer'):
wr=sys.stdout.buffer.write
else:
wr=sys.stdout.write
bin = False
b=bytearray(%u)
if bin:
wr('BIN')
for i in range(len(b)):
b[i] = i & 0xff
else:
wr('TXT')
for i in range(len(b)):
b[i] = 0x20 + (i & 0x3f)
for _ in range(%d):
if led:
led.toggle()
wr(b)
"""


def read_test(ser_repl, ser_data, usb_vcp_id, bufsize, nbuf):
assert bufsize % 256 == 0 # for verify to work

# Load and run the read_test_script.
ser_repl.write(b"\x03\x01\x04") # break, raw-repl, soft-reboot
drain_input(ser_repl)
script = bytes(read_test_script % (usb_vcp_id, bufsize, nbuf), "ascii")
send_script(ser_repl, script)

# Read from the device the type of data that it will send (BIN or TXT).
data_type = ser_data.read(3)

# Read data from the device, check it is correct, and measure throughput.
n = 0
last_byte = None

Check failure on line 79 in tools/test_serial.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (F841)

tools/test_serial.py:79:5: F841 Local variable `last_byte` is assigned to but never used
t_start = time.time()
remain = nbuf * bufsize
total_data = bytearray(remain)
while remain:
t0 = time.monotonic_ns()
while ser_data.inWaiting() == 0:
if time.monotonic_ns() - t0 > 1e9:
# timeout waiting for data from device
break
time.sleep(0.0001)
if not ser_data.inWaiting():
print("ERROR: timeout waiting for data")
print(total_data[:n])
return 0
to_read = min(ser_data.inWaiting(), remain)
data = ser_data.read(to_read)

# verify bytes coming in are in sequence
# if last_byte is not None:
# if data[0] != (last_byte + 1) & 0xff:
# print('ERROR: first byte is not in sequence:', last_byte, data[0])
# last_byte = data[-1]
# for i in range(1, len(data)):
# if data[i] != (data[i - 1] + 1) & 0xff:
# print('ERROR: data not in sequence at position %d:' % i, data[i - 1], data[i])

remain -= len(data)
print(n, nbuf * bufsize, end="\r")
total_data[n : n + len(data)] = data
n += len(data)
t_end = time.time()
for i in range(0, len(total_data)):
if data_type == b"BIN":
wanted = i & 0xFF
else:
wanted = 0x20 + (i & 0x3F)
if total_data[i] != wanted:
print("fail", i, wanted, total_data[i])
ser_repl.write(b"\x03") # break
t = t_end - t_start

# Print results.
print(
"DATA IN: bufsize=%u, read %u bytes in %.2f msec = %.2f kibytes/sec = %.2f MBits/sec"
% (bufsize, n, t * 1000, n / 1024 / t, n * 8 / 1000000 / t)
)

return t


write_test_script_verified = """
import sys
vcp_id = %u
led=None
try:
import pyb
pyb.LED(1).on()
led=pyb.LED(2)
assert pyb.USB_VCP(vcp_id).isconnected()
rd=pyb.USB_VCP(vcp_id).recv
except:
rd=sys.stdin.readinto
b=bytearray(%u)
for _ in range(%u):
if led:
led.toggle()
n = rd(b)
fail = 0
for i in range(n):
if b[i] != 32 + (i & 0x3f):
fail += 1
if fail:
sys.stdout.write(b'ER%%04u' %% fail)
else:
sys.stdout.write(b'OK%%04u' %% n)
"""

write_test_script_unverified = """
import sys
vcp_id = %u
led=None
try:
import pyb
pyb.LED(1).on()
led=pyb.LED(2)
assert pyb.USB_VCP(vcp_id).isconnected()
rd=pyb.USB_VCP(vcp_id).recv
except:
rd=sys.stdin.readinto
b=bytearray(%u)
for _ in range(%u):
if led:
led.toggle()
n = rd(b)
if n != len(b):
sys.stdout.write(b'ER%%04u' %% n)
else:
sys.stdout.write(b'OK%%04u' %% n)
"""


def write_test(ser_repl, ser_data, usb_vcp_id, bufsize, nbuf, verified):
# Load and run the write_test_script.
ser_repl.write(b"\x03\x01\x04") # break, raw-repl, soft-reboot
drain_input(ser_repl)
if verified:
script = write_test_script_verified
else:
script = write_test_script_unverified
script = bytes(script % (usb_vcp_id, bufsize, nbuf), "ascii")
send_script(ser_repl, script)
drain_input(ser_repl)

# Write data to the device, check it is correct, and measure throughput.
n = 0
t_start = time.time()
buf = bytearray(bufsize)
for i in range(len(buf)):
buf[i] = 32 + (i & 0x3F) # don't want to send ctrl chars!
for i in range(nbuf):
ser_data.write(buf)
n += len(buf)
print(n, nbuf * bufsize, end="\r")
# while ser_data.inWaiting() == 0:
# time.sleep(0.001)
# response = ser_data.read(ser_data.inWaiting())
response = ser_repl.read(6)
if response != b"OK%04u" % bufsize:
print("bad response, expecting OK%04u, got %r" % (bufsize, response))
t_end = time.time()
ser_repl.write(b"\x03") # break
t = t_end - t_start

# Print results.
print(
"DATA OUT: verify=%d, bufsize=%u, wrote %u bytes in %.2f msec = %.2f kibytes/sec = %.2f MBits/sec"
% (verified, bufsize, n, t * 1000, n / 1024 / t, n * 8 / 1000000 / t)
)

return t


def do_test(dev_repl, dev_data=None, usb_vcp_id=None, fast=False):
if dev_data is None:
print("REPL and data on", dev_repl)
ser_repl = serial.Serial(dev_repl, baudrate=115200)
ser_data = ser_repl
usb_vcp_id = 0
else:
print("REPL on", dev_repl)
print("data on", dev_data)
print("USB VCP", usb_vcp_id)
ser_repl = serial.Serial(dev_repl, baudrate=115200)
ser_data = serial.Serial(dev_data, baudrate=115200)

if 0:
for i in range(1000):
print("======== TEST %04u ========" % i)
read_test(ser_repl, ser_data, usb_vcp_id, 8000, 32)
write_test(ser_repl, ser_data, usb_vcp_id, 8000, 32, True)
return

read_test_params = [
(256, 128),
(512, 64),
(1024, 64),
(2048, 64),
(4096, 64),
(8192, 64),
(16384, 64),
]
write_test_params = [
(128, 32),
(256, 16),
(512, 16),
(1024, 16),
(2048, 16),
(4096, 16),
(8192, 32),
(9999, 64),
]

if fast:
read_test_params = (read_test_params[-1],)
write_test_params = (write_test_params[-1],)

for bufsize, nbuf in read_test_params:
t = read_test(ser_repl, ser_data, usb_vcp_id, bufsize, nbuf)
if t > 4:
break

for bufsize, nbuf in write_test_params:
t = write_test(ser_repl, ser_data, usb_vcp_id, bufsize, nbuf, True)
if t > 4:
break

for bufsize, nbuf in write_test_params:
t = write_test(ser_repl, ser_data, usb_vcp_id, bufsize, nbuf, False)
if t > 4:
break

ser_repl.close()
ser_data.close()


def main():
dev_repl = "/dev/ttyACM0"
dev_data = None
usb_vcp_id = None
fast = False
if len(sys.argv) >= 2 and sys.argv[1] == "--fast":
fast = True
sys.argv.pop(1)
if len(sys.argv) >= 2:
dev_repl = sys.argv[1]
if len(sys.argv) >= 3:
assert len(sys.argv) >= 4
dev_data = sys.argv[2]
usb_vcp_id = int(sys.argv[3])
do_test(dev_repl, dev_data, usb_vcp_id, fast)


if __name__ == "__main__":
main()
Loading