diff --git a/adafruit_bitbangio.py b/adafruit_bitbangio.py index 95c5d38..10bf6f5 100644 --- a/adafruit_bitbangio.py +++ b/adafruit_bitbangio.py @@ -190,13 +190,17 @@ def _sda_low(self) -> None: self._sda.switch_to_output(value=False) def _scl_release(self) -> None: - """Release and let the pullups lift""" - # Use self._timeout to add clock stretching + """Release and wait for the pullups to lift.""" self._scl.switch_to_input() + # Wait at most self._timeout seconds for any clock stretching. + end = monotonic() + self._timeout + while not self._scl.value and end > monotonic(): + pass + if not self._scl.value: + raise RuntimeError("Bus timed out.") def _sda_release(self) -> None: """Release and let the pullups lift""" - # Use self._timeout to add clock stretching self._sda.switch_to_input() def _start(self) -> None: @@ -288,7 +292,8 @@ def _write(self, address: int, buffer: ReadableBuffer, transmit_stop: bool) -> N # raise RuntimeError("Device not responding at 0x{:02X}".format(address)) raise RuntimeError(f"Device not responding at 0x{address:02X}") for byte in buffer: - self._write_byte(byte) + if not self._write_byte(byte): + raise RuntimeError(f"Device not responding at 0x{address:02X}") if transmit_stop: self._stop() @@ -323,9 +328,6 @@ def __init__( self._mosi = None self._miso = None - self.configure() - self.unlock() - # Set pins as outputs/inputs. self._sclk = DigitalInOut(clock) self._sclk.switch_to_output() @@ -338,6 +340,9 @@ def __init__( self._miso = DigitalInOut(MISO) self._miso.switch_to_input() + self.configure() + self.unlock() + def deinit(self) -> None: """Free any hardware used by the object.""" self._sclk.deinit() @@ -372,12 +377,30 @@ def configure( self._bits = bits self._half_period = (1 / self._baudrate) / 2 # 50% Duty Cyle delay + # Initialize the clock to the idle state. This is important to + # guarantee that the clock is at a known (idle) state before + # any read/write operations. + self._sclk.value = self._polarity + def _wait(self, start: Optional[int] = None) -> float: """Wait for up to one half cycle""" while (start + self._half_period) > monotonic(): pass return monotonic() # Return current time + def _should_write(self, to_active: Literal[0, 1]) -> bool: + """Return true if a bit should be written on the given clock transition.""" + # phase 0: write when active is 0 + # phase 1: write when active is 1 + return self._phase == to_active + + def _should_read(self, to_active: Literal[0, 1]) -> bool: + """Return true if a bit should be read on the given clock transition.""" + # phase 0: read when active is 1 + # phase 1: read when active is 0 + # Data is read on the idle->active transition only when the phase is 1 + return self._phase == 1 - to_active + def write( self, buffer: ReadableBuffer, start: int = 0, end: Optional[int] = None ) -> None: @@ -392,24 +415,26 @@ def write( if self._check_lock(): start_time = monotonic() + # Note: when we come here, our clock must always be its idle state. for byte in buffer[start:end]: for bit_position in range(self._bits): bit_value = byte & 0x80 >> bit_position - # Set clock to base - if not self._phase: # Mode 0, 2 + # clock: idle, or has made an active->idle transition. + if self._should_write(to_active=0): self._mosi.value = bit_value - self._sclk.value = not self._polarity + # clock: wait in idle for half a period start_time = self._wait(start_time) - - # Flip clock off base - if self._phase: # Mode 1, 3 + # clock: idle->active + self._sclk.value = not self._polarity + if self._should_write(to_active=1): self._mosi.value = bit_value - self._sclk.value = self._polarity + # clock: wait in active for half a period start_time = self._wait(start_time) - - # Return pins to base positions - self._mosi.value = 0 - self._sclk.value = self._polarity + # clock: active->idle + self._sclk.value = self._polarity + # clock: stay in idle for the last active->idle transition + # to settle. + start_time = self._wait(start_time) # pylint: disable=too-many-branches def readinto( @@ -433,26 +458,29 @@ def readinto( for bit_position in range(self._bits): bit_mask = 0x80 >> bit_position bit_value = write_value & 0x80 >> bit_position - # Return clock to base - self._sclk.value = self._polarity - start_time = self._wait(start_time) - # Handle read on leading edge of clock. - if not self._phase: # Mode 0, 2 + # clock: idle, or has made an active->idle transition. + if self._should_write(to_active=0): if self._mosi is not None: self._mosi.value = bit_value + # clock: wait half a period. + start_time = self._wait(start_time) + # clock: idle->active + self._sclk.value = not self._polarity + if self._should_read(to_active=1): if self._miso.value: # Set bit to 1 at appropriate location. buffer[byte_position] |= bit_mask else: # Set bit to 0 at appropriate location. buffer[byte_position] &= ~bit_mask - # Flip clock off base - self._sclk.value = not self._polarity - start_time = self._wait(start_time) - # Handle read on trailing edge of clock. - if self._phase: # Mode 1, 3 + if self._should_write(to_active=1): if self._mosi is not None: self._mosi.value = bit_value + # clock: wait half a period + start_time = self._wait(start_time) + # Clock: active->idle + self._sclk.value = self._polarity + if self._should_read(to_active=0): if self._miso.value: # Set bit to 1 at appropriate location. buffer[byte_position] |= bit_mask @@ -460,9 +488,8 @@ def readinto( # Set bit to 0 at appropriate location. buffer[byte_position] &= ~bit_mask - # Return pins to base positions - self._mosi.value = 0 - self._sclk.value = self._polarity + # clock: wait another half period for the last transition. + start_time = self._wait(start_time) def write_readinto( self, @@ -499,34 +526,34 @@ def write_readinto( buffer_out[byte_position + out_start] & 0x80 >> bit_position ) in_byte_position = byte_position + in_start - # Return clock to 0 - self._sclk.value = self._polarity - start_time = self._wait(start_time) - # Handle read on leading edge of clock. - if not self._phase: # Mode 0, 2 + # clock: idle, or has made an active->idle transition. + if self._should_write(to_active=0): self._mosi.value = bit_value + # clock: wait half a period. + start_time = self._wait(start_time) + # clock: idle->active + self._sclk.value = not self._polarity + if self._should_read(to_active=1): if self._miso.value: # Set bit to 1 at appropriate location. buffer_in[in_byte_position] |= bit_mask else: - # Set bit to 0 at appropriate location. buffer_in[in_byte_position] &= ~bit_mask - # Flip clock off base - self._sclk.value = not self._polarity - start_time = self._wait(start_time) - # Handle read on trailing edge of clock. - if self._phase: # Mode 1, 3 + if self._should_write(to_active=1): self._mosi.value = bit_value + # clock: wait half a period + start_time = self._wait(start_time) + # Clock: active->idle + self._sclk.value = self._polarity + if self._should_read(to_active=0): if self._miso.value: # Set bit to 1 at appropriate location. buffer_in[in_byte_position] |= bit_mask else: - # Set bit to 0 at appropriate location. buffer_in[in_byte_position] &= ~bit_mask - # Return pins to base positions - self._mosi.value = 0 - self._sclk.value = self._polarity + # clock: wait another half period for the last transition. + start_time = self._wait(start_time) # pylint: enable=too-many-branches diff --git a/tests/README.rst b/tests/README.rst new file mode 100644 index 0000000..aa2311e --- /dev/null +++ b/tests/README.rst @@ -0,0 +1,53 @@ +.. + SPDX-FileCopyrightText: KB Sriram + SPDX-License-Identifier: MIT +.. + +Bitbangio Tests +=============== + +These tests run under CPython, and are intended to verify that the +library passes some sanity checks, using a lightweight simulator as +the target device. + +These tests run automatically from the standard `circuitpython github +workflow `_. To run them manually, first install these packages +if necessary:: + + $ pip3 install pytest + +Then ensure you're in the *root* directory of the repository and run +the following command:: + + $ python -m pytest + +Notes on the simulator +====================== + +`simulator.py` implements a small logic level simulator and a few test +doubles so the library can run under CPython. + +The `Engine` class is used as a singleton in the module to co-ordinate +the simulation. + +A `Net` holds a list of `FakePins` that are connected together. It +also resolves the overall logic level of the net when a `FakePin` is +updated. It can optionally hold a history of logic level changes, +which may be useful for testing some timing expectations, or export +them as a VCD file for `Pulseview `_. Test code can also register +listeners on a `Net` when the net's level changes, so it can simulate +device behavior. + +A `FakePin` is a test double for the CircuitPython `Pin` class, and +implements all the functionality so it behaves appropriately in +CPython. + +A simulated device can create a `FakePin` for each of its terminals, +and connect them to one or more `Net` instances. It can listen for +level changes on the `Net`, and bitbang the `FakePin` to simulate +behavior. `simulated_spi_device.py` implements a peripheral device +that writes a constant value onto an SPI bus. + + +.. _wf: https://github.com/adafruit/workflows-circuitpython-libs/blob/6e1562eaabced4db1bd91173b698b1cc1dfd35ab/build/action.yml#L78-L84 +.. _pv: https://sigrok.org/wiki/PulseView diff --git a/tests/simulated_i2c.py b/tests/simulated_i2c.py new file mode 100644 index 0000000..c31a371 --- /dev/null +++ b/tests/simulated_i2c.py @@ -0,0 +1,209 @@ +# SPDX-FileCopyrightText: KB Sriram +# SPDX-License-Identifier: MIT +"""Implementation of testable I2C devices.""" + +from typing import Any, Callable, Optional, Union +import dataclasses +import enum +import signal +import types +from typing_extensions import TypeAlias +import simulator as sim + +_SignalHandler: TypeAlias = Union[ + Callable[[int, Optional[types.FrameType]], Any], int, None +] + + +@enum.unique +class State(enum.Enum): + IDLE = "idle" + ADDRESS = "address" + ACK = "ack" + ACK_DONE = "ack_done" + WAIT_ACK = "wait_ack" + READ = "read" + WRITE = "write" + + +@dataclasses.dataclass(frozen=True) +class I2CBus: + scl: sim.Net + sda: sim.Net + + +def _switch_to_output(pin: sim.FakePin, value: bool) -> None: + pin.mode = sim.Mode.OUT + pin.value(1 if value else 0) + + +def _switch_to_input(pin: sim.FakePin) -> None: + pin.init(mode=sim.Mode.IN) + pin.level = sim.Level.HIGH + + +class Constant: + """I2C device that sinks all data and can send a constant.""" + + # pylint:disable=too-many-instance-attributes + # pylint:disable=too-many-arguments + def __init__( + self, + name: str, + address: int, + bus: I2CBus, + ack_data: bool = True, + clock_stretch_sec: int = 0, + data_to_send: int = 0, + ) -> None: + self._address = address + self._scl = sim.FakePin(f"{name}_scl_pin", bus.scl) + self._sda = sim.FakePin(f"{name}_sda_pin", bus.sda) + self._last_scl_level = bus.scl.level + self._ack_data = ack_data + self._clock_stretch_sec = clock_stretch_sec + self._prev_signal: _SignalHandler = None + self._state = State.IDLE + self._bit_count = 0 + self._received = 0 + self._all_received = bytearray() + self._send_data = data_to_send + self._sent_bit_count = 0 + self._in_write = 0 + + bus.scl.on_level_change(self._on_level_change) + bus.sda.on_level_change(self._on_level_change) + + def _move_state(self, nstate: State) -> None: + self._state = nstate + + def _on_start(self) -> None: + # This resets our state machine unconditionally and + # starts waiting for an address. + self._bit_count = 0 + self._received = 0 + self._move_state(State.ADDRESS) + + def _on_stop(self) -> None: + # Reset and start idling. + self._reset() + + def _reset(self) -> None: + self._bit_count = 0 + self._received = 0 + self._move_state(State.IDLE) + + def _clock_release( + self, ignored_signum: int, ignored_frame: Optional[types.FrameType] = None + ) -> None: + # First release the scl line + _switch_to_input(self._scl) + # Remove alarms + signal.alarm(0) + # Restore any existing signal. + if self._prev_signal: + signal.signal(signal.SIGALRM, self._prev_signal) + self._prev_signal = None + + def _maybe_clock_stretch(self) -> None: + if not self._clock_stretch_sec: + return + if self._state == State.IDLE: + return + # pull the clock line low + _switch_to_output(self._scl, value=False) + # Set an alarm to release the line after some time. + self._prev_signal = signal.signal(signal.SIGALRM, self._clock_release) + signal.alarm(self._clock_stretch_sec) + + def _on_byte_read(self) -> None: + self._all_received.append(self._received) + + def _on_clock_fall(self) -> None: + self._maybe_clock_stretch() + + # Return early unless we need to send data. + if self._state not in (State.ACK, State.ACK_DONE, State.WRITE): + return + + if self._state == State.ACK: + # pull down the data line to start the ack. We want to hold + # it down until the next clock falling edge. + if self._ack_data or not self._all_received: + _switch_to_output(self._sda, value=False) + self._move_state(State.ACK_DONE) + return + if self._state == State.ACK_DONE: + # The data line has been held between one pair of falling edges - we can + # let go now if we need to start reading. + if self._in_write: + # Note: this will also write out the first bit later in this method. + self._move_state(State.WRITE) + else: + _switch_to_input(self._sda) + self._move_state(State.READ) + + if self._state == State.WRITE: + if self._sent_bit_count == 8: + _switch_to_input(self._sda) + self._sent_bit_count = 0 + self._move_state(State.WAIT_ACK) + else: + bit_value = (self._send_data >> (7 - self._sent_bit_count)) & 0x1 + _switch_to_output(self._sda, value=bit_value == 1) + self._sent_bit_count += 1 + + def _on_clock_rise(self) -> None: + if self._state not in (State.ADDRESS, State.READ, State.WAIT_ACK): + return + bit_value = 1 if self._sda.net.level == sim.Level.HIGH else 0 + if self._state == State.WAIT_ACK: + if bit_value: + # NACK, just reset. + self._move_state(State.IDLE) + else: + # ACK, continue writing. + self._move_state(State.ACK_DONE) + return + self._received = (self._received << 1) | bit_value + self._bit_count += 1 + if self._bit_count < 8: + return + + # We've read 8 bits of either address or data sent to us. + if self._state == State.ADDRESS and self._address != (self._received >> 1): + # This message isn't for us, reset and start idling. + self._reset() + return + # This message is for us, ack it. + if self._state == State.ADDRESS: + self._in_write = self._received & 0x1 + elif self._state == State.READ: + self._on_byte_read() + self._bit_count = 0 + self._received = 0 + self._move_state(State.ACK) + + def _on_level_change(self, net: sim.Net) -> None: + # Handle start/stop events directly. + if net == self._sda.net and self._scl.net.level == sim.Level.HIGH: + if net.level == sim.Level.LOW: + # sda hi->low with scl high + self._on_start() + else: + # sda low->hi with scl high + self._on_stop() + return + + # Everything else can be handled as state changes that occur + # either on the clock rising or falling edge. + if net == self._scl.net: + if net.level == sim.Level.HIGH: + # scl low->high + self._on_clock_rise() + else: + # scl high->low + self._on_clock_fall() + + def all_received_data(self) -> bytearray: + return self._all_received diff --git a/tests/simulated_spi.py b/tests/simulated_spi.py new file mode 100644 index 0000000..8e8ad0b --- /dev/null +++ b/tests/simulated_spi.py @@ -0,0 +1,67 @@ +# SPDX-FileCopyrightText: KB Sriram +# SPDX-License-Identifier: MIT +"""Implementation of testable SPI devices.""" + +import dataclasses +import simulator as sim + + +@dataclasses.dataclass(frozen=True) +class SpiBus: + enable: sim.Net + clock: sim.Net + copi: sim.Net + cipo: sim.Net + + +class Constant: + """Device that always writes a constant.""" + + def __init__(self, data: bytearray, bus: SpiBus, polarity: int, phase: int) -> None: + # convert to binary string array of bits for convenience + datalen = 8 * len(data) + self._data = f"{int.from_bytes(data, 'big'):0{datalen}b}" + self._bit_position = 0 + self._clock = sim.FakePin("const_clock_pin", bus.clock) + self._last_clock_level = bus.clock.level + self._cipo = sim.FakePin("const_cipo_pin", bus.cipo) + self._enable = sim.FakePin("const_enable_pin", bus.enable) + self._cipo.init(sim.Mode.OUT) + self._phase = phase + self._polarity = sim.Level.HIGH if polarity else sim.Level.LOW + self._enabled = False + bus.clock.on_level_change(self._on_level_change) + bus.enable.on_level_change(self._on_level_change) + + def write_bit(self) -> None: + """Writes the next bit to the cipo net.""" + if self._bit_position >= len(self._data): + # Just write a zero + self._cipo.value(0) # pylint: disable=not-callable + return + self._cipo.value( + int(self._data[self._bit_position]) # pylint: disable=not-callable + ) + self._bit_position += 1 + + def _on_level_change(self, net: sim.Net) -> None: + if net == self._enable.net: + # Assumes enable is active high. + self._enabled = net.level == sim.Level.HIGH + if self._enabled: + self._bit_position = 0 + if self._phase == 0: + # Write on enable or idle->active + self.write_bit() + return + if not self._enabled: + return + if net != self._clock.net: + return + cur_clock_level = net.level + if cur_clock_level == self._last_clock_level: + return + active = 0 if cur_clock_level == self._polarity else 1 + if self._phase == active: + self.write_bit() + self._last_clock_level = cur_clock_level diff --git a/tests/simulator.py b/tests/simulator.py new file mode 100644 index 0000000..2837a3a --- /dev/null +++ b/tests/simulator.py @@ -0,0 +1,253 @@ +# SPDX-FileCopyrightText: KB Sriram +# SPDX-License-Identifier: MIT +"""Simple logic level simulator to test I2C/SPI interactions.""" + +from typing import Any, Callable, List, Literal, Optional, Sequence +import dataclasses +import enum +import functools +import time + +import digitalio + + +@enum.unique +class Mode(enum.Enum): + IN = "IN" + OUT = "OUT" + + +@enum.unique +class Level(enum.Enum): + Z = "Z" + LOW = "LOW" + HIGH = "HIGH" + + +@enum.unique +class Pull(enum.Enum): + NONE = "NONE" + UP = "UP" + DOWN = "DOWN" + + +def _level_to_vcd(level: Level) -> str: + """Converts a level to a VCD understandable mnemonic.""" + if level == Level.Z: + return "Z" + if level == Level.HIGH: + return "1" + return "0" + + +@dataclasses.dataclass(frozen=True) +class Change: + """Container to record simulation events.""" + + net_name: str + time_us: int + level: Level + + +class Engine: + """Manages the overall simulation state and clock.""" + + def __init__(self) -> None: + self._start_us = int(time.monotonic() * 1e6) + self._nets: List["Net"] = [] + + def reset(self) -> None: + """Clears out all existing state and resets the simulation.""" + self._start_us = int(time.monotonic() * 1e6) + self._nets = [] + + def _find_net_by_pin_id(self, pin_id: str) -> Optional["Net"]: + """Returns a net (if any) that has a pin with the given id.""" + for net in self._nets: + if net.contains_pin_id(pin_id): + return net + return None + + def create_net( + self, net_id: str, default_level: Level = Level.Z, monitor: bool = False + ) -> "Net": + """Creates a new net with the given name. Monitored nets are also traced.""" + net = Net(net_id, default_level=default_level, monitor=monitor) + self._nets.append(net) + return net + + def change_history(self) -> Sequence[Change]: + """Returns an ordered history of all events in monitored nets.""" + monitored_nets = [net for net in self._nets if net.history] + combined: List[Change] = [] + for net in monitored_nets: + if net.history: + for time_us, level in net.history: + combined.append(Change(net.name, time_us, level)) + combined.sort(key=lambda v: v.time_us) + return combined + + def write_vcd(self, path: str) -> None: + """Writes monitored nets to the provided path as a VCD file.""" + with open(path, "wt") as vcdfile: + vcdfile.write("$version pytest output $end\n") + vcdfile.write("$timescale 1 us $end\n") + vcdfile.write("$scope module top $end\n") + monitored_nets = [net for net in self._nets if net.history] + for net in monitored_nets: + vcdfile.write(f"$var wire 1 {net.name} {net.name} $end\n") + vcdfile.write("$upscope $end\n") + vcdfile.write("$enddefinitions $end\n") + combined = self.change_history() + # History starts when the engine is first reset or initialized. + vcdfile.write(f"#{self._start_us}\n") + last_us = self._start_us + for change in combined: + if change.time_us != last_us: + vcdfile.write(f"#{change.time_us}\n") + last_us = change.time_us + vcdfile.write(f"{_level_to_vcd(change.level)}{change.net_name}\n") + + +# module global/singleton +engine = Engine() + + +class FakePin: + """Test double for a microcontroller pin used in tests.""" + + IN = Mode.IN # pylint: disable=invalid-name + OUT = Mode.OUT + PULL_NONE = Pull.NONE + PULL_UP = Pull.UP + PULL_DOWN = Pull.DOWN + + def __init__(self, pin_id: str, net: Optional["Net"] = None): + self.id = pin_id # pylint: disable=invalid-name + self.mode: Optional[Mode] = None + self.pull: Optional[Pull] = None + self.level: Level = Level.Z + if net: + # Created directly by the test. + if engine._find_net_by_pin_id(pin_id): + raise ValueError(f"{pin_id} has already been created.") + self.net = net + else: + # Created by the library by duplicating an existing id. + net = engine._find_net_by_pin_id(pin_id) + if not net: + raise ValueError(f"Unexpected pin without a net: {pin_id}") + self.net = net + self.id = f"{self.id}_dup" + self.net.add_pin(self) + + def init(self, mode: Mode = Mode.IN, pull: Optional[Pull] = None) -> None: + if mode != self.mode or pull != self.pull: + self.mode = mode + self.pull = pull + self.net.update() + + def value(self, val: Optional[Literal[0, 1]] = None) -> Optional[Literal[0, 1]]: + """Set or return the pin Value""" + if val is None: + if self.mode != Mode.IN: + raise ValueError(f"{self.id}: is not an input") + level = self.net.level + if level is None: + # Nothing is actively driving the line - we assume that during + # testing, this is an error either in the test setup, or + # something is asking for a value in an uninitialized state. + raise ValueError( + f"{self.id}: value read but nothing is driving the net." + ) + return 1 if level == Level.HIGH else 0 + if val in (0, 1): + if self.mode != Mode.OUT: + raise ValueError(f"{self.id}: is not an output") + nlevel = Level.HIGH if val else Level.LOW + if nlevel != self.level: + self.level = nlevel + self.net.update() + return None + raise RuntimeError(f"{self.id}: Invalid value {val} set on pin.") + + +class Net: + """A set of pins connected to each other.""" + + def __init__( + self, + name: str, + default_level: Level = Level.Z, + monitor: bool = False, + ) -> None: + self.name = name + self._pins: List[FakePin] = [] + self._default_level = default_level + self.level = default_level + self._triggers: List[Callable[["Net"], None]] = [] + self.history = [(engine._start_us, default_level)] if monitor else None + + def update(self) -> None: + """Resolves the state of this net based on all pins connected to it.""" + result = Level.Z + # Try to resolve the state of this net by looking at the pin levels + # for all output pins. + for pin in self._pins: + if pin.mode != Mode.OUT: + continue + if pin.level == result: + continue + if result == Level.Z: + # This pin is now driving the net. + result = pin.level + continue + # There are conflicting pins! + raise ValueError( + f"Conflicting pins on {self.name}: " + f"{pin.id} is {pin.level}, " + f" but net was already at {result}" + ) + # Finally, use any default net state if one was provided. (e.g. a pull-up net.) + result = self._default_level if result == Level.Z else result + + if result != self.level: + # Also record a state change if we're being monitored. + if self.history: + event_us = int(time.monotonic() * 1e6) + self.history.append((event_us, result)) + self.level = result + for trigger in self._triggers: + trigger(self) + + def add_pin(self, pin: FakePin) -> None: + self._pins.append(pin) + + def on_level_change(self, trigger: Callable[["Net"], None]) -> None: + """Calls the trigger whenever the net's level changes.""" + self._triggers.append(trigger) + + def contains_pin_id(self, pin_id: str) -> bool: + """Returns True if the net has a pin with the given id.""" + for pin in self._pins: + if pin.id == pin_id: + return True + return False + + +def stub(method: Callable) -> Callable: + """Decorator to safely insert and remove doubles within tests.""" + + @functools.wraps(method) + def wrapper(*args: Any, **kwds: Any) -> Any: + # First save any objects we're going to replace with a double. + pin_module = digitalio.Pin if hasattr(digitalio, "Pin") else None + try: + digitalio.Pin = FakePin + return method(*args, **kwds) + finally: + # Replace the saved objects after the test runs. + if pin_module: + digitalio.Pin = pin_module + + return wrapper diff --git a/tests/test_adafruit_bitbangio_i2c.py b/tests/test_adafruit_bitbangio_i2c.py new file mode 100644 index 0000000..2d8e5df --- /dev/null +++ b/tests/test_adafruit_bitbangio_i2c.py @@ -0,0 +1,188 @@ +# SPDX-FileCopyrightText: KB Sriram +# SPDX-License-Identifier: MIT + +from typing import Sequence +import pytest +import simulated_i2c as si2c +import simulator as sim +import adafruit_bitbangio + + +_SCL_NET = "scl" +_SDA_NET = "sda" + + +class TestBitbangI2C: + def setup_method(self) -> None: + sim.engine.reset() + # Create nets, with a pullup by default. + scl = sim.engine.create_net( + _SCL_NET, monitor=True, default_level=sim.Level.HIGH + ) + sda = sim.engine.create_net( + _SDA_NET, monitor=True, default_level=sim.Level.HIGH + ) + # pylint: disable=attribute-defined-outside-init + self.scl_pin = sim.FakePin("scl_pin", scl) + self.sda_pin = sim.FakePin("sda_pin", sda) + self.i2cbus = si2c.I2CBus(scl=scl, sda=sda) + # pylint: enable=attribute-defined-outside-init + + @sim.stub + @pytest.mark.parametrize("addresses", [[0x42, 0x43]]) + def test_scan(self, addresses: Sequence[int]) -> None: + # Create a set of data sinks, one for each address. + for address in addresses: + si2c.Constant(hex(address), address=address, bus=self.i2cbus) + + with adafruit_bitbangio.I2C( + scl=self.scl_pin, sda=self.sda_pin, frequency=1000, timeout=1 + ) as i2c: + i2c.try_lock() + scanned = i2c.scan() + i2c.unlock() + + assert addresses == scanned + + @sim.stub + @pytest.mark.parametrize( + "data", ["11000011", "00111100", "1010101001010101", "1010101111010100"] + ) + def test_write( + self, + data: str, + ) -> None: + datalen = len(data) // 8 + data_array = bytearray(int(data, 2).to_bytes(datalen, byteorder="big")) + + # attach a device that records whatever we send to it. + device = si2c.Constant("target", address=0x42, bus=self.i2cbus) + + # Write data over the bus and verify the device received it. + with adafruit_bitbangio.I2C( + scl=self.scl_pin, sda=self.sda_pin, frequency=1000 + ) as i2c: + i2c.try_lock() + i2c.writeto(address=0x42, buffer=data_array) + i2c.unlock() + + # Useful to debug signals in pulseview. + # sim.engine.write_vcd(f"/tmp/test_{data}.vcd") + assert data_array == device.all_received_data() + + @sim.stub + def test_write_no_ack(self) -> None: + # attach a device that will ack the address, but not the data. + si2c.Constant("target", address=0x42, bus=self.i2cbus, ack_data=False) + + with adafruit_bitbangio.I2C( + scl=self.scl_pin, sda=self.sda_pin, frequency=1000 + ) as i2c: + i2c.try_lock() + with pytest.raises(RuntimeError) as info: + i2c.writeto(address=0x42, buffer=b"\x42") + i2c.unlock() + + assert "not responding" in str(info.value) + + @sim.stub + @pytest.mark.parametrize("data", ["11000011", "00111100"]) + def test_write_clock_stretching(self, data: str) -> None: + datalen = len(data) // 8 + data_array = bytearray(int(data, 2).to_bytes(datalen, byteorder="big")) + + # attach a device that does clock stretching, but not exceed our timeout. + device = si2c.Constant( + "target", address=0x42, bus=self.i2cbus, clock_stretch_sec=1 + ) + + with adafruit_bitbangio.I2C( + scl=self.scl_pin, sda=self.sda_pin, frequency=1000, timeout=2.0 + ) as i2c: + i2c.try_lock() + i2c.writeto(address=0x42, buffer=data_array) + i2c.unlock() + + assert data_array == device.all_received_data() + + @sim.stub + def test_write_clock_timeout(self) -> None: + # attach a device that does clock stretching, but exceeds our timeout. + si2c.Constant("target", address=0x42, bus=self.i2cbus, clock_stretch_sec=3) + + with adafruit_bitbangio.I2C( + scl=self.scl_pin, sda=self.sda_pin, frequency=1000, timeout=1 + ) as i2c: + i2c.try_lock() + with pytest.raises(RuntimeError) as info: + i2c.writeto(address=0x42, buffer=b"\x42") + i2c.unlock() + + assert "timed out" in str(info.value) + + @sim.stub + @pytest.mark.parametrize("count", [1, 2, 5]) + @pytest.mark.parametrize("data", ["11000011", "00111100", "10101010", "01010101"]) + def test_readfrom(self, count: int, data: str) -> None: + value = int(data, 2) + expected_array = bytearray([value] * count) + data_array = bytearray(count) + + # attach a device that sends a constant byte of data. + si2c.Constant("target", address=0x42, bus=self.i2cbus, data_to_send=value) + + # Confirm we were able to read back the data + with adafruit_bitbangio.I2C( + scl=self.scl_pin, sda=self.sda_pin, frequency=1000 + ) as i2c: + i2c.try_lock() + i2c.readfrom_into(address=0x42, buffer=data_array) + i2c.unlock() + + # Useful to debug signals in pulseview. + # sim.engine.write_vcd(f"/tmp/test_{count}_{data}.vcd") + assert data_array == expected_array + + @sim.stub + @pytest.mark.parametrize( + "send_data", + [ + "11000011", + "00111100", + "10101010", + "0101010", + ], + ) + @pytest.mark.parametrize( + "expect_data", + [ + "11000011", + "00111100", + "10101010", + "01010101", + ], + ) + def test_writeto_readfrom(self, send_data: str, expect_data: str) -> None: + send_array = bytearray(int(send_data, 2).to_bytes(1, byteorder="big")) + expect_value = int(expect_data, 2) + data_array = bytearray(1) + + # attach a device that sends a constant byte of data. + device = si2c.Constant( + "target", address=0x42, bus=self.i2cbus, data_to_send=expect_value + ) + + # Send the send_data, and check we got back expect_data + with adafruit_bitbangio.I2C( + scl=self.scl_pin, sda=self.sda_pin, frequency=1000 + ) as i2c: + i2c.try_lock() + i2c.writeto_then_readfrom( + address=0x42, buffer_out=send_array, buffer_in=data_array + ) + i2c.unlock() + + # Useful to debug signals in pulseview. + # sim.engine.write_vcd(f"/tmp/test_{send_data}_{expect_data}.vcd") + assert send_array == device.all_received_data() + assert data_array == bytearray([expect_value]) diff --git a/tests/test_adafruit_bitbangio_spi.py b/tests/test_adafruit_bitbangio_spi.py new file mode 100644 index 0000000..4da63f3 --- /dev/null +++ b/tests/test_adafruit_bitbangio_spi.py @@ -0,0 +1,211 @@ +# SPDX-FileCopyrightText: KB Sriram +# SPDX-License-Identifier: MIT + +from typing import Literal, Sequence +import pytest +import simulated_spi as sspi +import simulator as sim +import adafruit_bitbangio + + +_CLOCK_NET = "clock" +_COPI_NET = "copi" +_CIPO_NET = "cipo" +_ENABLE_NET = "enable" + + +def _check_bit( + data: bytearray, + bits_read: int, + last_copi_state: sim.Level, +) -> None: + """Checks that the copi state matches the bit we should be writing.""" + intdata = int.from_bytes(data, "big") + nbits = 8 * len(data) + expected_bit_value = (intdata >> (nbits - bits_read - 1)) & 0x1 + expected_level = sim.Level.HIGH if expected_bit_value else sim.Level.LOW + assert last_copi_state == expected_level + + +def _check_write( + data: bytearray, + change_history: Sequence[sim.Change], + polarity: Literal[0, 1], + phase: Literal[0, 1], + baud: int, +) -> None: + """Checks that the net level changes have a correct sequence of write events.""" + state = "disabled" + last_clock_state = sim.Level.Z + last_copi_state = sim.Level.Z + last_copi_us = 0 + idle, active = ( + (sim.Level.HIGH, sim.Level.LOW) if polarity else (sim.Level.LOW, sim.Level.HIGH) + ) + bits_read = 0 + # We want data to be written at least this long before a read + # transition. + quarter_period = 1e6 / baud / 4 + + for change in change_history: + if ( + state == "disabled" + and change.net_name == _ENABLE_NET + and change.level == sim.Level.HIGH + ): + # In this implementation, we should always start out with the + # clock in the idle state by the time the device is enabled. + assert last_clock_state == idle + bits_read = 0 + state = "wait_for_read" + elif state == "wait_for_read" and change.net_name == _CLOCK_NET: + # phase 0 reads on idle->active, and phase 1 reads on active->idle. + should_read = change.level == active if phase == 0 else change.level == idle + if should_read: + # Check we have the right data + _check_bit(data, bits_read, last_copi_state) + # Check the data was also set early enough. + assert change.time_us - last_copi_us > quarter_period + bits_read += 1 + if bits_read == 8: + return + # Track the last time we changed the clock and data values. + if change.net_name == _COPI_NET: + if last_copi_state != change.level: + last_copi_state = change.level + last_copi_us = change.time_us + elif change.net_name == _CLOCK_NET: + if last_clock_state != change.level: + last_clock_state = change.level + # If we came here, we haven't read enough bits. + pytest.fail("Only {bits_read} bits were read") + + +class TestBitbangSpi: + def setup_method(self) -> None: + sim.engine.reset() + clock = sim.engine.create_net(_CLOCK_NET, monitor=True) + copi = sim.engine.create_net(_COPI_NET, monitor=True) + cipo = sim.engine.create_net(_CIPO_NET, monitor=True) + enable = sim.engine.create_net(_ENABLE_NET, monitor=True) + # pylint: disable=attribute-defined-outside-init + self.clock_pin = sim.FakePin("clock_pin", clock) + self.copi_pin = sim.FakePin("copi_pin", copi) + self.cipo_pin = sim.FakePin("cipo_pin", cipo) + self.enable_pin = sim.FakePin("enable_pin", enable) + self.enable_pin.init(mode=sim.Mode.OUT) + self.spibus = sspi.SpiBus(clock=clock, copi=copi, cipo=cipo, enable=enable) + # pylint: enable=attribute-defined-outside-init + self._enable_net(0) + + def _enable_net(self, val: Literal[0, 1]) -> None: + self.enable_pin.value(val) # pylint: disable=not-callable + + @sim.stub + @pytest.mark.parametrize("baud", [100]) + @pytest.mark.parametrize("polarity", [0, 1]) + @pytest.mark.parametrize("phase", [0, 1]) + @pytest.mark.parametrize("data", ["10101010", "01010101", "01111110", "10000001"]) + def test_write( + self, baud: int, polarity: Literal[0, 1], phase: Literal[0, 1], data: str + ) -> None: + data_array = bytearray(int(data, 2).to_bytes(1, byteorder="big")) + # Send one byte of data into the void to verify write timing. + with adafruit_bitbangio.SPI(clock=self.clock_pin, MOSI=self.copi_pin) as spi: + spi.try_lock() + spi.configure(baudrate=baud, polarity=polarity, phase=phase, bits=8) + self._enable_net(1) + spi.write(data_array) + self._enable_net(0) + + # Monitored nets can be viewed in sigrock by dumping out a VCD file. + # sim.engine.write_vcd(f"/tmp/test_{polarity}_{phase}_{data}.vcd") + _check_write( + data_array, + sim.engine.change_history(), + polarity=polarity, + phase=phase, + baud=baud, + ) + + @sim.stub + @pytest.mark.parametrize("baud", [100]) + @pytest.mark.parametrize("polarity", [0, 1]) + @pytest.mark.parametrize("phase", [0, 1]) + @pytest.mark.parametrize("data", ["10101010", "01010101", "01111110", "10000001"]) + def test_readinto( + self, baud: int, polarity: Literal[0, 1], phase: Literal[0, 1], data: str + ) -> None: + data_int = int(data, 2) + data_array = bytearray(data_int.to_bytes(1, byteorder="big")) + # attach a device that sends a constant. + _ = sspi.Constant( + data=data_array, bus=self.spibus, polarity=polarity, phase=phase + ) + + # Read/write a byte of data + with adafruit_bitbangio.SPI( + clock=self.clock_pin, MOSI=self.copi_pin, MISO=self.cipo_pin + ) as spi: + spi.try_lock() + spi.configure(baudrate=baud, polarity=polarity, phase=phase, bits=8) + self._enable_net(1) + received_data = bytearray(1) + spi.readinto(received_data, write_value=data_int) + self._enable_net(0) + + # Monitored nets can be viewed in sigrock by dumping out a VCD file. + # sim.engine.write_vcd(f"/tmp/test_{polarity}_{phase}_{data}.vcd") + + # Check we read the constant correctly from our device. + assert data_array == received_data + # Check the timing on the data we wrote out. + _check_write( + data_array, + sim.engine.change_history(), + polarity=polarity, + phase=phase, + baud=baud, + ) + + @sim.stub + @pytest.mark.parametrize("baud", [100]) + @pytest.mark.parametrize("polarity", [0, 1]) + @pytest.mark.parametrize("phase", [0, 1]) + @pytest.mark.parametrize( + "data", ["10101010", "01010101", "01111110", "10000001", "1000010101111110"] + ) + def test_write_readinto( + self, baud: int, polarity: Literal[0, 1], phase: Literal[0, 1], data: str + ) -> None: + nbytes = len(data) // 8 + data_array = bytearray(int(data, 2).to_bytes(nbytes, byteorder="big")) + # attach a device that sends a constant. + _ = sspi.Constant( + data=data_array, bus=self.spibus, polarity=polarity, phase=phase + ) + + # Read/write data array + with adafruit_bitbangio.SPI( + clock=self.clock_pin, MOSI=self.copi_pin, MISO=self.cipo_pin + ) as spi: + spi.try_lock() + spi.configure(baudrate=baud, polarity=polarity, phase=phase, bits=8) + self._enable_net(1) + received_data = bytearray(nbytes) + spi.write_readinto(buffer_out=data_array, buffer_in=received_data) + self._enable_net(0) + + # Monitored nets can be viewed in sigrock by dumping out a VCD file. + # sim.engine.write_vcd(f"/tmp/test_{polarity}_{phase}_{data}.vcd") + + # Check we read the constant correctly from our device. + assert data_array == received_data + # Check the timing on the data we wrote out. + _check_write( + data_array, + sim.engine.change_history(), + polarity=polarity, + phase=phase, + baud=baud, + )