Skip to content

Commit 8ba37ce

Browse files
jonathanhoggjimmo
authored andcommitted
esp32/modules/machine.py: Add Counter and Encoder classes.
Adds a Python override of the `machine` module, which delegates to the built-in module and adds an implementation of `Counter` and `Encoder`, based on the `esp32.PCNT` class. Original implementation by: Jonathan Hogg <me@jonathanhogg.com> Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
1 parent a4a9fd2 commit 8ba37ce

File tree

1 file changed

+174
-0
lines changed

1 file changed

+174
-0
lines changed

ports/esp32/modules/machine.py

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
import sys
2+
3+
_path = sys.path
4+
sys.path = ()
5+
try:
6+
import machine as _machine
7+
finally:
8+
sys.path = _path
9+
del _path
10+
del sys
11+
12+
13+
from micropython import const
14+
import esp32
15+
16+
if hasattr(esp32, "PCNT"):
17+
_PCNT_RANGE = const(32000)
18+
19+
class _CounterBase:
20+
_PCNT = esp32.PCNT
21+
# Singletons, keyed by PCNT unit_id (shared by both Counter & Encoder).
22+
_INSTANCES = {}
23+
24+
def __new__(cls, unit_id, *_args, **_kwargs):
25+
# Find an existing instance for this PCNT unit id.
26+
self = cls._INSTANCES.get(unit_id)
27+
28+
if self:
29+
# Verify that this PCNT is being used for the same type
30+
# (Encoder or Counter).
31+
if not isinstance(self, cls):
32+
raise ValueError("PCNT in use")
33+
else:
34+
# Previously unused PCNT unit.
35+
self = object.__new__(cls)
36+
cls._INSTANCES[unit_id] = self
37+
38+
# __init__ will now be called with the same args.
39+
return self
40+
41+
def __init__(self, unit_id, *args, filter_ns=0, **kwargs):
42+
if not hasattr(self, "_pcnt"):
43+
# New instance, or previously deinit-ed.
44+
self._pcnt = self._PCNT(unit_id, minimum=-_PCNT_RANGE, maximum=_PCNT_RANGE)
45+
46+
# Counter- or Encoder-specific configuration of self._pcnt.
47+
self._configure(*args, **kwargs)
48+
49+
# Common unit configuration.
50+
self._pcnt.init(
51+
filter=min(max(0, filter_ns * 80 // 1000), 1023),
52+
reset=True,
53+
)
54+
55+
# Note: We track number-of-overflows rather than the actual count in
56+
# order to avoid the IRQ handler overflowing MicroPython's "small int"
57+
# range. This gives an effective range of 2**30 overflows. User code
58+
# should use counter.value(0) to reset the overflow count.
59+
# The ESP32 PCNT resets to zero on under/overflow (i.e. it does not wrap
60+
# around to the opposite limit), so each overflow corresponds to exactly
61+
# _PCNT_RANGE counts.
62+
63+
# Reset counter state.
64+
self._overflows = 0
65+
self._offset = 0
66+
67+
# Install IRQ handler to handle under/overflow.
68+
self._pcnt.irq(self._overflow, self._PCNT.IRQ_MINIMUM | self._PCNT.IRQ_MAXIMUM)
69+
70+
# Start counting.
71+
self._pcnt.start()
72+
73+
# Handle counter under/overflow.
74+
def _overflow(self, mask):
75+
if mask & self._PCNT.IRQ_MINIMUM:
76+
self._overflows -= 1
77+
elif mask & self._PCNT.IRQ_MAXIMUM:
78+
self._overflows += 1
79+
80+
# Public machine.Counter & machine.Encoder API.
81+
def init(self, *args, **kwargs):
82+
self.__init__(None, *args, **kwargs)
83+
84+
# Public machine.Counter & machine.Encoder API.
85+
def deinit(self):
86+
if hasattr(self, "_pcnt"):
87+
self._pcnt.deinit()
88+
del self._pcnt
89+
90+
# Public machine.Counter & machine.Encoder API.
91+
def value(self, value=None):
92+
# This loop deals with the possibility that a PCNT overflow occurs
93+
# between retrieving self._overflows and self._pcnt.value().
94+
while True:
95+
overflows = self._overflows
96+
current = self._pcnt.value()
97+
# Calling PCNT.value() forces any pending PCNT interrupts to run
98+
# (see esp32_pcnt_run_irq_handlers). So self._overflows must now
99+
# be the the value corresponding to the value we read.
100+
if self._overflows == overflows:
101+
break
102+
103+
# Compute the result including the number of times we've cycled
104+
# through the range, and any applied offset.
105+
result = overflows * _PCNT_RANGE + current + self._offset
106+
107+
# If a new value is specified, then zero out the overflows, and set
108+
# self._offset so that it zeros out the current PCNT value. The
109+
# mutation to self._overflows is atomic w.r.t. the overflow IRQ
110+
# handler because the scheduler only runs on branch instructions.
111+
if value is not None:
112+
self._overflows -= overflows
113+
self._offset = value - current
114+
115+
return result
116+
117+
class Counter(_CounterBase):
118+
# Public machine.Counter API.
119+
RISING = 1
120+
FALLING = 2
121+
UP = _CounterBase._PCNT.INCREMENT
122+
DOWN = _CounterBase._PCNT.DECREMENT
123+
124+
# Counter-specific configuration.
125+
def _configure(self, src, edge=RISING, direction=UP):
126+
# Only use the first channel.
127+
self._pcnt.init(
128+
channel=0,
129+
pin=src,
130+
rising=direction if edge & Counter.RISING else self._PCNT.IGNORE,
131+
falling=direction if edge & Counter.FALLING else self._PCNT.IGNORE,
132+
)
133+
134+
class Encoder(_CounterBase):
135+
# Encoder-specific configuration.
136+
def _configure(self, phase_a, phase_b, phases=1):
137+
if phases not in (1, 2, 4):
138+
raise ValueError("phases")
139+
# Configure the first channel.
140+
self._pcnt.init(
141+
channel=0,
142+
pin=phase_a,
143+
falling=self._PCNT.INCREMENT,
144+
rising=self._PCNT.DECREMENT,
145+
mode_pin=phase_b,
146+
mode_low=self._PCNT.HOLD if phases == 1 else self._PCNT.REVERSE,
147+
)
148+
if phases == 4:
149+
# For 4x quadrature, enable the second channel.
150+
self._pcnt.init(
151+
channel=1,
152+
pin=phase_b,
153+
falling=self._PCNT.DECREMENT,
154+
rising=self._PCNT.INCREMENT,
155+
mode_pin=phase_a,
156+
mode_low=self._PCNT.REVERSE,
157+
)
158+
else:
159+
# For 1x and 2x quadrature, disable the second channel.
160+
self._pcnt.init(channel=1, pin=None, rising=self._PCNT.IGNORE)
161+
self._phases = phases
162+
163+
def phases(self):
164+
return self._phases
165+
166+
del _CounterBase
167+
168+
169+
del esp32
170+
171+
172+
# Delegate to built-in machine module.
173+
def __getattr__(attr):
174+
return getattr(_machine, attr)

0 commit comments

Comments
 (0)