Skip to content

Commit 37e86d6

Browse files
committed
tools/test_serial.py: Add test for serial throughput.
This is a test script used to test USB CDC (or USB UART) serial reliability and throughput. Run against any MicroPython board with: $ python test_serial.py <device> Signed-off-by: Damien George <damien@micropython.org>
1 parent f9a755c commit 37e86d6

File tree

1 file changed

+303
-0
lines changed

1 file changed

+303
-0
lines changed

tools/test_serial.py

Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
#!/usr/bin/env python
2+
#
3+
# Performance and reliability test for serial port communication.
4+
#
5+
# Usage:
6+
# test_serial.py [cdc-device]
7+
#
8+
# The `cdc-device` will default to /dev/ttyACM0.
9+
10+
import sys
11+
import time
12+
import argparse
13+
import serial
14+
15+
16+
def drain_input(ser):
17+
time.sleep(0.1)
18+
while ser.inWaiting() > 0:
19+
data = ser.read(ser.inWaiting())
20+
time.sleep(0.1)
21+
22+
def send_script(ser, script):
23+
chunk_size = 32
24+
for i in range(0, len(script), chunk_size):
25+
ser.write(script[i : i + chunk_size])
26+
time.sleep(0.01)
27+
ser.write(b"\x04") # eof
28+
ser.flush()
29+
response = ser.read(2)
30+
assert response == b"OK", response
31+
32+
read_test_script = """
33+
vcp_id = %u
34+
bin = True
35+
led = None
36+
try:
37+
import pyb
38+
pyb.LED(1).on()
39+
led = pyb.LED(2)
40+
assert pyb.USB_VCP(vcp_id).isconnected()
41+
wr=pyb.USB_VCP(vcp_id).send
42+
except:
43+
import sys
44+
if hasattr(sys.stdout,'buffer'):
45+
wr=sys.stdout.buffer.write
46+
else:
47+
wr=sys.stdout.write
48+
bin = False
49+
b=bytearray(%u)
50+
if bin:
51+
wr('BIN')
52+
for i in range(len(b)):
53+
b[i] = i & 0xff
54+
else:
55+
wr('TXT')
56+
for i in range(len(b)):
57+
b[i] = 0x20 + (i & 0x3f)
58+
for _ in range(%d):
59+
if led:
60+
led.toggle()
61+
wr(b)
62+
"""
63+
64+
65+
def read_test(ser_repl, ser_data, usb_vcp_id, bufsize, nbuf):
66+
assert bufsize % 256 == 0 # for verify to work
67+
68+
# Load and run the read_test_script.
69+
ser_repl.write(b"\x03\x01\x04") # break, raw-repl, soft-reboot
70+
drain_input(ser_repl)
71+
script = bytes(read_test_script % (usb_vcp_id, bufsize, nbuf), "ascii")
72+
send_script(ser_repl, script)
73+
74+
# Read from the device the type of data that it will send (BIN or TXT).
75+
data_type = ser_data.read(3)
76+
77+
# Read data from the device, check it is correct, and measure throughput.
78+
n = 0
79+
last_byte = None
80+
t_start = time.time()
81+
remain = nbuf * bufsize
82+
total_data = bytearray(remain)
83+
while remain:
84+
t0 = time.monotonic_ns()
85+
while ser_data.inWaiting() == 0:
86+
if time.monotonic_ns() - t0 > 1e9:
87+
# timeout waiting for data from device
88+
break
89+
time.sleep(0.0001)
90+
if not ser_data.inWaiting():
91+
print("ERROR: timeout waiting for data")
92+
print(total_data[:n])
93+
return 0
94+
to_read = min(ser_data.inWaiting(), remain)
95+
data = ser_data.read(to_read)
96+
97+
# verify bytes coming in are in sequence
98+
# if last_byte is not None:
99+
# if data[0] != (last_byte + 1) & 0xff:
100+
# print('ERROR: first byte is not in sequence:', last_byte, data[0])
101+
# last_byte = data[-1]
102+
# for i in range(1, len(data)):
103+
# if data[i] != (data[i - 1] + 1) & 0xff:
104+
# print('ERROR: data not in sequence at position %d:' % i, data[i - 1], data[i])
105+
106+
remain -= len(data)
107+
print(n, nbuf * bufsize, end="\r")
108+
total_data[n : n + len(data)] = data
109+
n += len(data)
110+
t_end = time.time()
111+
for i in range(0, len(total_data)):
112+
if data_type == b"BIN":
113+
wanted = i & 0xFF
114+
else:
115+
wanted = 0x20 + (i & 0x3F)
116+
if total_data[i] != wanted:
117+
print("fail", i, wanted, total_data[i])
118+
ser_repl.write(b"\x03") # break
119+
t = t_end - t_start
120+
121+
# Print results.
122+
print(
123+
"DATA IN: bufsize=%u, read %u bytes in %.2f msec = %.2f kibytes/sec = %.2f MBits/sec"
124+
% (bufsize, n, t * 1000, n / 1024 / t, n * 8 / 1000000 / t)
125+
)
126+
127+
return t
128+
129+
130+
write_test_script_verified = """
131+
import sys
132+
vcp_id = %u
133+
led=None
134+
try:
135+
import pyb
136+
pyb.LED(1).on()
137+
led=pyb.LED(2)
138+
assert pyb.USB_VCP(vcp_id).isconnected()
139+
rd=pyb.USB_VCP(vcp_id).recv
140+
except:
141+
rd=sys.stdin.readinto
142+
b=bytearray(%u)
143+
for _ in range(%u):
144+
if led:
145+
led.toggle()
146+
n = rd(b)
147+
fail = 0
148+
for i in range(n):
149+
if b[i] != 32 + (i & 0x3f):
150+
fail += 1
151+
if fail:
152+
sys.stdout.write(b'ER%%04u' %% fail)
153+
else:
154+
sys.stdout.write(b'OK%%04u' %% n)
155+
"""
156+
157+
write_test_script_unverified = """
158+
import sys
159+
vcp_id = %u
160+
led=None
161+
try:
162+
import pyb
163+
pyb.LED(1).on()
164+
led=pyb.LED(2)
165+
assert pyb.USB_VCP(vcp_id).isconnected()
166+
rd=pyb.USB_VCP(vcp_id).recv
167+
except:
168+
rd=sys.stdin.readinto
169+
b=bytearray(%u)
170+
for _ in range(%u):
171+
if led:
172+
led.toggle()
173+
n = rd(b)
174+
if n != len(b):
175+
sys.stdout.write(b'ER%%04u' %% n)
176+
else:
177+
sys.stdout.write(b'OK%%04u' %% n)
178+
"""
179+
180+
181+
def write_test(ser_repl, ser_data, usb_vcp_id, bufsize, nbuf, verified):
182+
# Load and run the write_test_script.
183+
ser_repl.write(b"\x03\x01\x04") # break, raw-repl, soft-reboot
184+
drain_input(ser_repl)
185+
if verified:
186+
script = write_test_script_verified
187+
else:
188+
script = write_test_script_unverified
189+
script = bytes(script % (usb_vcp_id, bufsize, nbuf), "ascii")
190+
send_script(ser_repl, script)
191+
drain_input(ser_repl)
192+
193+
# Write data to the device, check it is correct, and measure throughput.
194+
n = 0
195+
t_start = time.time()
196+
buf = bytearray(bufsize)
197+
for i in range(len(buf)):
198+
buf[i] = 32 + (i & 0x3F) # don't want to send ctrl chars!
199+
for i in range(nbuf):
200+
ser_data.write(buf)
201+
n += len(buf)
202+
print(n, nbuf * bufsize, end="\r")
203+
# while ser_data.inWaiting() == 0:
204+
# time.sleep(0.001)
205+
# response = ser_data.read(ser_data.inWaiting())
206+
response = ser_repl.read(6)
207+
if response != b"OK%04u" % bufsize:
208+
print("bad response, expecting OK%04u, got %r" % (bufsize, response))
209+
t_end = time.time()
210+
ser_repl.write(b"\x03") # break
211+
t = t_end - t_start
212+
213+
# Print results.
214+
print(
215+
"DATA OUT: verify=%d, bufsize=%u, wrote %u bytes in %.2f msec = %.2f kibytes/sec = %.2f MBits/sec"
216+
% (verified, bufsize, n, t * 1000, n / 1024 / t, n * 8 / 1000000 / t)
217+
)
218+
219+
return t
220+
221+
222+
def do_test(dev_repl, dev_data=None, usb_vcp_id=None, fast=False):
223+
if dev_data is None:
224+
print("REPL and data on", dev_repl)
225+
ser_repl = serial.Serial(dev_repl, baudrate=115200)
226+
ser_data = ser_repl
227+
usb_vcp_id = 0
228+
else:
229+
print("REPL on", dev_repl)
230+
print("data on", dev_data)
231+
print("USB VCP", usb_vcp_id)
232+
ser_repl = serial.Serial(dev_repl, baudrate=115200)
233+
ser_data = serial.Serial(dev_data, baudrate=115200)
234+
235+
if 0:
236+
for i in range(1000):
237+
print("======== TEST %04u ========" % i)
238+
read_test(ser_repl, ser_data, usb_vcp_id, 8000, 32)
239+
write_test(ser_repl, ser_data, usb_vcp_id, 8000, 32, True)
240+
return
241+
242+
read_test_params = [
243+
(256, 128),
244+
(512, 64),
245+
(1024, 64),
246+
(2048, 64),
247+
(4096, 64),
248+
(8192, 64),
249+
(16384, 64),
250+
]
251+
write_test_params = [
252+
(128, 32),
253+
(256, 16),
254+
(512, 16),
255+
(1024, 16),
256+
(2048, 16),
257+
(4096, 16),
258+
(8192, 32),
259+
(9999, 64),
260+
]
261+
262+
if fast:
263+
read_test_params = (read_test_params[-1],)
264+
write_test_params = (write_test_params[-1],)
265+
266+
for bufsize, nbuf in read_test_params:
267+
t = read_test(ser_repl, ser_data, usb_vcp_id, bufsize, nbuf)
268+
if t > 4:
269+
break
270+
271+
for bufsize, nbuf in write_test_params:
272+
t = write_test(ser_repl, ser_data, usb_vcp_id, bufsize, nbuf, True)
273+
if t > 4:
274+
break
275+
276+
for bufsize, nbuf in write_test_params:
277+
t = write_test(ser_repl, ser_data, usb_vcp_id, bufsize, nbuf, False)
278+
if t > 4:
279+
break
280+
281+
ser_repl.close()
282+
ser_data.close()
283+
284+
285+
def main():
286+
dev_repl = "/dev/ttyACM0"
287+
dev_data = None
288+
usb_vcp_id = None
289+
fast = False
290+
if len(sys.argv) >= 2 and sys.argv[1] == "--fast":
291+
fast = True
292+
sys.argv.pop(1)
293+
if len(sys.argv) >= 2:
294+
dev_repl = sys.argv[1]
295+
if len(sys.argv) >= 3:
296+
assert len(sys.argv) >= 4
297+
dev_data = sys.argv[2]
298+
usb_vcp_id = int(sys.argv[3])
299+
do_test(dev_repl, dev_data, usb_vcp_id, fast)
300+
301+
302+
if __name__ == "__main__":
303+
main()

0 commit comments

Comments
 (0)