diff --git a/.flake8 b/.flake8
index 732c12e..8f58749 100644
--- a/.flake8
+++ b/.flake8
@@ -1,7 +1,7 @@
[flake8]
docstring_style=sphinx
max-line-length = 127
-ignore = D400, Q000, S311, PLW, PLC, PLR
+ignore = D400, Q000, S311, W503, PLW, PLC, PLR
per-file-ignores =
buildhat/__init__.py:F401
exclude = docs/conf.py, docs/sphinxcontrib/cmtinc-buildhat.py, docs/sphinx_selective_exclude/*.py
diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml
index 8e72fc3..57a67fb 100644
--- a/.github/workflows/python-app.yml
+++ b/.github/workflows/python-app.yml
@@ -16,10 +16,10 @@ jobs:
steps:
- uses: actions/checkout@v2
- - name: Set up Python 3.7
+ - name: Set up Python 3.9
uses: actions/setup-python@v2
with:
- python-version: 3.7
+ python-version: 3.9
- name: Install dependencies
run: |
python -m pip install --upgrade pip
@@ -34,5 +34,3 @@ jobs:
flake8 . --version
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --show-source --statistics
- # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
- flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
diff --git a/.gitignore b/.gitignore
index f6d7f91..7112796 100644
--- a/.gitignore
+++ b/.gitignore
@@ -34,6 +34,7 @@ share/python-wheels/
.installed.cfg
*.egg
MANIFEST
+.pypirc
# Installer logs
pip-log.txt
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000..bb16389
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,17 @@
+repos:
+- repo: https://github.com/PyCQA/flake8
+ rev: 6.0.0
+ hooks:
+ - id: flake8
+ additional_dependencies: [
+ 'flake8-bugbear>=22.10.27',
+ 'flake8-comprehensions>=3.10',
+ 'flake8-debugger',
+ 'flake8-docstrings>=1.6.0',
+ 'flake8-isort>=5.0',
+ 'flake8-pylint',
+ 'flake8-rst-docstrings',
+ 'flake8-string-format',
+ 'gpiozero',
+ 'pyserial'
+ ]
diff --git a/.readthedocs.yaml b/.readthedocs.yaml
index 3f5c924..121204f 100644
--- a/.readthedocs.yaml
+++ b/.readthedocs.yaml
@@ -6,10 +6,11 @@
version: 2
build:
- image: latest
+ os: ubuntu-22.04
+ tools:
+ python: "3.8"
python:
- version: 3.8
install:
- method: setuptools
path: .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 053a830..8bf1d81 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,77 @@
# Change Log
+## 0.9.0
+
+Fixes:
+
+* Resend 'version' if empty data received from the Build HAT
+
+## 0.8.0
+
+Adds:
+
+* Support for custom firmware (see: https://www.raspberrypi.com/news/build-hat-firmware-now-fully-open-source/)
+* Add W503 to Flake8 ignore
+* Increase interval for sensor while testing - avoids error that occurs when handling large amount of sensor data. In future, might be interesting to see if we can support faster UART baud rate.
+
+## 0.7.0
+
+Adds:
+
+* Mode 7 IR transmission to ColorDistanceSensor https://github.com/RaspberryPiFoundation/python-build-hat/pull/205
+* Debug log filename access https://github.com/RaspberryPiFoundation/python-build-hat/pull/204
+* Movement counter to WeDo 2.0 Motion Sensor https://github.com/RaspberryPiFoundation/python-build-hat/pull/201
+
+## 0.6.0
+
+### Added
+
+* Support for Raspberry Pi 5 (https://github.com/RaspberryPiFoundation/python-build-hat/pull/203)
+
+## 0.5.12
+
+### Added
+
+* Ability to set custom device interval for callbacks
+
+### Changed
+
+* New firmware with mitigation for motor disconnecting and selrate command
+
+### Removed
+
+n/a
+
+## 0.5.11
+
+### Added
+
+* Expose release property to allow motor to hold position
+* Updated documentation
+
+### Changed
+
+n/a
+
+### Removed
+
+n/a
+
+## 0.5.10
+
+### Added
+
+* Support for 88008 motor
+* get_distance support for ColourDistanceSensor
+
+### Changed
+
+* Use debug log level for logging
+
+### Removed
+
+n/a
+
## 0.5.9
### Added
diff --git a/LICENSE b/LICENSE
index 78d99a6..a43a2e8 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,6 +1,6 @@
The MIT License (MIT)
-Copyright (c) 2020-2021 - Raspberry Pi Foundation
+Copyright (c) 2020-2025 - Raspberry Pi Foundation
Copyright (c) 2017-2021 - LEGO System A/S - Aastvej 1, 7190 Billund, DK
Permission is hereby granted, free of charge, to any person obtaining a copy
diff --git a/README.md b/README.md
index a20d9d5..0b5968a 100644
--- a/README.md
+++ b/README.md
@@ -20,7 +20,7 @@ Projects and inspiration: https://projects.raspberrypi.org/en/pathways/lego-intr
To install the Build HAT library, enter the following commands in a terminal:
- sudo pip3 install buildhat
+ pip3 install buildhat
## Usage
diff --git a/VERSION b/VERSION
index 416bfb0..ac39a10 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.5.9
+0.9.0
diff --git a/buildhat/color.py b/buildhat/color.py
index edf6030..5959263 100644
--- a/buildhat/color.py
+++ b/buildhat/color.py
@@ -210,4 +210,4 @@ def wait_for_new_color(self):
def on(self):
"""Turn on the sensor and LED"""
- self._write(f"port {self.port} ; plimit 1 ; set -1\r")
+ self.reverse()
diff --git a/buildhat/colordistance.py b/buildhat/colordistance.py
index 90f0f94..3c722a3 100644
--- a/buildhat/colordistance.py
+++ b/buildhat/colordistance.py
@@ -25,6 +25,9 @@ def __init__(self, port):
self.mode(6)
self.avg_reads = 4
self._old_color = None
+ self._ir_channel = 0x0
+ self._ir_address = 0x0
+ self._ir_toggle = 0x0
def segment_color(self, r, g, b):
"""Return the color name from HSV
@@ -115,6 +118,16 @@ def get_reflected_light(self):
readings.append(self.get()[0])
return int(sum(readings) / len(readings))
+ def get_distance(self):
+ """Return the distance
+
+ :return: Distance
+ :rtype: int
+ """
+ self.mode(1)
+ distance = self.get()[0]
+ return distance
+
def _clamp(self, val, small, large):
return max(small, min(val, large))
@@ -187,6 +200,337 @@ def wait_for_new_color(self):
self.callback(None)
return self._old_color
+ @property
+ def ir_channel(self):
+ """Get the IR channel for message transmission"""
+ return self._ir_channel
+
+ @ir_channel.setter
+ def ir_channel(self, channel=1):
+ """
+ Set the IR channel for RC Tx
+
+ :param channel: 1-4 indicating the selected IR channel on the reciever
+ """
+ check_chan = channel
+ if check_chan > 4:
+ check_chan = 4
+ elif check_chan < 1:
+ check_chan = 1
+ # Internally: 0-3
+ self._ir_channel = int(check_chan) - 1
+
+ @property
+ def ir_address(self):
+ """IR Address space of 0x0 for default PoweredUp or 0x1 for extra space"""
+ return self._ir_address
+
+ def toggle_ir_toggle(self):
+ """Toggle the IR toggle bit"""
+ # IYKYK, because the RC documents are not clear
+ if self._ir_toggle:
+ self._ir_toggle = 0x0
+ else:
+ self._ir_toggle = 0x1
+ return self._ir_toggle
+
+ def send_ir_sop(self, port, mode):
+ """
+ Send an IR message via Power Functions RC Protocol in Single Output PWM mode
+
+ PF IR RC Protocol documented at https://www.philohome.com/pf/pf.htm
+
+ Port B is blue
+
+ Valid values for mode are:
+ 0x0: Float output
+ 0x1: Forward/Clockwise at speed 1
+ 0x2: Forward/Clockwise at speed 2
+ 0x3: Forward/Clockwise at speed 3
+ 0x4: Forward/Clockwise at speed 4
+ 0x5: Forward/Clockwise at speed 5
+ 0x6: Forward/Clockwise at speed 6
+ 0x7: Forward/Clockwise at speed 7
+ 0x8: Brake (then float v1.20)
+ 0x9: Backwards/Counterclockwise at speed 7
+ 0xA: Backwards/Counterclockwise at speed 6
+ 0xB: Backwards/Counterclockwise at speed 5
+ 0xC: Backwards/Counterclockwise at speed 4
+ 0xD: Backwards/Counterclockwise at speed 3
+ 0xE: Backwards/Counterclockwise at speed 2
+ 0xF: Backwards/Counterclockwise at speed 1
+
+ :param port: 'A' or 'B'
+ :param mode: 0-15 indicating the port's mode to set
+ """
+ escape_modeselect = 0x0
+ escape = escape_modeselect
+
+ ir_mode_single_output = 0x4
+ ir_mode = ir_mode_single_output
+
+ so_mode_pwm = 0x0
+ so_mode = so_mode_pwm
+
+ output_port_a = 0x0
+ output_port_b = 0x1
+
+ output_port = None
+ if port == 'A' or port == 'a':
+ output_port = output_port_a
+ elif port == 'B' or port == 'b':
+ output_port = output_port_b
+ else:
+ return False
+
+ ir_mode = ir_mode | (so_mode << 1) | output_port
+
+ nibble1 = (self._ir_toggle << 3) | (escape << 2) | self._ir_channel
+ nibble2 = (self._ir_address << 3) | ir_mode
+
+ # Mode range checked here
+ return self._send_ir_nibbles(nibble1, nibble2, mode)
+
+ def send_ir_socstid(self, port, mode):
+ """
+ Send an IR message via Power Functions RC Protocol in Single Output Clear/Set/Toggle/Increment/Decrement mode
+
+ PF IR RC Protocol documented at https://www.philohome.com/pf/pf.htm
+
+ Valid values for mode are:
+ 0x0: Toggle full Clockwise/Forward (Stop to Clockwise, Clockwise to Stop, Counterclockwise to Clockwise)
+ 0x1: Toggle direction
+ 0x2: Increment numerical PWM
+ 0x3: Decrement numerical PWM
+ 0x4: Increment PWM
+ 0x5: Decrement PWM
+ 0x6: Full Clockwise/Forward
+ 0x7: Full Counterclockwise/Backward
+ 0x8: Toggle full (defaults to Forward, first)
+ 0x9: Clear C1 (C1 to High)
+ 0xA: Set C1 (C1 to Low)
+ 0xB: Toggle C1
+ 0xC: Clear C2 (C2 to High)
+ 0xD: Set C2 (C2 to Low)
+ 0xE: Toggle C2
+ 0xF: Toggle full Counterclockwise/Backward (Stop to Clockwise, Counterclockwise to Stop, Clockwise to Counterclockwise)
+
+ :param port: 'A' or 'B'
+ :param mode: 0-15 indicating the port's mode to set
+ """
+ escape_modeselect = 0x0
+ escape = escape_modeselect
+
+ ir_mode_single_output = 0x4
+ ir_mode = ir_mode_single_output
+
+ so_mode_cstid = 0x1
+ so_mode = so_mode_cstid
+
+ output_port_a = 0x0
+ output_port_b = 0x1
+
+ output_port = None
+ if port == 'A' or port == 'a':
+ output_port = output_port_a
+ elif port == 'B' or port == 'b':
+ output_port = output_port_b
+ else:
+ return False
+
+ ir_mode = ir_mode | (so_mode << 1) | output_port
+
+ nibble1 = (self._ir_toggle << 3) | (escape << 2) | self._ir_channel
+ nibble2 = (self._ir_address << 3) | ir_mode
+
+ # Mode range checked here
+ return self._send_ir_nibbles(nibble1, nibble2, mode)
+
+ def send_ir_combo_pwm(self, port_b_mode, port_a_mode):
+ """
+ Send an IR message via Power Functions RC Protocol in Combo PWM mode
+
+ PF IR RC Protocol documented at https://www.philohome.com/pf/pf.htm
+
+ Valid values for the modes are:
+ 0x0 Float
+ 0x1 PWM Forward step 1
+ 0x2 PWM Forward step 2
+ 0x3 PWM Forward step 3
+ 0x4 PWM Forward step 4
+ 0x5 PWM Forward step 5
+ 0x6 PWM Forward step 6
+ 0x7 PWM Forward step 7
+ 0x8 Brake (then float v1.20)
+ 0x9 PWM Backward step 7
+ 0xA PWM Backward step 6
+ 0xB PWM Backward step 5
+ 0xC PWM Backward step 4
+ 0xD PWM Backward step 3
+ 0xE PWM Backward step 2
+ 0xF PWM Backward step 1
+
+ :param port_b_mode: 0-15 indicating the command to send to port B
+ :param port_a_mode: 0-15 indicating the command to send to port A
+ """
+ escape_combo_pwm = 0x1
+ escape = escape_combo_pwm
+
+ nibble1 = (self._ir_toggle << 3) | (escape << 2) | self._ir_channel
+
+ # Port modes are range checked here
+ return self._send_ir_nibbles(nibble1, port_b_mode, port_a_mode)
+
+ def send_ir_combo_direct(self, port_b_output, port_a_output):
+ """
+ Send an IR message via Power Functions RC Protocol in Combo Direct mode
+
+ PF IR RC Protocol documented at https://www.philohome.com/pf/pf.htm
+
+ Valid values for the output variables are:
+ 0x0: Float output
+ 0x1: Clockwise/Forward
+ 0x2: Counterclockwise/Backwards
+ 0x3: Brake then float
+
+ :param port_b_output: 0-3 indicating the output to send to port B
+ :param port_a_output: 0-3 indicating the output to send to port A
+ """
+ escape_modeselect = 0x0
+ escape = escape_modeselect
+
+ ir_mode_combo_direct = 0x1
+ ir_mode = ir_mode_combo_direct
+
+ nibble1 = (self._ir_toggle << 3) | (escape << 2) | self._ir_channel
+ nibble2 = (self._ir_address << 3) | ir_mode
+
+ if port_b_output > 0x3 or port_a_output > 0x3:
+ return False
+ if port_b_output < 0x0 or port_a_output < 0x0:
+ return False
+
+ nibble3 = (port_b_output << 2) | port_a_output
+
+ return self._send_ir_nibbles(nibble1, nibble2, nibble3)
+
+ def send_ir_extended(self, mode):
+ """
+ Send an IR message via Power Functions RC Protocol in Extended mode
+
+ PF IR RC Protocol documented at https://www.philohome.com/pf/pf.htm
+
+ Valid values for the mode are:
+ 0x0: Brake Port A (timeout)
+ 0x1: Increment Speed on Port A
+ 0x2: Decrement Speed on Port A
+
+ 0x4: Toggle Forward/Clockwise/Float on Port B
+
+ 0x6: Toggle Address bit
+ 0x7: Align toggle bit
+
+ :param mode: 0-2,4,6-7
+ """
+ escape_modeselect = 0x0
+ escape = escape_modeselect
+
+ ir_mode_extended = 0x0
+ ir_mode = ir_mode_extended
+
+ nibble1 = (self._ir_toggle << 3) | (escape << 2) | self._ir_channel
+ nibble2 = (self._ir_address << 3) | ir_mode
+
+ if mode < 0x0 or mode == 0x3 or mode == 0x5 or mode > 0x7:
+ return False
+
+ return self._send_ir_nibbles(nibble1, nibble2, mode)
+
+ def send_ir_single_pin(self, port, pin, mode, timeout):
+ """
+ Send an IR message via Power Functions RC Protocol in Single Pin mode
+
+ PF IR RC Protocol documented at https://www.philohome.com/pf/pf.htm
+
+ Valid values for the mode are:
+ 0x0: No-op
+ 0x1: Clear
+ 0x2: Set
+ 0x3: Toggle
+
+ Note: The unlabeled IR receiver (vs the one labeled V2) has a "firmware bug in Single Pin mode"
+ https://www.philohome.com/pfrec/pfrec.htm
+
+ :param port: 'A' or 'B'
+ :param pin: 1 or 2
+ :param mode: 0-3 indicating the pin's mode to set
+ :param timeout: True or False
+ """
+ escape_mode = 0x0
+ escape = escape_mode
+
+ ir_mode_single_continuous = 0x2
+ ir_mode_single_timeout = 0x3
+ ir_mode = None
+ if timeout:
+ ir_mode = ir_mode_single_timeout
+ else:
+ ir_mode = ir_mode_single_continuous
+
+ output_port_a = 0x0
+ output_port_b = 0x1
+
+ output_port = None
+ if port == 'A' or port == 'a':
+ output_port = output_port_a
+ elif port == 'B' or port == 'b':
+ output_port = output_port_b
+ else:
+ return False
+
+ if pin != 1 and pin != 2:
+ return False
+ pin_value = pin - 1
+
+ if mode > 0x3 or mode < 0x0:
+ return False
+
+ nibble1 = (self._ir_toggle << 3) | (escape << 2) | self._ir_channel
+ nibble2 = (self._ir_address << 3) | ir_mode
+ nibble3 = (output_port << 3) | (pin_value << 3) | mode
+
+ return self._send_ir_nibbles(nibble1, nibble2, nibble3)
+
+ def _send_ir_nibbles(self, nibble1, nibble2, nibble3):
+
+ # M7 IR Tx SI = N/A
+ # format count=1 type=1 chars=5 dp=0
+ # RAW: 00000000 0000FFFF PCT: 00000000 00000064 SI: 00000000 0000FFFF
+
+ mode = 7
+ self.mode(mode)
+
+ # The upper bits of data[2] are ignored
+ if nibble1 > 0xF or nibble2 > 0xF or nibble3 > 0xF:
+ return False
+ if nibble1 < 0x0 or nibble2 < 0x0 or nibble3 < 0x0:
+ return False
+
+ byte_two = (nibble2 << 4) | nibble3
+
+ data = bytearray(3)
+ data[0] = (0xc << 4) | mode
+ data[1] = byte_two
+ data[2] = nibble1
+
+ # print(" ".join('{:04b}'.format(nibble1)))
+ # print(" ".join('{:04b}'.format(nibble2)))
+ # print(" ".join('{:04b}'.format(nibble3)))
+ # print(" ".join('{:08b}'.format(n) for n in data))
+
+ self._write1(data)
+ return True
+
def on(self):
"""Turn on the sensor and LED"""
- self._write(f"port {self.port} ; plimit 1 ; set -1\r")
+ self.reverse()
diff --git a/buildhat/data/firmware.bin b/buildhat/data/firmware.bin
index d29a920..f6dcde3 100644
Binary files a/buildhat/data/firmware.bin and b/buildhat/data/firmware.bin differ
diff --git a/buildhat/data/signature.bin b/buildhat/data/signature.bin
index 72ba9a3..8f495fa 100644
--- a/buildhat/data/signature.bin
+++ b/buildhat/data/signature.bin
@@ -1,2 +1 @@
-ñcF½ŒÌ Ðm™ÛÄã°¶vt•?‚#‹îYÁFxQ
-é¶H“|#9ÔJÒ‘49ÚÈÁ‚MàG˜$dIÓ›
\ No newline at end of file
+ôãNÔ.2šwSm¢Q/I¦Ómï
)io\~¦ŸI™Az4’ĘɅñPßã¼wê>\w
¡˜LòŸV™
\ No newline at end of file
diff --git a/buildhat/data/version b/buildhat/data/version
index f5df97f..1f3109b 100644
--- a/buildhat/data/version
+++ b/buildhat/data/version
@@ -1 +1 @@
-1643737936
+1737564117
diff --git a/buildhat/devices.py b/buildhat/devices.py
index 3b433b2..80dec80 100644
--- a/buildhat/devices.py
+++ b/buildhat/devices.py
@@ -3,6 +3,7 @@
import os
import sys
import weakref
+from concurrent.futures import Future
from .exc import DeviceError
from .serinterface import BuildHAT
@@ -57,7 +58,9 @@ def __init__(self, port):
Device._setup()
self._simplemode = -1
self._combimode = -1
+ self._modestr = ""
self._typeid = self._conn.typeid
+ self._interval = 10
if (
self._typeid in Device._device_names
and Device._device_names[self._typeid][0] != type(self).__name__ # noqa: W503
@@ -69,7 +72,15 @@ def __init__(self, port):
def _setup(**kwargs):
if Device._instance:
return
- data = os.path.join(os.path.dirname(sys.modules["buildhat"].__file__), "data/")
+ if (
+ os.path.isdir(os.path.join(os.getcwd(), "data/"))
+ and os.path.isfile(os.path.join(os.getcwd(), "data", "firmware.bin"))
+ and os.path.isfile(os.path.join(os.getcwd(), "data", "signature.bin"))
+ and os.path.isfile(os.path.join(os.getcwd(), "data", "version"))
+ ):
+ data = os.path.join(os.getcwd(), "data/")
+ else:
+ data = os.path.join(os.path.dirname(sys.modules["buildhat"].__file__), "data/")
firm = os.path.join(data, "firmware.bin")
sig = os.path.join(data, "signature.bin")
ver = os.path.join(data, "version")
@@ -185,7 +196,7 @@ def isconnected(self):
def reverse(self):
"""Reverse polarity"""
- self._write(f"port {self.port} ; plimit 1 ; set -1\r")
+ self._write(f"port {self.port} ; port_plimit 1 ; set -1\r")
def get(self):
"""Extract information from device
@@ -194,18 +205,11 @@ def get(self):
:raises DeviceError: Occurs if device not in valid mode
"""
self.isconnected()
- idx = -1
- if self._simplemode != -1:
- idx = self._simplemode
- elif self._combimode != -1:
- idx = self._combimode
- else:
+ if self._simplemode == -1 and self._combimode == -1:
raise DeviceError("Not in simple or combimode")
- self._write(f"port {self.port} ; selonce {idx}\r")
- # wait for data
- with Device._instance.portcond[self.port]:
- Device._instance.portcond[self.port].wait()
- return self._conn.data
+ ftr = Future()
+ self._hat.portftr[self.port].append(ftr)
+ return ftr.result()
def mode(self, modev):
"""Set combimode or simple mode
@@ -214,18 +218,32 @@ def mode(self, modev):
"""
self.isconnected()
if isinstance(modev, list):
- self._combimode = 0
modestr = ""
for t in modev:
modestr += f"{t[0]} {t[1]} "
- self._write(f"port {self.port} ; combi {self._combimode} {modestr}\r")
+ if self._simplemode == -1 and self._combimode == 0 and self._modestr == modestr:
+ return
+ self._write(f"port {self.port}; select\r")
+ self._combimode = 0
+ self._write((f"port {self.port} ; combi {self._combimode} {modestr} ; "
+ f"select {self._combimode} ; "
+ f"selrate {self._interval}\r"))
self._simplemode = -1
+ self._modestr = modestr
+ self._conn.combimode = 0
+ self._conn.simplemode = -1
else:
+ if self._combimode == -1 and self._simplemode == int(modev):
+ return
# Remove combi mode
if self._combimode != -1:
self._write(f"port {self.port} ; combi {self._combimode}\r")
+ self._write(f"port {self.port}; select\r")
self._combimode = -1
self._simplemode = int(modev)
+ self._write(f"port {self.port} ; select {int(modev)} ; selrate {self._interval}\r")
+ self._conn.combimode = -1
+ self._conn.simplemode = int(modev)
def select(self):
"""Request data from mode
@@ -239,11 +257,11 @@ def select(self):
idx = self._combimode
else:
raise DeviceError("Not in simple or combimode")
- self._write(f"port {self.port} ; select {idx}\r")
+ self._write(f"port {self.port} ; select {idx} ; selrate {self._interval}\r")
def on(self):
"""Turn on sensor"""
- self._write(f"port {self.port} ; plimit 1 ; on\r")
+ self._write(f"port {self.port} ; port_plimit 1 ; on\r")
def off(self):
"""Turn off sensor"""
@@ -274,3 +292,28 @@ def callback(self, func):
self._conn.callit = None
else:
self._conn.callit = weakref.WeakMethod(func)
+
+ @property
+ def interval(self):
+ """Interval between data points in milliseconds
+
+ :getter: Gets interval
+ :setter: Sets interval
+ :return: Device interval
+ :rtype: int
+ """
+ return self._interval
+
+ @interval.setter
+ def interval(self, value):
+ """Interval between data points in milliseconds
+
+ :param value: Interval
+ :type value: int
+ :raises DeviceError: Occurs if invalid interval passed
+ """
+ if isinstance(value, int) and value >= 0 and value <= 1000000000:
+ self._interval = value
+ self._write(f"port {self.port} ; selrate {self._interval}\r")
+ else:
+ raise DeviceError("Invalid interval")
diff --git a/buildhat/hat.py b/buildhat/hat.py
index 782a87f..0a277eb 100644
--- a/buildhat/hat.py
+++ b/buildhat/hat.py
@@ -1,5 +1,7 @@
"""HAT handling functionality"""
+from concurrent.futures import Future
+
from .devices import Device
@@ -39,17 +41,24 @@ def get(self):
"description": desc}
return devices
+ def get_logfile(self):
+ """Get the filename of the debug log (If enabled, None otherwise)
+
+ :return: Path of the debug logfile
+ :rtype: str or None
+ """
+ return Device._instance.debug_filename
+
def get_vin(self):
"""Get the voltage present on the input power jack
:return: Voltage on the input power jack
:rtype: float
"""
+ ftr = Future()
+ Device._instance.vinftr.append(ftr)
Device._instance.write(b"vin\r")
- with Device._instance.vincond:
- Device._instance.vincond.wait()
-
- return Device._instance.vin
+ return ftr.result()
def _set_led(self, intmode):
if isinstance(intmode, int) and intmode >= -1 and intmode <= 3:
diff --git a/buildhat/light.py b/buildhat/light.py
index 7472f5c..1748e55 100644
--- a/buildhat/light.py
+++ b/buildhat/light.py
@@ -30,4 +30,12 @@ def brightness(self, brightness):
"""
if not (brightness >= 0 and brightness <= 100):
raise LightError("Need brightness arg, of 0 to 100")
- self._write(f"port {self.port} ; on ; plimit {brightness / 100.0}\r")
+ if brightness > 0:
+ self._write(f"port {self.port} ; on ; set {brightness / 100.0}\r")
+ else:
+ self.off()
+
+ def off(self):
+ """Turn off lights"""
+ # Using coast to turn off DIY lights completely
+ self._write(f"port {self.port} ; coast\r")
diff --git a/buildhat/motors.py b/buildhat/motors.py
index 62f80ea..38786c0 100644
--- a/buildhat/motors.py
+++ b/buildhat/motors.py
@@ -3,6 +3,7 @@
import threading
import time
from collections import deque
+from concurrent.futures import Future
from enum import Enum
from threading import Condition
@@ -26,7 +27,6 @@ def __init__(self, port):
self._default_speed = 20
self._currentspeed = 0
self.plimit(0.7)
- self.bias(0.3)
def set_default_speed(self, default_speed):
"""Set the default speed of the motor
@@ -71,17 +71,17 @@ def plimit(self, plimit):
"""
if not (plimit >= 0 and plimit <= 1):
raise MotorError("plimit should be 0 to 1")
- self._write(f"port {self.port} ; plimit {plimit}\r")
+ self._write(f"port {self.port} ; port_plimit {plimit}\r")
def bias(self, bias):
"""Bias motor
:param bias: Value 0 to 1
:raises MotorError: Occurs if invalid bias value passed
- """
- if not (bias >= 0 and bias <= 1):
- raise MotorError("bias should be 0 to 1")
- self._write(f"port {self.port} ; bias {bias}\r")
+
+ .. deprecated:: 0.6.0
+ """ # noqa: RST303
+ raise MotorError("Bias no longer available")
class MotorRunmode(Enum):
@@ -108,9 +108,17 @@ def __init__(self, port):
super().__init__(port)
self.default_speed = 20
self._currentspeed = 0
- self.mode([(1, 0), (2, 0), (3, 0)])
+ if self._typeid in {38}:
+ self.mode([(1, 0), (2, 0)])
+ self._combi = "1 0 2 0"
+ self._noapos = True
+ else:
+ self.mode([(1, 0), (2, 0), (3, 0)])
+ self._combi = "1 0 2 0 3 0"
+ self._noapos = False
self.plimit(0.7)
- self.bias(0.3)
+ self.pwmparams(0.65, 0.01)
+ self._rpm = False
self._release = True
self._bqueue = deque(maxlen=5)
self._cvqueue = Condition()
@@ -118,6 +126,13 @@ def __init__(self, port):
self._oldpos = None
self._runmode = MotorRunmode.NONE
+ def set_speed_unit_rpm(self, rpm=False):
+ """Set whether to use RPM for speed units or not
+
+ :param rpm: Boolean to determine whether to use RPM for units
+ """
+ self._rpm = rpm
+
def set_default_speed(self, default_speed):
"""Set the default speed of the motor
@@ -160,7 +175,10 @@ def _run_to_position(self, degrees, speed, direction):
self._runmode = MotorRunmode.DEGREES
data = self.get()
pos = data[1]
- apos = data[2]
+ if self._noapos:
+ apos = pos
+ else:
+ apos = data[2]
diff = (degrees - apos + 180) % 360 - 180
newpos = (pos + diff) / 360
v1 = (degrees - apos) % 360
@@ -189,14 +207,18 @@ def _run_positional_ramp(self, pos, newpos, speed):
:param newpos: New motor postion in decimal rotations (from preset position)
:param speed: -100 to 100
"""
- # Collapse speed range to -5 to 5
- speed *= 0.05
+ if self._rpm:
+ speed = self._speed_process(speed)
+ else:
+ speed *= 0.05 # Collapse speed range to -5 to 5
dur = abs((newpos - pos) / speed)
- cmd = (f"port {self.port}; combi 0 1 0 2 0 3 0 ; select 0 ; pid {self.port} 0 1 s4 0.0027777778 0 5 0 .1 3 ; "
+ cmd = (f"port {self.port}; select 0 ; selrate {self._interval}; "
+ f"pid {self.port} 0 1 s4 0.0027777778 0 5 0 .1 3 0.01; "
f"set ramp {pos} {newpos} {dur} 0\r")
+ ftr = Future()
+ self._hat.rampftr[self.port].append(ftr)
self._write(cmd)
- with self._hat.rampcond[self.port]:
- self._hat.rampcond[self.port].wait()
+ ftr.result()
if self._release:
time.sleep(0.2)
self.coast()
@@ -217,10 +239,9 @@ def run_for_degrees(self, degrees, speed=None, blocking=True):
if not (speed >= -100 and speed <= 100):
raise MotorError("Invalid Speed")
if not blocking:
- th = threading.Thread(target=self._run_for_degrees, args=(degrees, speed))
- th.daemon = True
- th.start()
+ self._queue((self._run_for_degrees, (degrees, speed)))
else:
+ self._wait_for_nonblocking()
self._run_for_degrees(degrees, speed)
def run_to_position(self, degrees, speed=None, blocking=True, direction="shortest"):
@@ -240,19 +261,25 @@ def run_to_position(self, degrees, speed=None, blocking=True, direction="shortes
if degrees < -180 or degrees > 180:
raise MotorError("Invalid angle")
if not blocking:
- th = threading.Thread(target=self._run_to_position, args=(degrees, speed, direction))
- th.daemon = True
- th.start()
+ self._queue((self._run_to_position, (degrees, speed, direction)))
else:
+ self._wait_for_nonblocking()
self._run_to_position(degrees, speed, direction)
def _run_for_seconds(self, seconds, speed):
+ speed = self._speed_process(speed)
self._runmode = MotorRunmode.SECONDS
- cmd = (f"port {self.port} ; combi 0 1 0 2 0 3 0 ; select 0 ; pid {self.port} 0 0 s1 1 0 0.003 0.01 0 100; "
+ if self._rpm:
+ pid = f"pid_diff {self.port} 0 5 s2 0.0027777778 1 0 2.5 0 .4 0.01; "
+ else:
+ pid = f"pid {self.port} 0 0 s1 1 0 0.003 0.01 0 100 0.01;"
+ cmd = (f"port {self.port} ; select 0 ; selrate {self._interval}; "
+ f"{pid}"
f"set pulse {speed} 0.0 {seconds} 0\r")
+ ftr = Future()
+ self._hat.pulseftr[self.port].append(ftr)
self._write(cmd)
- with self._hat.pulsecond[self.port]:
- self._hat.pulsecond[self.port].wait()
+ ftr.result()
if self._release:
self.coast()
self._runmode = MotorRunmode.NONE
@@ -271,10 +298,9 @@ def run_for_seconds(self, seconds, speed=None, blocking=True):
if not (speed >= -100 and speed <= 100):
raise MotorError("Invalid Speed")
if not blocking:
- th = threading.Thread(target=self._run_for_seconds, args=(seconds, speed))
- th.daemon = True
- th.start()
+ self._queue((self._run_for_seconds, (seconds, speed)))
else:
+ self._wait_for_nonblocking()
self._run_for_seconds(seconds, speed)
def start(self, speed=None):
@@ -283,6 +309,7 @@ def start(self, speed=None):
:param speed: Speed ranging from -100 to 100
:raises MotorError: Occurs when invalid speed specified
"""
+ self._wait_for_nonblocking()
if self._runmode == MotorRunmode.FREE:
if self._currentspeed == speed:
# Already running at this speed, do nothing
@@ -296,9 +323,15 @@ def start(self, speed=None):
else:
if not (speed >= -100 and speed <= 100):
raise MotorError("Invalid Speed")
+ speed = self._speed_process(speed)
cmd = f"port {self.port} ; set {speed}\r"
if self._runmode == MotorRunmode.NONE:
- cmd = (f"port {self.port} ; combi 0 1 0 2 0 3 0 ; select 0 ; pid {self.port} 0 0 s1 1 0 0.003 0.01 0 100; "
+ if self._rpm:
+ pid = f"pid_diff {self.port} 0 5 s2 0.0027777778 1 0 2.5 0 .4 0.01; "
+ else:
+ pid = f"pid {self.port} 0 0 s1 1 0 0.003 0.01 0 100 0.01; "
+ cmd = (f"port {self.port} ; select 0 ; selrate {self._interval}; "
+ f"{pid}"
f"set {speed}\r")
self._runmode = MotorRunmode.FREE
self._currentspeed = speed
@@ -306,6 +339,7 @@ def start(self, speed=None):
def stop(self):
"""Stop motor"""
+ self._wait_for_nonblocking()
self._runmode = MotorRunmode.NONE
self._currentspeed = 0
self.coast()
@@ -324,7 +358,10 @@ def get_aposition(self):
:return: Absolute position of motor from -180 to 180
:rtype: int
"""
- return self.get()[2]
+ if self._noapos:
+ raise MotorError("No absolute position with this motor")
+ else:
+ return self.get()[2]
def get_speed(self):
"""Get speed of motor
@@ -346,7 +383,11 @@ def when_rotated(self):
return self._when_rotated
def _intermediate(self, data):
- speed, pos, apos = data
+ if self._noapos:
+ speed, pos = data
+ apos = None
+ else:
+ speed, pos, apos = data
if self._oldpos is None:
self._oldpos = pos
return
@@ -372,17 +413,30 @@ def plimit(self, plimit):
"""
if not (plimit >= 0 and plimit <= 1):
raise MotorError("plimit should be 0 to 1")
- self._write(f"port {self.port} ; plimit {plimit}\r")
+ self._write(f"port {self.port} ; port_plimit {plimit}\r")
def bias(self, bias):
"""Bias motor
:param bias: Value 0 to 1
:raises MotorError: Occurs if invalid bias value passed
+
+ .. deprecated:: 0.6.0
+ """ # noqa: RST303
+ raise MotorError("Bias no longer available")
+
+ def pwmparams(self, pwmthresh, minpwm):
+ """PWM thresholds
+
+ :param pwmthresh: Value 0 to 1, threshold below, will switch from fast to slow, PWM
+ :param minpwm: Value 0 to 1, threshold below which it switches off the drive altogether
+ :raises MotorError: Occurs if invalid values are passed
"""
- if not (bias >= 0 and bias <= 1):
- raise MotorError("bias should be 0 to 1")
- self._write(f"port {self.port} ; bias {bias}\r")
+ if not (pwmthresh >= 0 and pwmthresh <= 1):
+ raise MotorError("pwmthresh should be 0 to 1")
+ if not (minpwm >= 0 and minpwm <= 1):
+ raise MotorError("minpwm should be 0 to 1")
+ self._write(f"port {self.port} ; pwmparams {pwmthresh} {minpwm}\r")
def pwm(self, pwmv):
"""PWM motor
@@ -402,6 +456,42 @@ def float(self):
"""Float motor"""
self.pwm(0)
+ @property
+ def release(self):
+ """Determine if motor is released after running, so can be turned by hand
+
+ :getter: Returns whether motor is released, so can be turned by hand
+ :setter: Sets whether motor is released, so can be turned by hand
+ :return: Whether motor is released, so can be turned by hand
+ :rtype: bool
+ """
+ return self._release
+
+ @release.setter
+ def release(self, value):
+ """Determine if the motor is released after running, so can be turned by hand
+
+ :param value: Whether motor should be released, so can be turned by hand
+ :type value: bool
+ """
+ if not isinstance(value, bool):
+ raise MotorError("Must pass boolean")
+ self._release = value
+
+ def _queue(self, cmd):
+ Device._instance.motorqueue[self.port].put(cmd)
+
+ def _wait_for_nonblocking(self):
+ """Wait for nonblocking commands to finish"""
+ Device._instance.motorqueue[self.port].join()
+
+ def _speed_process(self, speed):
+ """Lower speed value"""
+ if self._rpm:
+ return speed / 60
+ else:
+ return speed
+
class MotorPair:
"""Pair of motors
@@ -421,6 +511,8 @@ def __init__(self, leftport, rightport):
self._leftmotor = Motor(leftport)
self._rightmotor = Motor(rightport)
self.default_speed = 20
+ self._release = True
+ self._rpm = False
def set_default_speed(self, default_speed):
"""Set the default speed of the motor
@@ -429,6 +521,15 @@ def set_default_speed(self, default_speed):
"""
self.default_speed = default_speed
+ def set_speed_unit_rpm(self, rpm=False):
+ """Set whether to use RPM for speed units or not
+
+ :param rpm: Boolean to determine whether to use RPM for units
+ """
+ self._rpm = rpm
+ self._leftmotor.set_speed_unit_rpm(rpm)
+ self._rightmotor.set_speed_unit_rpm(rpm)
+
def run_for_rotations(self, rotations, speedl=None, speedr=None):
"""Run pair of motors for N rotations
@@ -520,3 +621,27 @@ def run_to_position(self, degreesl, degreesr, speed=None, direction="shortest"):
th2.start()
th1.join()
th2.join()
+
+ @property
+ def release(self):
+ """Determine if motors are released after running, so can be turned by hand
+
+ :getter: Returns whether motors are released, so can be turned by hand
+ :setter: Sets whether motors are released, so can be turned by hand
+ :return: Whether motors are released, so can be turned by hand
+ :rtype: bool
+ """
+ return self._release
+
+ @release.setter
+ def release(self, value):
+ """Determine if motors are released after running, so can be turned by hand
+
+ :param value: Whether motors should be released, so can be turned by hand
+ :type value: bool
+ """
+ if not isinstance(value, bool):
+ raise MotorError("Must pass boolean")
+ self._release = value
+ self._leftmotor.release = value
+ self._rightmotor.release = value
diff --git a/buildhat/serinterface.py b/buildhat/serinterface.py
index 14e6803..901dc2e 100644
--- a/buildhat/serinterface.py
+++ b/buildhat/serinterface.py
@@ -1,6 +1,7 @@
"""Build HAT handling functionality"""
import logging
+import os
import queue
import tempfile
import threading
@@ -31,6 +32,8 @@ def __init__(self):
self.typeid = -1
self.connected = False
self.callit = None
+ self.simplemode = -1
+ self.combimode = -1
def update(self, typeid, connected, callit=None):
"""Update connection information for port
@@ -84,39 +87,39 @@ def __init__(self, firmware, signature, version, device="/dev/serial0", debug=Fa
self.cond = Condition()
self.state = HatState.OTHER
self.connections = []
- self.portcond = []
- self.pulsecond = []
- self.rampcond = []
+ self.portftr = []
+ self.pulseftr = []
+ self.rampftr = []
+ self.vinftr = []
+ self.motorqueue = []
self.fin = False
self.running = True
- self.vincond = Condition()
- self.vin = None
+ self.debug_filename = None
if debug:
tmp = tempfile.NamedTemporaryFile(suffix=".log", prefix="buildhat-", delete=False)
+ self.debug_filename = tmp.name
logging.basicConfig(filename=tmp.name, format='%(asctime)s %(message)s',
- level=logging.INFO)
+ level=logging.DEBUG)
for _ in range(4):
self.connections.append(Connection())
- self.portcond.append(Condition())
- self.pulsecond.append(Condition())
- self.rampcond.append(Condition())
-
+ self.portftr.append([])
+ self.pulseftr.append([])
+ self.rampftr.append([])
+ self.motorqueue.append(queue.Queue())
+
+ # On a Pi 5 /dev/serial0 will point to /dev/ttyAMA10 (which *only*
+ # exists on a Pi 5, and is the 3-pin debug UART connector)
+ # The UART on the Pi 5 GPIO header is /dev/ttyAMA0
+ if device == "/dev/serial0" and os.readlink(device) == "ttyAMA10":
+ device = "/dev/ttyAMA0"
self.ser = serial.Serial(device, 115200, timeout=5)
# Check if we're in the bootloader or the firmware
self.write(b"version\r")
- emptydata = 0
incdata = 0
while True:
line = self.read()
- if len(line) == 0:
- # Didn't receive any data
- emptydata += 1
- if emptydata > 3:
- break
- else:
- continue
if cmp(line, BuildHAT.FIRMWARE):
self.state = HatState.FIRMWARE
ver = line[len(BuildHAT.FIRMWARE):].split(' ')
@@ -150,15 +153,22 @@ def __init__(self, firmware, signature, version, device="/dev/serial0", debug=Fa
self.cb.daemon = True
self.cb.start()
+ for q in self.motorqueue:
+ ml = threading.Thread(target=self.motorloop, args=(q,))
+ ml.daemon = True
+ ml.start()
+
# Drop timeout value to 1s
+ listevt = threading.Event()
self.ser.timeout = 1
- self.th = threading.Thread(target=self.loop, args=(self.cond, self.state == HatState.FIRMWARE, self.cbqueue))
+ self.th = threading.Thread(target=self.loop, args=(self.cond, self.state == HatState.FIRMWARE, self.cbqueue, listevt))
self.th.daemon = True
self.th.start()
if self.state == HatState.FIRMWARE:
self.write(b"port 0 ; select ; port 1 ; select ; port 2 ; select ; port 3 ; select ; echo 0\r")
self.write(b"list\r")
+ listevt.set()
elif self.state == HatState.NEEDNEWFIRMWARE or self.state == HatState.BOOTLOADER:
self.write(b"reboot\r")
@@ -239,9 +249,9 @@ def write(self, data, log=True, replace=""):
self.ser.write(data)
if not self.fin and log:
if replace != "":
- logging.info(f"> {replace}")
+ logging.debug(f"> {replace}")
else:
- logging.info(f"> {data.decode('utf-8', 'ignore').strip()}")
+ logging.debug(f"> {data.decode('utf-8', 'ignore').strip()}")
def read(self):
"""Read data from the serial port of Build HAT
@@ -254,7 +264,7 @@ def read(self):
except serial.SerialException:
pass
if line != "":
- logging.info(f"< {line}")
+ logging.debug(f"< {line}")
return line
def shutdown(self):
@@ -264,6 +274,8 @@ def shutdown(self):
self.running = False
self.th.join()
self.cbqueue.put(())
+ for q in self.motorqueue:
+ q.put((None, None))
self.cb.join()
turnoff = ""
for p in range(4):
@@ -276,6 +288,21 @@ def shutdown(self):
self.write(f"{turnoff}\r".encode())
self.write(b"port 0 ; select ; port 1 ; select ; port 2 ; select ; port 3 ; select ; echo 0\r")
+ def motorloop(self, q):
+ """Event handling for non-blocking motor commands
+
+ :param q: Queue of motor functions
+ """
+ while self.running:
+ func, data = q.get()
+ if func is None:
+ break
+ else:
+ func(*data)
+ func = None # Necessary for 'del' to function correctly on motor object
+ data = None
+ q.task_done()
+
def callbackloop(self, q):
"""Event handling for callbacks
@@ -292,7 +319,7 @@ def callbackloop(self, q):
cb[0]()(cb[1])
q.task_done()
- def loop(self, cond, uselist, q):
+ def loop(self, cond, uselist, q, listevt):
"""Event handling for Build HAT
:param cond: Condition used to block user's script till we're ready
@@ -312,12 +339,12 @@ def loop(self, cond, uselist, q):
self.connections[portid].update(typeid, True)
if typeid == 64:
self.write(f"port {portid} ; on\r".encode())
- if uselist:
+ if uselist and listevt.is_set():
count += 1
elif cmp(msg, BuildHAT.CONNECTEDPASSIVE):
typeid = int(line[2 + len(BuildHAT.CONNECTEDPASSIVE):], 16)
self.connections[portid].update(typeid, True)
- if uselist:
+ if uselist and listevt.is_set():
count += 1
elif cmp(msg, BuildHAT.DISCONNECTED):
self.connections[portid].update(-1, False)
@@ -325,14 +352,14 @@ def loop(self, cond, uselist, q):
self.connections[portid].update(-1, False)
elif cmp(msg, BuildHAT.NOTCONNECTED):
self.connections[portid].update(-1, False)
- if uselist:
+ if uselist and listevt.is_set():
count += 1
elif cmp(msg, BuildHAT.RAMPDONE):
- with self.rampcond[portid]:
- self.rampcond[portid].notify()
+ ftr = self.rampftr[portid].pop()
+ ftr.set_result(True)
elif cmp(msg, BuildHAT.PULSEDONE):
- with self.pulsecond[portid]:
- self.pulsecond[portid].notify()
+ ftr = self.pulseftr[portid].pop()
+ ftr.set_result(True)
if uselist and count == 4:
with cond:
@@ -343,7 +370,7 @@ def loop(self, cond, uselist, q):
def runit():
with cond:
cond.notify()
- t = Timer(8.0, runit)
+ t = Timer(11.0, runit)
t.start()
if line[0] == "P" and (line[2] == "C" or line[2] == "M"):
@@ -356,15 +383,22 @@ def runit():
else:
if d != "":
newdata.append(int(d))
+ # Check data was for our current mode
+ if line[2] == "M" and self.connections[portid].simplemode != int(line[3]):
+ continue
+ elif line[2] == "C" and self.connections[portid].combimode != int(line[3]):
+ continue
callit = self.connections[portid].callit
if callit is not None:
q.put((callit, newdata))
self.connections[portid].data = newdata
- with self.portcond[portid]:
- self.portcond[portid].notify()
+ try:
+ ftr = self.portftr[portid].pop()
+ ftr.set_result(newdata)
+ except IndexError:
+ pass
if len(line) >= 5 and line[1] == "." and line.endswith(" V"):
vin = float(line.split(" ")[0])
- self.vin = vin
- with self.vincond:
- self.vincond.notify()
+ ftr = self.vinftr.pop()
+ ftr.set_result(vin)
diff --git a/buildhat/wedo.py b/buildhat/wedo.py
index df99dd6..372524e 100644
--- a/buildhat/wedo.py
+++ b/buildhat/wedo.py
@@ -36,6 +36,8 @@ class MotionSensor(Device):
:raises DeviceError: Occurs if there is no motion sensor attached to port
"""
+ default_mode = 0
+
def __init__(self, port):
"""
Initialise motion sensor
@@ -43,7 +45,19 @@ def __init__(self, port):
:param port: Port of device
"""
super().__init__(port)
- self.mode(0)
+ self.mode(self.default_mode)
+
+ def set_default_data_mode(self, mode):
+ """
+ Set the mode most often queried from this device.
+
+ This significantly improves performance when repeatedly accessing data
+
+ :param mode: 0 for distance (default), 1 for movement count
+ """
+ if mode == 1 or mode == 0:
+ self.default_mode = mode
+ self.mode(mode)
def get_distance(self):
"""
@@ -52,4 +66,26 @@ def get_distance(self):
:return: Distance from motion sensor
:rtype: int
"""
- return self.get()[0]
+ return self._get_data_from_mode(0)
+
+ def get_movement_count(self):
+ """
+ Return the movement counter
+
+ This is the count of how many times the sensor has detected an object
+ that moved within 4 blocks of the sensor since the sensor has been
+ plugged in or the BuildHAT reset
+
+ :return: Count of objects detected
+ :rtype: int
+ """
+ return self._get_data_from_mode(1)
+
+ def _get_data_from_mode(self, mode):
+ if self.default_mode == mode:
+ return self.get()[0]
+ else:
+ self.mode(mode)
+ retval = self.get()[0]
+ self.mode(self.default_mode)
+ return retval
diff --git a/docs/buildhat/colordistance.py b/docs/buildhat/colordistance.py
index 8a73964..d8a3466 100755
--- a/docs/buildhat/colordistance.py
+++ b/docs/buildhat/colordistance.py
@@ -4,6 +4,7 @@
color = ColorDistanceSensor('C')
+print("Distance", color.get_distance())
print("RGBI", color.get_color_rgb())
print("Ambient", color.get_ambient_light())
print("Reflected", color.get_reflected_light())
diff --git a/docs/buildhat/index.rst b/docs/buildhat/index.rst
index 9ac86b4..f4f7fc4 100644
--- a/docs/buildhat/index.rst
+++ b/docs/buildhat/index.rst
@@ -28,10 +28,51 @@ power supply. For best results, use the `official Raspberry Pi Build HAT power s
.. _official Raspberry Pi Build HAT power supply: http://raspberrypi.com/products/build-hat-power-supply
+It is now possible to use custom firmware with the library. To do this you can follow the steps below.
+
+.. code-block::
+ :caption: Using custom firmware
+
+ sudo apt install cmake python3 build-essential gcc-arm-none-eabi libnewlib-arm-none-eabi libstdc++-arm-none-eabi-newlib
+
+ git clone https://github.com/raspberrypi/pico-sdk.git --recursive
+ git clone https://github.com/raspberrypi/buildhat.git --recursive
+
+ cd buildhat
+ export PICO_SDK_PATH="$(pwd)/../pico-sdk/"
+ make
+
+ cd ..
+ mkdir test
+ cd test
+ mkdir data
+
+ cp ../buildhat/firmware-pico/build/main.bin data/firmware.bin
+ cp ../buildhat/bhbl-pico/signature.bin data/signature.bin
+ cat ../buildhat/firmware-pico/version.h | sed 's/#define FWVERSION "//g; s/ .*//g' > data/version
+
+Then place your script, such as the following, within the test/ directory.
+
+.. code-block::
+ :caption: Create test.py in test directory
+
+ import time
+ from buildhat import Motor
+
+ m = Motor('A')
+ m.start()
+
+ time.sleep(5)
+
+Then use: ``python test.py`` in the test directory, to run your script with your custom firmware.
+
+Note if you want python to always reload the firmware from your **data/** directory each time
+you run your script, simply write the value: -1 to **data/version**.
+
.. warning::
The API for the Build HAT is undergoing active development and is subject
- to change.
+ to change.
.. toctree::
:maxdepth: 2
@@ -40,9 +81,11 @@ power supply. For best results, use the `official Raspberry Pi Build HAT power s
colordistancesensor.rst
distancesensor.rst
forcesensor.rst
+ light.rst
matrix.rst
+ motionsensor.rst
motor.rst
motorpair.rst
passivemotor.rst
+ tiltsensor.rst
hat.rst
- light.rst
diff --git a/docs/buildhat/motion.py b/docs/buildhat/motion.py
new file mode 100755
index 0000000..cf48829
--- /dev/null
+++ b/docs/buildhat/motion.py
@@ -0,0 +1,11 @@
+"""Example using motion sensor"""
+
+from time import sleep
+
+from buildhat import MotionSensor
+
+motion = MotionSensor('A')
+
+for _ in range(50):
+ print(motion.get_distance())
+ sleep(0.1)
diff --git a/docs/buildhat/motionsensor.rst b/docs/buildhat/motionsensor.rst
new file mode 100644
index 0000000..e9d0f5f
--- /dev/null
+++ b/docs/buildhat/motionsensor.rst
@@ -0,0 +1,19 @@
+Motion Sensor
+=============
+
+|location_link|
+
+.. |location_link| raw:: html
+
+ BrickLink item
+
+.. autoclass:: buildhat.MotionSensor
+ :members:
+ :inherited-members:
+
+Example
+-------
+
+.. literalinclude:: motion.py
+
+
diff --git a/docs/buildhat/passivemotor.rst b/docs/buildhat/passivemotor.rst
index 2b57b20..357055e 100644
--- a/docs/buildhat/passivemotor.rst
+++ b/docs/buildhat/passivemotor.rst
@@ -6,7 +6,7 @@ LEGO® TECHNIC™ motors which do not have an integrated rotation sensor (encode
.. |location_link1| raw:: html
- LEGO® Train motor 88011
+ LEGO® Train motor 88011
.. |location_link1b| raw:: html
diff --git a/docs/buildhat/tilt.py b/docs/buildhat/tilt.py
new file mode 100755
index 0000000..36ec8d1
--- /dev/null
+++ b/docs/buildhat/tilt.py
@@ -0,0 +1,11 @@
+"""Example using tilt sensor"""
+
+from time import sleep
+
+from buildhat import TiltSensor
+
+tilt = TiltSensor('A')
+
+for _ in range(50):
+ print(tilt.get_tilt())
+ sleep(0.1)
diff --git a/docs/buildhat/tiltsensor.rst b/docs/buildhat/tiltsensor.rst
new file mode 100644
index 0000000..e428882
--- /dev/null
+++ b/docs/buildhat/tiltsensor.rst
@@ -0,0 +1,19 @@
+Tilt Sensor
+===========
+
+|location_link|
+
+.. |location_link| raw:: html
+
+ BrickLink item
+
+.. autoclass:: buildhat.TiltSensor
+ :members:
+ :inherited-members:
+
+Example
+-------
+
+.. literalinclude:: tilt.py
+
+
diff --git a/docs/conf.py b/docs/conf.py
index e17f1fd..aae1e0d 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -59,7 +59,7 @@
# General information about the project.
project = 'Raspberry Pi Build HAT'
-copyright = '''2020-2021 - Raspberry Pi Foundation;
+copyright = '''2020-2025 - Raspberry Pi Foundation;
2017-2020 - LEGO System A/S - Aastvej 1, 7190 Billund, DK.'''
# The version info for the project you're documenting, acts as replacement for
diff --git a/docs/license.rst b/docs/license.rst
index 2ad53fd..af5c040 100644
--- a/docs/license.rst
+++ b/docs/license.rst
@@ -3,7 +3,7 @@ License information
The MIT License (MIT)
-Copyright (c) 2020-2021 - Raspberry Pi Foundation
+Copyright (c) 2020-2025 - Raspberry Pi Foundation
Copyright (c) 2017-2021 - LEGO System A/S - Aastvej 1, 7190 Billund, DK
diff --git a/requirements-test.txt b/requirements-test.txt
index b6d3acc..8d7b912 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,16 +1,10 @@
-flake8
-flake8-bandit
-flake8-broken-line
-flake8-bugbear
-flake8-commas
-flake8-comprehensions
+flake8>=6.0.0
+flake8-bugbear>=22.10.27
+flake8-comprehensions>=3.10
flake8-debugger
-flake8-docstrings
-flake8-eradicate
-flake8-isort
-flake8-isort
+flake8-docstrings>=1.6.0
+flake8-isort>=5.0
flake8-pylint
-flake8-quotes
flake8-rst-docstrings
flake8-string-format
gpiozero
diff --git a/setup.py b/setup.py
index fe788ea..38ae4f1 100755
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@
"""Setup file"""
-# Copyright (c) 2020-2021 Raspberry Pi Foundation
+# Copyright (c) 2020-2025 Raspberry Pi Foundation
#
# SPDX-License-Identifier: MIT
diff --git a/test/color.py b/test/color.py
new file mode 100644
index 0000000..ba23ff2
--- /dev/null
+++ b/test/color.py
@@ -0,0 +1,47 @@
+"""Test Color Sensor functionality"""
+import time
+import unittest
+
+from buildhat import ColorSensor
+
+
+class TestColor(unittest.TestCase):
+ """Test color sensor functions"""
+
+ def test_color_interval(self):
+ """Test color sensor interval"""
+ color = ColorSensor('A')
+ color.avg_reads = 1
+ color.interval = 10
+ count = 1000
+ expected_dur = count * color.interval * 1e-3
+
+ start = time.time()
+ for _ in range(count):
+ color.get_ambient_light()
+ end = time.time()
+ diff = abs((end - start) - expected_dur)
+ self.assertLess(diff, 0.25)
+
+ start = time.time()
+ for _ in range(count):
+ color.get_color_rgbi()
+ end = time.time()
+ diff = abs((end - start) - expected_dur)
+ self.assertLess(diff, 0.25)
+
+ def test_caching(self):
+ """Test to make sure we're not reading cached data"""
+ color = ColorSensor('A')
+ color.avg_reads = 1
+ color.interval = 10
+
+ for _ in range(100):
+ color.mode(2)
+ self.assertEqual(len(color.get()), 1)
+ color.mode(5)
+ self.assertEqual(len(color.get()), 4)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/motors.py b/test/motors.py
index 4d3275f..46d01ec 100644
--- a/test/motors.py
+++ b/test/motors.py
@@ -3,13 +3,15 @@
import time
import unittest
-from buildhat import Motor
+from buildhat import Hat, Motor
from buildhat.exc import DeviceError, MotorError
class TestMotor(unittest.TestCase):
"""Test motors"""
+ THRESHOLD_DISTANCE = 15
+
def test_rotations(self):
"""Test motor rotating"""
m = Motor('A')
@@ -19,18 +21,83 @@ def test_rotations(self):
rotated = (pos2 - pos1) / 360
self.assertLess(abs(rotated - 2), 0.5)
+ def test_nonblocking(self):
+ """Test motor nonblocking mode"""
+ m = Motor('A')
+ m.set_default_speed(10)
+ last = 0
+ for delay in [1, 0]:
+ for _ in range(3):
+ m.run_to_position(90, blocking=False)
+ time.sleep(delay)
+ m.run_to_position(90, blocking=False)
+ time.sleep(delay)
+ m.run_to_position(90, blocking=False)
+ time.sleep(delay)
+ m.run_to_position(last, blocking=False)
+ time.sleep(delay)
+ # Wait for a bit, before reading last position
+ time.sleep(7)
+ pos1 = m.get_aposition()
+ diff = abs((last - pos1 + 180) % 360 - 180)
+ self.assertLess(diff, self.THRESHOLD_DISTANCE)
+
+ def test_nonblocking_multiple(self):
+ """Test motor nonblocking mode"""
+ m1 = Motor('A')
+ m1.set_default_speed(10)
+ m2 = Motor('B')
+ m2.set_default_speed(10)
+ last = 0
+ for delay in [1, 0]:
+ for _ in range(3):
+ m1.run_to_position(90, blocking=False)
+ m2.run_to_position(90, blocking=False)
+ time.sleep(delay)
+ m1.run_to_position(90, blocking=False)
+ m2.run_to_position(90, blocking=False)
+ time.sleep(delay)
+ m1.run_to_position(90, blocking=False)
+ m2.run_to_position(90, blocking=False)
+ time.sleep(delay)
+ m1.run_to_position(last, blocking=False)
+ m2.run_to_position(last, blocking=False)
+ time.sleep(delay)
+ # Wait for a bit, before reading last position
+ time.sleep(7)
+ pos1 = m1.get_aposition()
+ diff = abs((last - pos1 + 180) % 360 - 180)
+ self.assertLess(diff, self.THRESHOLD_DISTANCE)
+ pos2 = m2.get_aposition()
+ diff = abs((last - pos2 + 180) % 360 - 180)
+ self.assertLess(diff, self.THRESHOLD_DISTANCE)
+
+ def test_nonblocking_mixed(self):
+ """Test motor nonblocking mode mixed with blocking mode"""
+ m = Motor('A')
+ m.run_for_seconds(5, blocking=False)
+ m.run_for_degrees(360)
+ m.run_for_seconds(5, blocking=False)
+ m.run_to_position(180)
+ m.run_for_seconds(5, blocking=False)
+ m.run_for_seconds(5)
+ m.run_for_seconds(5, blocking=False)
+ m.start()
+ m.run_for_seconds(5, blocking=False)
+ m.stop()
+
def test_position(self):
"""Test motor goes to desired position"""
m = Motor('A')
m.run_to_position(0)
pos1 = m.get_aposition()
diff = abs((0 - pos1 + 180) % 360 - 180)
- self.assertLess(diff, 10)
+ self.assertLess(diff, self.THRESHOLD_DISTANCE)
m.run_to_position(180)
pos1 = m.get_aposition()
diff = abs((180 - pos1 + 180) % 360 - 180)
- self.assertLess(diff, 10)
+ self.assertLess(diff, self.THRESHOLD_DISTANCE)
def test_time(self):
"""Test motor runs for correct duration"""
@@ -54,13 +121,6 @@ def test_plimit(self):
self.assertRaises(MotorError, m.plimit, -1)
self.assertRaises(MotorError, m.plimit, 2)
- def test_bias(self):
- """Test setting motor bias"""
- m = Motor('A')
- m.bias(0.5)
- self.assertRaises(MotorError, m.bias, -1)
- self.assertRaises(MotorError, m.bias, 2)
-
def test_pwm(self):
"""Test PWMing motor"""
m = Motor('A')
@@ -81,6 +141,18 @@ def handle_motor(speed, pos, apos):
m.run_for_seconds(1)
self.assertGreater(handle_motor.evt, 0)
+ def test_callback_interval(self):
+ """Test setting callback and interval"""
+ m = Motor('A')
+ m.interval = 10
+
+ def handle_motor(speed, pos, apos):
+ handle_motor.evt += 1
+ handle_motor.evt = 0
+ m.when_rotated = handle_motor
+ m.run_for_seconds(5)
+ self.assertGreater(handle_motor.evt, 0.8 * ((1 / ((m.interval) * 1e-3)) * 5))
+
def test_none_callback(self):
"""Test setting empty callback"""
m = Motor('A')
@@ -104,22 +176,67 @@ def test_continuous_start(self):
"""Test starting motor for 5mins"""
t = time.time() + (60 * 5)
m = Motor('A')
+ toggle = 0
while time.time() < t:
- m.start(0)
+ m.start(toggle)
+ toggle ^= 1
def test_continuous_degrees(self):
"""Test setting degrees for 5mins"""
t = time.time() + (60 * 5)
m = Motor('A')
+ toggle = 0
while time.time() < t:
- m.run_for_degrees(0)
+ m.run_for_degrees(toggle)
+ toggle ^= 1
def test_continuous_position(self):
"""Test setting position of motor for 5mins"""
t = time.time() + (60 * 5)
m = Motor('A')
+ toggle = 0
while time.time() < t:
- m.run_to_position(0)
+ m.run_to_position(toggle)
+ toggle ^= 1
+
+ def test_continuous_feedback(self):
+ """Test feedback of motor for 30mins"""
+ Hat(debug=True)
+ t = time.time() + (60 * 30)
+ m = Motor('A')
+ m.start(40)
+ while time.time() < t:
+ _ = (m.get_speed(), m.get_position(), m.get_aposition())
+
+ def test_interval(self):
+ """Test motor interval"""
+ m = Motor('A')
+ m.interval = 10
+ count = 1000
+ expected_dur = count * m.interval * 1e-3
+ start = time.time()
+ for _ in range(count):
+ m.get_position()
+ end = time.time()
+ diff = abs((end - start) - expected_dur)
+ self.assertLess(diff, expected_dur * 0.1)
+
+ def test_dual_interval(self):
+ """Test dual motor interval"""
+ m1 = Motor('A')
+ m2 = Motor('B')
+ for interval in [20, 10]:
+ m1.interval = interval
+ m2.interval = interval
+ count = 1000
+ expected_dur = count * m1.interval * 1e-3
+ start = time.time()
+ for _ in range(count):
+ m1.get_position()
+ m2.get_position()
+ end = time.time()
+ diff = abs((end - start) - expected_dur)
+ self.assertLess(diff, expected_dur * 0.1)
if __name__ == '__main__':