Add RPI PIO example for quadrature encoder. #6894

Open: 3 commits to be merged into base: master

@sanyi sanyi commented Feb 13, 2021

Example that uses a StateMachine to offload quadrature encoder polling.

@sanyi sanyi changed the title Create pio_quadrature_encoder.py Add RPI PIO example for quadrature encoder Feb 13, 2021
@sanyi sanyi changed the title Add RPI PIO example for quadrature encoder Add RPI PIO example for quadrature encoder. Feb 13, 2021
@IhorNehrutsa
Contributor

Could you test this?
MicroPython quadrature incremental encoder

@robert-hh
Contributor

robert-hh commented Dec 28, 2021

After fixing two coding errors it works so far. At 125 MHz it handles up to ~3.5 kHz signal frequency, at 250 MHz up to ~7 kHz.
The errors:
line 75: rp2 not defined. Cure: add import rp2.
line 92: pio_quad not defined. Cure: use pio_quadrature.

@sanyi
Author

sanyi commented Dec 28, 2021

@robert-hh fixed, thank you. The stats are interesting. I'm not exactly an expert in this, so I am curious whether what I did is actually helpful and/or could be done more efficiently.

@robert-hh
Contributor

robert-hh commented Dec 28, 2021

The example is very helpful, showing what can be done with PIO. The code is really tricky. The speed is limited by the IRQ latency. It can be marginally (~15%) improved by adding the @micropython.native decorator to encoder_state_changed_irq_handler().
It would be interesting to compare it to a simple pin.irq() based encoder.
Edit: The pin.irq() based encoder works up to ~2 kHz at a 125 MHz clock frequency.

@robert-hh
Contributor

I spent a few moments to pack your encoder into the portable Encoder class of @peterhinch, hoping that class member accesses are faster than global symbol accesses. They are not. It's the opposite. But the code looks more like the other Encoder projects now.

# encoder_rp2.py

# Encoder Support for RPi Pico

# Copyright (c) 2021 Sandor Attila Gerendi (PIO adaption)
# Copyright (c) 2017-2021 Peter Hinch (base class)
# Released under the MIT License (MIT) - see LICENSE file

from machine import Pin
from rp2 import PIO, StateMachine
import rp2
import array

class Encoder:
    def __init__(self, base_pin, scale=1):
        self.scale = scale
        self.forward = True
        self.base_pin = Pin(base_pin)
        self.state_look_up_table = array.array("b", [
            # Direction = 1
             0, # 00 to 00
            -1, # 00 to 01
            +1, # 00 to 10
            +2, # 00 to 11

            +1, # 01 to 00
             0, # 01 to 01
            +2, # 01 to 10
            -1, # 01 to 11

            -1, # 10 to 00
            +2, # 10 to 01
             0, # 10 to 10
            +1, # 10 to 11

            +2, # 11 to 00
            +1, # 11 to 01
            -1, # 11 to 10
             0, # 11 to 11

            # Direction = 0
             0, # 00 to 00
            -1, # 00 to 01
            +1, # 00 to 10
            -2, # 00 to 11

            +1, # 01 to 00
             0, # 01 to 01
            -2, # 01 to 10
            -1, # 01 to 11

            -1, # 10 to 00
            -2, # 10 to 01
             0, # 10 to 10
            +1, # 10 to 11

            -2, # 11 to 00
            +1, # 11 to 01
            -1, # 11 to 10
             0, # 11 to 11
            ])
        self._pos = 0
        self.direction = 0
        self.lut_index = 0

        self.sm = StateMachine(0, self.pio_quadrature, freq=12500000, in_base=self.base_pin)
        self.sm.irq(self.encoder_state_changed_irq_handler)
        self.sm.exec("set(y, 99)")  # give y an initial value that always differs from the input
        self.sm.active(1)

    def encoder_state_changed_irq_handler(self, sm):
        self.lut_index |= sm.get() & 3
        delta = self.state_look_up_table[self.lut_index]  # look up once, reuse below
        self._pos += delta
        if delta != 0:
            self.direction = 1 if delta > 0 else 0
        # The "Direction = 1" block sits at indices 0..15, so bit 4 must be
        # clear after a positive movement and set after a negative one.
        self.lut_index = ((self.lut_index << 2) & 0b1100) | ((self.direction ^ 1) << 4)

    @rp2.asm_pio()
    def pio_quadrature(in_init=rp2.PIO.IN_LOW):
        wrap_target()
        label("again")
        in_(pins, 2)
        mov(x, isr)
        jmp(x_not_y, "push_data")
        mov(isr, null)
        jmp("again")
        label("push_data")
        push()
        irq(block, rel(0))
        mov(y, x)
        wrap()

    def position(self, value=None):
        if value is not None:
            self._pos = round(value / self.scale)  # Improvement provided by @IhorNehrutsa
        return self._pos * self.scale

    def value(self, value=None):
        if value is not None:
            self._pos = value
        return self._pos

@IhorNehrutsa
Contributor

I think +2/-2 in state_look_up_table are wrong.
They are Error State transitions.

See Quadrature decoder state table
https://en.wikipedia.org/wiki/Incremental_encoder#Quadrature_decoder
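
For comparison, a strict decoder in the spirit of that state table could look roughly like the plain-Python sketch below. It is only an illustration and not code from this PR: the names STRICT_LUT, DOUBLE_STEPS and decode_strict are made up here, the sign convention is taken from the single-step entries of the table posted above, and transitions where both bits change are counted as errors instead of movement.

```python
# Hypothetical sketch: strict quadrature decoding with a 16-entry table.
# Index = (previous_state << 2) | current_state, each state being 2 bits.
# Single steps give +1/-1; "no change" and double steps give 0.
STRICT_LUT = (
    0, -1, +1, 0,   # from 00
    +1, 0, 0, -1,   # from 01
    -1, 0, 0, +1,   # from 10
    0, +1, -1, 0,   # from 11
)

# Transitions in which both bits flip at once (00<->11, 01<->10).
DOUBLE_STEPS = {0b0011, 0b0110, 0b1001, 0b1100}


def decode_strict(states, start=0b00):
    """Return (position, error_count) for a sequence of 2-bit states."""
    pos = 0
    errors = 0
    prev = start
    for cur in states:
        cur &= 3
        index = (prev << 2) | cur
        if index in DOUBLE_STEPS:
            errors += 1  # a state was missed, the direction is unknown
        pos += STRICT_LUT[index]
        prev = cur
    return pos, errors


print(decode_strict([0b10, 0b11, 0b01, 0b00]))  # (4, 0)
print(decode_strict([0b01, 0b11, 0b10, 0b00]))  # (-4, 0)
print(decode_strict([0b11]))                    # (0, 1)
```

With this variant a skipped state costs counts but is at least reported, which is the trade-off against guessing the direction.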

@sanyi
Author

sanyi commented Dec 28, 2021

@robert-hh I am not sure if this applies to MicroPython, but objects store their attributes in dictionaries, and there is some cost in creating, maintaining (RAM) and accessing them. My Python code is kind of ugly, but I wanted to avoid any overhead.

However, in normal Python, one can use __slots__, which gives a slight boost in performance and storage space:

class A:
    __slots__ = ('a', 'b')
    def __init__(self):
        self.a = 1
        self.b = 2
    def increment(self):
        self.a += self.b

class B:
    def __init__(self):
        self.a = 1
        self.b = 2
    def increment(self):
        self.a += self.b

def test_a():
    a = A()
    for i in range(0, 100000):
        a.increment()
    print(a.a)


def test_b():
    b = B()
    for i in range(0, 100000):
        b.increment()
    print(b.a)


import timeit
print(timeit.timeit(test_a, number=1))
print(timeit.timeit(test_b, number=1))

However I cannot test this now on the RPI since I don't have one with me (I am not at home).

@sanyi
Author

sanyi commented Dec 28, 2021

@IhorNehrutsa the Wikipedia example takes into consideration only one-step changes, while this implementation can infer 2 steps, given that the previous direction is known. Please follow the link that inspired me and the related links within it; there are some quite interesting observations in those.
This seems pretty logical to me, but I could also be wrong since I did not spend time laying it all out on paper and checking it step by step.
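
The idea of inferring two steps from the last known direction can be tried out on a desktop with a sketch like the following. It is a plain-Python illustration, not the code from the PR: SINGLE and decode_with_direction are hypothetical names, the starting state is assumed to be 00, and the initial direction is assumed to be positive.

```python
# Hypothetical sketch: single steps come from a table, double steps
# (marked None) are resolved using the last known direction.
SINGLE = (
    0, -1, +1, None,   # from 00
    +1, 0, None, -1,   # from 01
    -1, None, 0, +1,   # from 10
    None, +1, -1, 0,   # from 11
)


def decode_with_direction(states, start=0b00, direction=1):
    """Return the position after feeding a sequence of 2-bit states."""
    pos = 0
    prev = start
    for cur in states:
        cur &= 3
        delta = SINGLE[(prev << 2) | cur]
        if delta is None:
            # Both bits changed: assume two steps in the last direction.
            delta = 2 if direction else -2
        if delta:
            direction = 1 if delta > 0 else 0
        pos += delta
        prev = cur
    return pos


# One full cycle in each direction with one state skipped.
print(decode_with_direction([0b10, 0b01, 0b00]))  # 4
print(decode_with_direction([0b01, 0b10, 0b00]))  # -4
```

The guess is only right if the shaft did not reverse during the skipped state; if it did, the decoder adds two counts in the wrong direction.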

@IhorNehrutsa
Contributor

from utime import ticks_diff, ticks_us
from uarray import array

x1 = (0, -1, 1, 0, 1, 0, 0, -1, -1, 0, 0, 1, 0, 1, -1, 0)
x2 = array("b", (0, -1, 1, 0, 1, 0, 0, -1, -1, 0, 0, 1, 0, 1, -1, 0))
x3 = bytearray(b'\x00\xff\x01\x00\x01\x00\x00\xff\xff\x00\x00\x01\x00\x01\xff\x00')

print(x1)
print(x2)
print(x3, x3[1])

N = 100_000

s1 = 0
s2 = 0
s3 = 0

t1 = ticks_us()
for _ in range(N):
    s1 += x1[13]
t2 = ticks_us()
for _ in range(N):
    s2 += x2[13]
t3 = ticks_us()
for _ in range(N):
    s3 += x3[13]
t4 = ticks_us()
    
print(ticks_diff(t2, t1), ticks_diff(t3, t2), ticks_diff(t4, t3))
MicroPython v1.17-526-g9faaafda4-dirty on 2021-12-29; ESP32 module with ESP32
Type "help()" for more information.

(0, -1, 1, 0, 1, 0, 0, -1, -1, 0, 0, 1, 0, 1, -1, 0)
array('b', [0, -1, 1, 0, 1, 0, 0, -1, -1, 0, 0, 1, 0, 1, -1, 0])
bytearray(b'\x00\xff\x01\x00\x01\x00\x00\xff\xff\x00\x00\x01\x00\x01\xff\x00') 255
739730 763872 765394
  1. tuple is faster on ESP32
  2. a bytearray holds unsigned bytes: bytearray(b'\xff')[0] == 255, not -1
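
The signed/unsigned difference can be reproduced on any Python with a small sketch like this. The helper to_signed8 is a made-up name for illustration; it shows one way to read a bytearray entry as a signed 8-bit value, at the cost of an extra operation per lookup.

```python
from array import array

deltas = (0, -1, 1, 0)

as_tuple = tuple(deltas)
as_array = array("b", deltas)                    # signed 8-bit elements
as_bytes = bytearray(d & 0xFF for d in deltas)   # unsigned 8-bit elements


def to_signed8(value):
    """Interpret an unsigned byte (0..255) as a signed 8-bit integer."""
    return value - 256 if value > 127 else value


print(as_tuple[1], as_array[1], as_bytes[1], to_signed8(as_bytes[1]))
# -1 -1 255 -1
```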

@IhorNehrutsa
Contributor

this implementation can infer 2 steps given the previous direction was known.

I think that taking the previous direction is a weak assumption.
In fact both CCW and CW are possible.

@peterhinch
Contributor

@robert-hh I've a new take on this. In my opinion your OO approach is the right way because it allows more than one instance to exist, so constructors take a SM no. and a Pin object as args. As I'm sure you've spotted I prefer an approach using simple logic rather than a LUT.

I have produced two versions, one uses Viper for the ISR, the other does not. You may want to compare the performance with the other solutions. As you probably know, getting a Viper function to retain state is difficult. Consequently this script is not as easy to follow: which to use depends on what these demos are intended to achieve.

Non-Viper:

from machine import Pin
import rp2

class Encoder:
    def __init__(self, sm_no, base_pin, scale=1):
        self.scale = scale
        self._pos = 0
        self._x0 = 0
        self.sm = rp2.StateMachine(sm_no, self.pio_quadrature, in_base=base_pin)
        self.sm.irq(self.isr)
        self.sm.exec("set(y, 99)")  # Ensure initial y differs from the input
        self.sm.active(1)

    def isr(self, sm):
        while sm.rx_fifo():
            v = sm.get() & 3
            x = v & 1
            y = v >> 1
            s = 1 if (x ^ y) else -1
            self._pos += s if (x ^ self._x0) else -s
            self._x0 = x

    @rp2.asm_pio()
    def pio_quadrature(in_init=rp2.PIO.IN_LOW):
        wrap_target()
        label("again")
        in_(pins, 2)
        mov(x, isr)
        jmp(x_not_y, "push_data")
        mov(isr, null)
        jmp("again")
        label("push_data")
        push()
        irq(block, rel(0))
        mov(y, x)
        wrap()

    def position(self, value=None):
        if value is not None:
            self._pos = round(value / self.scale)
        return self._pos * self.scale

    def value(self, value=None):
        if value is not None:
            self._pos = value
        return self._pos
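
The arithmetic in isr() can be checked without hardware. The sketch below replays it in plain Python; decode_logic is a hypothetical helper, it assumes the encoder starts in state 00 and that only changed states are fed in (as the PIO program pushes them), and it makes no assumption about which pin is A or B.

```python
def decode_logic(states, x0=0):
    """Apply the ISR arithmetic to 2-bit values, one per state change."""
    pos = 0
    for v in states:
        v &= 3
        x = v & 1            # bit 0 of the pushed value
        y = v >> 1           # bit 1 of the pushed value
        s = 1 if (x ^ y) else -1
        pos += s if (x ^ x0) else -s
        x0 = x               # remember x for the next change
    return pos


# One full quadrature cycle in each direction.
print(decode_logic([0b01, 0b11, 0b10, 0b00]))  # 4
print(decode_logic([0b10, 0b11, 0b01, 0b00]))  # -4
```

Because every value is treated as a step, this logic relies on the PIO program only pushing when the state actually changed.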

Viper

from machine import Pin
from array import array
import rp2

def make_isr(pos):  # Closure enables Viper to retain state
    old_x = array('i', (0,))  # but nonlocal doesn't work so using arrays
    @micropython.viper
    def isr(sm):
        i = ptr32(pos)
        p = ptr32(old_x)
        while sm.rx_fifo():
            v : int = int(sm.get()) & 3
            x : int = v & 1
            y : int = v >> 1
            s : int = 1 if (x ^ y) else -1
            i[0] = i[0] + (s if (x ^ p[0]) else (0 - s))
            p[0] = x
    return isr

class Encoder:
    def __init__(self, sm_no, base_pin, scale=1):
        self.scale = scale
        self._pos = array("i", (0,))  # [pos]
        self.sm = rp2.StateMachine(sm_no, self.pio_quadrature, in_base=base_pin)
        self.sm.irq(make_isr(self._pos))  # Instantiate the closure
        self.sm.exec("set(y, 99)")  # Initialise y: guarantee different to the input
        self.sm.active(1)

    @rp2.asm_pio()
    def pio_quadrature(in_init=rp2.PIO.IN_LOW):
        wrap_target()
        label("again")
        in_(pins, 2)
        mov(x, isr)
        jmp(x_not_y, "push_data")
        mov(isr, null)
        jmp("again")
        label("push_data")
        push()
        irq(block, rel(0))
        mov(y, x)
        wrap()

    def position(self, value=None):
        if value is not None:
            self._pos[0] = round(value / self.scale)
        return self._pos[0] * self.scale

    def value(self, value=None):
        if value is not None:
            self._pos[0] = value
        return self._pos[0]

@robert-hh
Contributor

At a 125 MHz clock and with no other load, as in the other tests, the non-Viper version works up to ~6 kHz, the Viper version up to ~10 kHz.

P.S.: I like the implementation of your ISR functions. Very clear and concise.

@robert-hh
Contributor

Just for interest, I checked the interrupt latency of the RPI using Pin.irq(). It takes ~22 µs until a callback creates a pulse at a pin. That matches the ~10 kHz figure, which corresponds to 25 µs per quadrature signal phase.
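
The relation between the two figures is simple arithmetic: a quadrature signal has four state changes per period, so each phase lasts a quarter of the period. A small sketch (function names are made up for illustration, and the model ignores everything except one fixed latency per state change):

```python
def phase_time_us(signal_freq_hz):
    """Time between two state changes of a quadrature signal, in microseconds."""
    return 1_000_000 / (4 * signal_freq_hz)


def max_signal_freq_hz(latency_us):
    """Highest signal frequency at which one state change fits into the latency."""
    return 1_000_000 / (4 * latency_us)


print(phase_time_us(10_000))           # 25.0
print(round(max_signal_freq_hz(22)))   # 11364
```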

@sanyi
Author

sanyi commented Dec 30, 2021

@peterhinch agreed, OO is cleaner. I learned many interesting things here. Didn't know about Viper.
Maybe we could go for your solution and close this pull request.
My project is shelved for now and I cannot test my modifications properly.
Originally I wanted to build a simple motor controller with PID and some telemetry that would command 4+ motors with encoders at once. I wanted to minimize the polling effort, then I found the RPI and its PIO, hence the example I built.

EDIT: I think the pio_quadrature can be expanded to look for 4 encoder state changes at once.

@peterhinch
Contributor

I think the pio_quadrature can be expanded to look for 4 encoder state changes at once.

I assume you're talking about handling two encoders. I think this would complicate - and slow down - the ISR. My solution should support up to 8 encoders (the number of SM's). Of course each encoder uses a SM which could be a problem if you need some for other purposes.

That said, your PIO code is very neat. I can't help thinking there must be other uses for firing an interrupt on a state change of any one of N inputs.

Didn't know about Viper.

It's fast but tricky to use. Support for closures is problematic.

@peterhinch
Contributor

@robert-hh I have attempted to optimise the Viper closure as follows. The rest of the code is unchanged. You might like to run your timing test on it.

def make_isr(pos):
    vals = array("i", (1, -1, 0))  # vals[2] is previous x
    @micropython.viper
    def isr(sm):
        i = ptr32(pos)  # Position
        g = ptr32(vals)
        while sm.rx_fifo():
            v : int = int(sm.get()) & 3
            i[0] += g[int((v >> 1) ^ g[2])]
            g[2] = v & 1
    return isr

It's a bit cryptic, but I can't see a way to optimise it further.
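
To see why the shorter form gives the same counts, both variants can be compared exhaustively in plain Python. This is only a sketch: delta_explicit mirrors the arithmetic of the earlier ISR, delta_indexed mirrors the table lookup above, and both function names are made up for illustration.

```python
from itertools import product

VALS = (1, -1)  # same first two entries as vals in the closure


def delta_explicit(v, old_x):
    """Step computed as in the earlier ISR."""
    x = v & 1
    y = v >> 1
    s = 1 if (x ^ y) else -1
    return s if (x ^ old_x) else -s


def delta_indexed(v, old_x):
    """Step computed by indexing with (y XOR previous x)."""
    return VALS[(v >> 1) ^ old_x]


mismatches = [
    (v, old_x)
    for v, old_x in product(range(4), range(2))
    if delta_explicit(v, old_x) != delta_indexed(v, old_x)
]
print(mismatches)  # []
```

Both forms reduce to "+1 if the new y equals the previous x, else -1", which is why the current x drops out of the lookup.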

@rkompass

Hello,

perhaps we can leave this topic open.
I devised encoders using the RP2040 PIO which are quite fast because they count the X register up and down within the PIO.
The code is at https://forum.micropython.org/viewtopic.php?f=21&t=12277.
The overall code design is not as elegant as the above solutions yet.

Meanwhile I devised a simulation program which generates quadrature codes, including some bouncing before the onset of stable impulses, also using PIO.
It is not fully finished yet, but the present code works:

from machine import Pin

from rp2 import PIO, StateMachine, asm_pio
from time import sleep_ms, sleep_us

class QGen_Pio:
    def __init__(self, pins, sm_id=4, freq=10_000_000):
        if not isinstance(pins, (tuple, list)) or len(pins) != 2:
            raise ValueError('pair of 2 successive pins required')
        pinA = int(str(pins[0]).split(')')[0].split('(')[1].split(',')[0])
        pinB = int(str(pins[1]).split(')')[0].split('(')[1].split(',')[0])
        if abs(pinA-pinB) != 1:
            raise ValueError('pair of 2 successive pins required')
        pin_base = pins[0] if pinA < pinB else pins[1]
        self.qgen = StateMachine(sm_id, self.sm_qgen, freq=freq, set_base=pin_base) # we have no out_count=2

    @staticmethod    # 32 instructions
    @asm_pio(set_init=(PIO.OUT_LOW, PIO.OUT_LOW), in_shiftdir=PIO.SHIFT_LEFT, out_shiftdir=PIO.SHIFT_RIGHT)
    def sm_qgen():
        wrap_target()
        pull(block)            
        mov(x, osr)            # x determines how many cycles to be run
        pull(block)
        mov(isr, osr)          # ISR saves inner cycle counts
        label("startdelay")
        set(pins, 0b00) [31]
        jmp(y_dec, "startdelay")
        # ---------------------
        label("loop")
        mov(osr, isr)          # and this value is recreated in OSR in every loop cycle
        out(y, 4)              # 4 bit count b_x
        label("bounceX")
        jmp(y_dec, "bounceX_sta")
        jmp("bounceX_fin")
        label("bounceX_sta")
        set(pins, 0b01) [7]
        set(pins, 0b00) [22]
        jmp("bounceX")
        label("bounceX_fin")
        out(y, 6)              # 6 bit count s_x
        label("stableX")
        set(pins, 0b01) [31]
        jmp(y_dec, "stableX")
        out(y, 4)              # 4 bit count b_y
        label("bounceY")
        jmp(y_dec, "bounceY_sta")
        jmp("bounceY_fin")
        label("bounceY_sta")
        set(pins, 0b11) [11]
        set(pins, 0b01) [18]
        jmp("bounceY")
        label("bounceY_fin")
        out(y, 6)              # 6 bit count s_yx
        label("stableYX")
        set(pins, 0b11) [31]
        jmp(y_dec, "stableYX")
        out(y, 6)              # 6 bit count s_y
        label("stableY")
        set(pins, 0b10) [31]
        jmp(y_dec, "stableY")
        out(y, 6)              # 6 bit count s_0
        label("stable0")
        set(pins, 0b00) [31]
        jmp(y_dec, "stable0")
        jmp(x_dec, "loop")
        wrap()

    def cycle_time(self, freq, b_x, s_x, b_y, s_yx, s_y, s_0):
        return (32 * (b_x + s_x + b_y + s_yx + s_y + s_0) + 6) * 1000000 / freq
    
    def generate(self, startdelay, cycles, b_x, s_x, b_y, s_yx, s_y, s_0):
        if cycles < 1:
            raise ValueError('cycles >= 1 required')
        if s_x < 1 or s_yx < 1 or s_y < 1 or s_0 < 1 or s_x > 64 or s_yx > 64 or s_y > 64 or s_0 > 64:
            raise ValueError('s_x, s_yx, s_y and s_0 have to be in the range 1 .. 64')
        if b_x < 0 or b_y < 0 or b_x > 15 or b_y > 15:
            raise ValueError('b_x and b_y have to be in the range 0 .. 15')
        counts = 0
        counts = counts << 6 |  (s_0-1) & 0b111111  # 1 - 64
        counts = counts << 6 |  (s_y-1) & 0b111111  # 1 - 64
        counts = counts << 6 | (s_yx-1) & 0b111111  # 1 - 64
        counts = counts << 4 |   b_y    & 0b1111    # 0 - 15
        counts = counts << 6 |  (s_x-1) & 0b111111  # 1 - 64
        counts = counts << 4 |   b_x    & 0b1111    # 0 - 15 ; last shifted to left, first read out to right
        self.qgen.active(0)           # stopped anywhere, perhaps in pull(block)
        self.qgen.exec("set(y, 0)")
        self.qgen.exec("mov(pc, y)")  # start at 0, we have 32 PIO instructions so this works 
        self.qgen.put(startdelay)
        self.qgen.exec("pull(block)")
        self.qgen.exec("mov(y, osr)")
        self.qgen.put(cycles-1)
        self.qgen.put(counts)
        self.qgen.active(1)

The generator has many parameters which are best understood by the appended illustration.
b_x ... bouncing x
s_x ... stable x
b_y ... bouncing y
...
[Image: QGenIllu1, illustration of the generator parameters]

b_x and b_y may be zero, then there is no bouncing.
A usage example:

pinA = Pin(8, Pin.OUT)
pinB = Pin(9, Pin.OUT)

freq=2_400_000
qgen = QGen_Pio((pinA, pinB), freq=freq)

print('Generate quadrature signal with Pio at', freq, 'Hz')
print('Cycle time: ', qgen.cycle_time(freq, 5, 5, 4, 5, 5, 5), 'us')
qgen.generate(100, 117, 5, 5, 4, 5, 5, 5)

This will allow for a systematic testing of many encoders.
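
The way generate() packs its six parameters into one 32-bit word, and the order in which the PIO program reads them back with out(y, n), can be checked off-target with a sketch like the one below. It is plain Python written for illustration: FIELDS, pack_counts, unpack_counts and cycle_time_us are hypothetical names, and the timing formula is copied from cycle_time() above rather than derived independently.

```python
# (name, bit width, offset): the stable counts are stored as value - 1.
FIELDS = (
    ("b_x", 4, 0), ("s_x", 6, 1), ("b_y", 4, 0),
    ("s_yx", 6, 1), ("s_y", 6, 1), ("s_0", 6, 1),
)


def pack_counts(b_x, s_x, b_y, s_yx, s_y, s_0):
    """Pack the parameters so that b_x ends up in the lowest bits."""
    values = (b_x, s_x, b_y, s_yx, s_y, s_0)
    word = 0
    for (_, bits, offset), value in zip(reversed(FIELDS), reversed(values)):
        word = (word << bits) | ((value - offset) & ((1 << bits) - 1))
    return word


def unpack_counts(word):
    """Read the fields back lowest bits first, like out(y, n) shifting right."""
    result = []
    for _, bits, offset in FIELDS:
        result.append((word & ((1 << bits) - 1)) + offset)
        word >>= bits
    return tuple(result)


def cycle_time_us(freq, b_x, s_x, b_y, s_yx, s_y, s_0):
    """Same formula as cycle_time() in the generator class."""
    return (32 * (b_x + s_x + b_y + s_yx + s_y + s_0) + 6) * 1_000_000 / freq


params = (5, 5, 4, 5, 5, 5)
print(unpack_counts(pack_counts(*params)) == params)       # True
print(round(cycle_time_us(2_400_000, *params), 2), "us")   # 389.17 us
```

The six fields add up to exactly 32 bits, so the whole parameter set fits into a single FIFO word.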

@wallComputer

wallComputer commented Jul 27, 2022

I think the pio_quadrature can be expanded to look for 4 encoder state changes at once.

I assume you're talking about handling two encoders. I think this would complicate - and slow down - the ISR. My solution should support up to 8 encoders (the number of SM's). Of course each encoder uses a SM which could be a problem if you need some for other purposes.

That said, your PIO code is very neat. I can't help thinking there must be other uses for firing an interrupt on a state change of any one of N inputs.

Didn't know about Viper.

It's fast but tricky to use. Support for closures is problematic.

@peterhinch, I tried just that, attaching four magnetic wheel encoders to 4 pin pairs and assigning them to different SMs (0 to 3). However, changing the first encoder (rotating by hand the wheel connected to SM 0) affects the output of the other three too, while changing the other three does not affect their position values. This is the same for both the Viper and non-Viper code. Could you please let me know where I'm going wrong here?
Thanks in advance.

I apologise for this comment; I overlooked a simple error on my part. The code below works just fine. I had formatted my strings incorrectly.
Code:

import time
import rp2
from array import array
from machine import Pin

def make_isr(pos):  # Closure enables Viper to retain state
    old_x = array('i', (0,))  # but nonlocal doesn't work so using arrays

    @micropython.viper
    def isr(sm):
        i = ptr32(pos)
        p = ptr32(old_x)
        while sm.rx_fifo():
            v: int = int(sm.get()) & 3
            x: int = v & 1
            y: int = v >> 1
            s: int = 1 if (x ^ y) else -1
            i[0] = i[0] + (s if (x ^ p[0]) else (0 - s))
            p[0] = x

    return isr

class Encoder:
    def __init__(self, sm_no, base_pin, scale=1):
        self.scale = scale
        self._pos = array("i", (0,))  # [pos]
        self.sm = rp2.StateMachine(sm_no, self.pio_quadrature, in_base=base_pin)
        self.sm.irq(make_isr(self._pos))  # Instantiate the closure
        self.sm.exec("set(y, 99)")  # Initialise y: guarantee different to the input
        self.sm.active(1)

    @rp2.asm_pio()
    def pio_quadrature(in_init=rp2.PIO.IN_LOW):
        wrap_target()
        label("again")
        in_(pins, 2)
        mov(x, isr)
        jmp(x_not_y, "push_data")
        mov(isr, null)
        jmp("again")
        label("push_data")
        push()
        irq(block, rel(0))
        mov(y, x)
        wrap()

    def position(self, value=None):
        if value is not None:
            self._pos[0] = round(value / self.scale)
        return self._pos[0] * self.scale

    def value(self, value=None):
        if value is not None:
            self._pos[0] = value
        return self._pos[0]


e0 = Encoder(0, Pin(14), 1)
e1 = Encoder(1, Pin(12), 1) # Comment Edit: Fixed from Pin 15 to Pin 12
e2 = Encoder(2, Pin(0), 1)
e3 = Encoder(3, Pin(2), 1)
while True:
    # print("E0:{0}\tE1:{0}\tE2:{0}\tE3:{0}\t".format(e0.position(), e1.position(), e2.position(), e3.position()))
    print("E0:{0}\tE1:{1}\tE2:{2}\tE3:{3}\t".format(e0.position(), e1.position(), e2.position(), e3.position()))
    time.sleep(0.01)

This works correctly. Apologies for the unnecessary comment.

@barbiani

Has anyone else thought about a count reset pin?

@IhorNehrutsa
Contributor

@barbiani

from machine import Pin
_rst = Pin(17, mode=Pin.IN, pull=Pin.PULL_UP)
_rst.irq(lambda x:cnt.value(0), trigger=Pin.IRQ_FALLING)
