Skip to content

Commit ffd7e0e

Browse files
committed
tests/ports/rp2: Convert rp2.DMA test to a unittest.
This test is rather complicated and benefits from being a unittest. Signed-off-by: Damien George <damien@micropython.org>
1 parent 3f1df4b commit ffd7e0e

File tree

2 files changed

+136
-128
lines changed

2 files changed

+136
-128
lines changed

tests/ports/rp2/rp2_dma.py

Lines changed: 136 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -4,103 +4,142 @@
44
import time
55
import machine
66
import rp2
7+
import unittest
78

89
is_rp2350 = "RP2350" in sys.implementation._machine
910

10-
src = bytes(i & 0xFF for i in range(16 * 1024))
11-
12-
print("# test basic usage")
13-
14-
dma = rp2.DMA()
15-
16-
# Test printing.
17-
print(dma)
18-
19-
# Test pack_ctrl/unpack_ctrl.
20-
ctrl_dict = rp2.DMA.unpack_ctrl(dma.pack_ctrl())
21-
if is_rp2350:
22-
for entry in ("inc_read_rev", "inc_write_rev"):
23-
assert entry in ctrl_dict
24-
del ctrl_dict[entry]
25-
for key, value in sorted(ctrl_dict.items()):
26-
print(key, value)
27-
28-
# Test register access.
29-
dma.read = 0
30-
dma.write = 0
31-
dma.count = 0
32-
dma.ctrl = dma.pack_ctrl()
33-
print(dma.read, dma.write, dma.count, dma.ctrl & 0x01F, dma.channel, dma.registers)
34-
dma.close()
35-
36-
# Test closing when already closed.
37-
dma.close()
38-
39-
# Test using when closed.
40-
try:
41-
dma.active()
42-
assert False
43-
except ValueError:
44-
print("ValueError")
45-
46-
# Test simple memory copy.
47-
print("# test memory copy")
48-
dest = bytearray(1024)
49-
dma = rp2.DMA()
50-
dma.config(read=src, write=dest, count=len(dest) // 4, ctrl=dma.pack_ctrl(), trigger=False)
51-
print(not any(dest))
52-
dma.active(True)
53-
while dma.active():
54-
pass
55-
print(dest[:8], dest[-8:])
56-
dma.close()
57-
58-
59-
# Test time taken for a large memory copy.
60-
def run_and_time_dma(dma):
61-
ticks_us = time.ticks_us
62-
irq_state = machine.disable_irq()
63-
t0 = ticks_us()
64-
dma.active(True)
65-
while dma.active():
66-
pass
67-
t1 = ticks_us()
68-
machine.enable_irq(irq_state)
69-
return time.ticks_diff(t1, t0)
70-
71-
72-
print("# test timing")
73-
dest = bytearray(16 * 1024)
74-
dma = rp2.DMA()
75-
dma.read = src
76-
dma.write = dest
77-
dma.count = len(dest) // 4
78-
dma.ctrl = dma.pack_ctrl()
79-
dt = run_and_time_dma(dma)
80-
expected_dt_range = range(40, 70) if is_rp2350 else range(70, 125)
81-
print(dt in expected_dt_range)
82-
print(dest[:8], dest[-8:])
83-
dma.close()
84-
85-
# Test using .config(trigger=True).
86-
print("# test immediate trigger")
87-
dest = bytearray(1024)
88-
dma = rp2.DMA()
89-
dma.config(read=src, write=dest, count=len(dest) // 4, ctrl=dma.pack_ctrl(), trigger=True)
90-
while dma.active():
91-
pass
92-
print(dest[:8], dest[-8:])
93-
dma.close()
94-
95-
# Test the DMA.irq() method.
96-
print("# test irq")
97-
dest = bytearray(1024)
98-
dma = rp2.DMA()
99-
dma.irq(lambda dma: print("irq fired", dma.irq().flags()))
100-
dma.config(
101-
read=src, write=dest, count=len(dest) // 4, ctrl=dma.pack_ctrl(irq_quiet=0), trigger=True
102-
)
103-
while dma.active():
104-
pass
105-
print(dest[:8], dest[-8:])
106-
dma.close()
11+
SRC = bytes(i & 0xFF for i in range(16 * 1024))
12+
13+
14+
class Test(unittest.TestCase):
15+
def setUp(self):
16+
self.dma = rp2.DMA()
17+
18+
def tearDown(self):
19+
self.dma.close()
20+
21+
def test_printing(self):
22+
dma = self.dma
23+
self.assertEqual(str(dma), "DMA(0)")
24+
25+
def test_pack_unpack_ctrl(self):
26+
dma = self.dma
27+
ctrl_dict = rp2.DMA.unpack_ctrl(dma.pack_ctrl())
28+
if is_rp2350:
29+
self.assertEqual(len(ctrl_dict), 18)
30+
self.assertTrue("inc_read_rev" in ctrl_dict)
31+
self.assertTrue("inc_write_rev" in ctrl_dict)
32+
else:
33+
self.assertEqual(len(ctrl_dict), 16)
34+
self.assertEqual(ctrl_dict["ahb_err"], 0)
35+
self.assertEqual(ctrl_dict["bswap"], 0)
36+
self.assertEqual(ctrl_dict["busy"], 0)
37+
self.assertEqual(ctrl_dict["chain_to"], 0)
38+
self.assertEqual(ctrl_dict["enable"], 1)
39+
self.assertEqual(ctrl_dict["high_pri"], 0)
40+
self.assertEqual(ctrl_dict["inc_read"], 1)
41+
self.assertEqual(ctrl_dict["inc_write"], 1)
42+
self.assertEqual(ctrl_dict["irq_quiet"], 1)
43+
self.assertEqual(ctrl_dict["read_err"], 0)
44+
self.assertEqual(ctrl_dict["ring_sel"], 0)
45+
self.assertEqual(ctrl_dict["ring_size"], 0)
46+
self.assertEqual(ctrl_dict["size"], 2)
47+
self.assertEqual(ctrl_dict["sniff_en"], 0)
48+
self.assertEqual(ctrl_dict["treq_sel"], 63)
49+
self.assertEqual(ctrl_dict["write_err"], 0)
50+
51+
def test_register_access(self):
52+
dma = self.dma
53+
dma.read = 0
54+
dma.write = 0
55+
dma.count = 0
56+
dma.ctrl = dma.pack_ctrl()
57+
self.assertEqual(dma.read, 0)
58+
self.assertEqual(dma.write, 0)
59+
self.assertEqual(dma.count, 0)
60+
self.assertEqual(dma.ctrl & 0x01F, 25)
61+
self.assertEqual(dma.channel, 0)
62+
self.assertIsInstance(dma.registers, memoryview)
63+
64+
def test_close(self):
65+
dma = self.dma
66+
dma.close()
67+
68+
# Test closing when already closed.
69+
dma.close()
70+
71+
# Test using when closed.
72+
with self.assertRaises(ValueError):
73+
dma.active()
74+
75+
def test_simple_memory_copy(self):
76+
dma = self.dma
77+
dest = bytearray(1024)
78+
dma.config(read=SRC, write=dest, count=len(dest) // 4, ctrl=dma.pack_ctrl(), trigger=False)
79+
self.assertFalse(any(dest))
80+
dma.active(True)
81+
while dma.active():
82+
pass
83+
self.assertEqual(dest[:8], SRC[:8])
84+
self.assertEqual(dest[-8:], SRC[-8:])
85+
86+
def test_time_taken_for_large_memory_copy(self):
87+
def run_and_time_dma(dma):
88+
ticks_us = time.ticks_us
89+
irq_state = machine.disable_irq()
90+
t0 = ticks_us()
91+
dma.active(True)
92+
while dma.active():
93+
pass
94+
t1 = ticks_us()
95+
machine.enable_irq(irq_state)
96+
return time.ticks_diff(t1, t0)
97+
98+
dma = self.dma
99+
dest = bytearray(16 * 1024)
100+
dma.read = SRC
101+
dma.write = dest
102+
dma.count = len(dest) // 4
103+
dma.ctrl = dma.pack_ctrl()
104+
dt = run_and_time_dma(dma)
105+
expected_dt_range = range(40, 70) if is_rp2350 else range(70, 125)
106+
self.assertIn(dt, expected_dt_range)
107+
self.assertEqual(dest[:8], SRC[:8])
108+
self.assertEqual(dest[-8:], SRC[-8:])
109+
110+
def test_config_trigger(self):
111+
# Test using .config(trigger=True) to start DMA immediately.
112+
dma = self.dma
113+
dest = bytearray(1024)
114+
dma.config(read=SRC, write=dest, count=len(dest) // 4, ctrl=dma.pack_ctrl(), trigger=True)
115+
while dma.active():
116+
pass
117+
self.assertEqual(dest[:8], SRC[:8])
118+
self.assertEqual(dest[-8:], SRC[-8:])
119+
120+
def test_irq(self):
121+
def callback(dma):
122+
nonlocal irq_flags
123+
print("irq fired")
124+
irq_flags = dma.irq().flags()
125+
126+
dma = self.dma
127+
irq_flags = None
128+
dest = bytearray(1024)
129+
dma.irq(callback)
130+
dma.config(
131+
read=SRC,
132+
write=dest,
133+
count=len(dest) // 4,
134+
ctrl=dma.pack_ctrl(irq_quiet=0),
135+
trigger=True,
136+
)
137+
while dma.active():
138+
pass
139+
self.assertEqual(irq_flags, 1)
140+
self.assertEqual(dest[:8], SRC[:8])
141+
self.assertEqual(dest[-8:], SRC[-8:])
142+
143+
144+
if __name__ == "__main__":
145+
unittest.main()

tests/ports/rp2/rp2_dma.py.exp

Lines changed: 0 additions & 31 deletions
This file was deleted.

0 commit comments

Comments
 (0)