diff --git a/.flake8 b/.flake8 index 732c12e..8f58749 100644 --- a/.flake8 +++ b/.flake8 @@ -1,7 +1,7 @@ [flake8] docstring_style=sphinx max-line-length = 127 -ignore = D400, Q000, S311, PLW, PLC, PLR +ignore = D400, Q000, S311, W503, PLW, PLC, PLR per-file-ignores = buildhat/__init__.py:F401 exclude = docs/conf.py, docs/sphinxcontrib/cmtinc-buildhat.py, docs/sphinx_selective_exclude/*.py diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index 8e72fc3..57a67fb 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -16,10 +16,10 @@ jobs: steps: - uses: actions/checkout@v2 - - name: Set up Python 3.7 + - name: Set up Python 3.9 uses: actions/setup-python@v2 with: - python-version: 3.7 + python-version: 3.9 - name: Install dependencies run: | python -m pip install --upgrade pip @@ -34,5 +34,3 @@ jobs: flake8 . --version # stop the build if there are Python syntax errors or undefined names flake8 . --count --show-source --statistics - # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide - flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics diff --git a/.gitignore b/.gitignore index f6d7f91..7112796 100644 --- a/.gitignore +++ b/.gitignore @@ -34,6 +34,7 @@ share/python-wheels/ .installed.cfg *.egg MANIFEST +.pypirc # Installer logs pip-log.txt diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..bb16389 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,17 @@ +repos: +- repo: https://github.com/PyCQA/flake8 + rev: 6.0.0 + hooks: + - id: flake8 + additional_dependencies: [ + 'flake8-bugbear>=22.10.27', + 'flake8-comprehensions>=3.10', + 'flake8-debugger', + 'flake8-docstrings>=1.6.0', + 'flake8-isort>=5.0', + 'flake8-pylint', + 'flake8-rst-docstrings', + 'flake8-string-format', + 'gpiozero', + 'pyserial' + ] diff --git a/.readthedocs.yaml b/.readthedocs.yaml index 3f5c924..121204f 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -6,10 +6,11 @@ version: 2 build: - image: latest + os: ubuntu-22.04 + tools: + python: "3.8" python: - version: 3.8 install: - method: setuptools path: . diff --git a/CHANGELOG.md b/CHANGELOG.md index 053a830..8bf1d81 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,77 @@ # Change Log +## 0.9.0 + +Fixes: + +* Resend 'version' if empty data received from the Build HAT + +## 0.8.0 + +Adds: + +* Support for custom firmware (see: https://www.raspberrypi.com/news/build-hat-firmware-now-fully-open-source/) +* Add W503 to Flake8 ignore +* Increase interval for sensor while testing - avoids error that occurs when handling large amount of sensor data. In future, might be interesting to see if we can support faster UART baud rate. + +## 0.7.0 + +Adds: + +* Mode 7 IR transmission to ColorDistanceSensor https://github.com/RaspberryPiFoundation/python-build-hat/pull/205 +* Debug log filename access https://github.com/RaspberryPiFoundation/python-build-hat/pull/204 +* Movement counter to WeDo 2.0 Motion Sensor https://github.com/RaspberryPiFoundation/python-build-hat/pull/201 + +## 0.6.0 + +### Added + +* Support for Raspberry Pi 5 (https://github.com/RaspberryPiFoundation/python-build-hat/pull/203) + +## 0.5.12 + +### Added + +* Ability to set custom device interval for callbacks + +### Changed + +* New firmware with mitigation for motor disconnecting and selrate command + +### Removed + +n/a + +## 0.5.11 + +### Added + +* Expose release property to allow motor to hold position +* Updated documentation + +### Changed + +n/a + +### Removed + +n/a + +## 0.5.10 + +### Added + +* Support for 88008 motor +* get_distance support for ColourDistanceSensor + +### Changed + +* Use debug log level for logging + +### Removed + +n/a + ## 0.5.9 ### Added diff --git a/LICENSE b/LICENSE index 78d99a6..a43a2e8 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2020-2021 - Raspberry Pi Foundation +Copyright (c) 2020-2025 - Raspberry Pi Foundation Copyright (c) 2017-2021 - LEGO System A/S - Aastvej 1, 7190 Billund, DK Permission is hereby granted, free of charge, to any person obtaining a copy diff --git a/README.md b/README.md index a20d9d5..0b5968a 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ Projects and inspiration: https://projects.raspberrypi.org/en/pathways/lego-intr To install the Build HAT library, enter the following commands in a terminal: - sudo pip3 install buildhat + pip3 install buildhat ## Usage diff --git a/VERSION b/VERSION index 416bfb0..ac39a10 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.5.9 +0.9.0 diff --git a/buildhat/color.py b/buildhat/color.py index edf6030..5959263 100644 --- a/buildhat/color.py +++ b/buildhat/color.py @@ -210,4 +210,4 @@ def wait_for_new_color(self): def on(self): """Turn on the sensor and LED""" - self._write(f"port {self.port} ; plimit 1 ; set -1\r") + self.reverse() diff --git a/buildhat/colordistance.py b/buildhat/colordistance.py index 90f0f94..3c722a3 100644 --- a/buildhat/colordistance.py +++ b/buildhat/colordistance.py @@ -25,6 +25,9 @@ def __init__(self, port): self.mode(6) self.avg_reads = 4 self._old_color = None + self._ir_channel = 0x0 + self._ir_address = 0x0 + self._ir_toggle = 0x0 def segment_color(self, r, g, b): """Return the color name from HSV @@ -115,6 +118,16 @@ def get_reflected_light(self): readings.append(self.get()[0]) return int(sum(readings) / len(readings)) + def get_distance(self): + """Return the distance + + :return: Distance + :rtype: int + """ + self.mode(1) + distance = self.get()[0] + return distance + def _clamp(self, val, small, large): return max(small, min(val, large)) @@ -187,6 +200,337 @@ def wait_for_new_color(self): self.callback(None) return self._old_color + @property + def ir_channel(self): + """Get the IR channel for message transmission""" + return self._ir_channel + + @ir_channel.setter + def ir_channel(self, channel=1): + """ + Set the IR channel for RC Tx + + :param channel: 1-4 indicating the selected IR channel on the reciever + """ + check_chan = channel + if check_chan > 4: + check_chan = 4 + elif check_chan < 1: + check_chan = 1 + # Internally: 0-3 + self._ir_channel = int(check_chan) - 1 + + @property + def ir_address(self): + """IR Address space of 0x0 for default PoweredUp or 0x1 for extra space""" + return self._ir_address + + def toggle_ir_toggle(self): + """Toggle the IR toggle bit""" + # IYKYK, because the RC documents are not clear + if self._ir_toggle: + self._ir_toggle = 0x0 + else: + self._ir_toggle = 0x1 + return self._ir_toggle + + def send_ir_sop(self, port, mode): + """ + Send an IR message via Power Functions RC Protocol in Single Output PWM mode + + PF IR RC Protocol documented at https://www.philohome.com/pf/pf.htm + + Port B is blue + + Valid values for mode are: + 0x0: Float output + 0x1: Forward/Clockwise at speed 1 + 0x2: Forward/Clockwise at speed 2 + 0x3: Forward/Clockwise at speed 3 + 0x4: Forward/Clockwise at speed 4 + 0x5: Forward/Clockwise at speed 5 + 0x6: Forward/Clockwise at speed 6 + 0x7: Forward/Clockwise at speed 7 + 0x8: Brake (then float v1.20) + 0x9: Backwards/Counterclockwise at speed 7 + 0xA: Backwards/Counterclockwise at speed 6 + 0xB: Backwards/Counterclockwise at speed 5 + 0xC: Backwards/Counterclockwise at speed 4 + 0xD: Backwards/Counterclockwise at speed 3 + 0xE: Backwards/Counterclockwise at speed 2 + 0xF: Backwards/Counterclockwise at speed 1 + + :param port: 'A' or 'B' + :param mode: 0-15 indicating the port's mode to set + """ + escape_modeselect = 0x0 + escape = escape_modeselect + + ir_mode_single_output = 0x4 + ir_mode = ir_mode_single_output + + so_mode_pwm = 0x0 + so_mode = so_mode_pwm + + output_port_a = 0x0 + output_port_b = 0x1 + + output_port = None + if port == 'A' or port == 'a': + output_port = output_port_a + elif port == 'B' or port == 'b': + output_port = output_port_b + else: + return False + + ir_mode = ir_mode | (so_mode << 1) | output_port + + nibble1 = (self._ir_toggle << 3) | (escape << 2) | self._ir_channel + nibble2 = (self._ir_address << 3) | ir_mode + + # Mode range checked here + return self._send_ir_nibbles(nibble1, nibble2, mode) + + def send_ir_socstid(self, port, mode): + """ + Send an IR message via Power Functions RC Protocol in Single Output Clear/Set/Toggle/Increment/Decrement mode + + PF IR RC Protocol documented at https://www.philohome.com/pf/pf.htm + + Valid values for mode are: + 0x0: Toggle full Clockwise/Forward (Stop to Clockwise, Clockwise to Stop, Counterclockwise to Clockwise) + 0x1: Toggle direction + 0x2: Increment numerical PWM + 0x3: Decrement numerical PWM + 0x4: Increment PWM + 0x5: Decrement PWM + 0x6: Full Clockwise/Forward + 0x7: Full Counterclockwise/Backward + 0x8: Toggle full (defaults to Forward, first) + 0x9: Clear C1 (C1 to High) + 0xA: Set C1 (C1 to Low) + 0xB: Toggle C1 + 0xC: Clear C2 (C2 to High) + 0xD: Set C2 (C2 to Low) + 0xE: Toggle C2 + 0xF: Toggle full Counterclockwise/Backward (Stop to Clockwise, Counterclockwise to Stop, Clockwise to Counterclockwise) + + :param port: 'A' or 'B' + :param mode: 0-15 indicating the port's mode to set + """ + escape_modeselect = 0x0 + escape = escape_modeselect + + ir_mode_single_output = 0x4 + ir_mode = ir_mode_single_output + + so_mode_cstid = 0x1 + so_mode = so_mode_cstid + + output_port_a = 0x0 + output_port_b = 0x1 + + output_port = None + if port == 'A' or port == 'a': + output_port = output_port_a + elif port == 'B' or port == 'b': + output_port = output_port_b + else: + return False + + ir_mode = ir_mode | (so_mode << 1) | output_port + + nibble1 = (self._ir_toggle << 3) | (escape << 2) | self._ir_channel + nibble2 = (self._ir_address << 3) | ir_mode + + # Mode range checked here + return self._send_ir_nibbles(nibble1, nibble2, mode) + + def send_ir_combo_pwm(self, port_b_mode, port_a_mode): + """ + Send an IR message via Power Functions RC Protocol in Combo PWM mode + + PF IR RC Protocol documented at https://www.philohome.com/pf/pf.htm + + Valid values for the modes are: + 0x0 Float + 0x1 PWM Forward step 1 + 0x2 PWM Forward step 2 + 0x3 PWM Forward step 3 + 0x4 PWM Forward step 4 + 0x5 PWM Forward step 5 + 0x6 PWM Forward step 6 + 0x7 PWM Forward step 7 + 0x8 Brake (then float v1.20) + 0x9 PWM Backward step 7 + 0xA PWM Backward step 6 + 0xB PWM Backward step 5 + 0xC PWM Backward step 4 + 0xD PWM Backward step 3 + 0xE PWM Backward step 2 + 0xF PWM Backward step 1 + + :param port_b_mode: 0-15 indicating the command to send to port B + :param port_a_mode: 0-15 indicating the command to send to port A + """ + escape_combo_pwm = 0x1 + escape = escape_combo_pwm + + nibble1 = (self._ir_toggle << 3) | (escape << 2) | self._ir_channel + + # Port modes are range checked here + return self._send_ir_nibbles(nibble1, port_b_mode, port_a_mode) + + def send_ir_combo_direct(self, port_b_output, port_a_output): + """ + Send an IR message via Power Functions RC Protocol in Combo Direct mode + + PF IR RC Protocol documented at https://www.philohome.com/pf/pf.htm + + Valid values for the output variables are: + 0x0: Float output + 0x1: Clockwise/Forward + 0x2: Counterclockwise/Backwards + 0x3: Brake then float + + :param port_b_output: 0-3 indicating the output to send to port B + :param port_a_output: 0-3 indicating the output to send to port A + """ + escape_modeselect = 0x0 + escape = escape_modeselect + + ir_mode_combo_direct = 0x1 + ir_mode = ir_mode_combo_direct + + nibble1 = (self._ir_toggle << 3) | (escape << 2) | self._ir_channel + nibble2 = (self._ir_address << 3) | ir_mode + + if port_b_output > 0x3 or port_a_output > 0x3: + return False + if port_b_output < 0x0 or port_a_output < 0x0: + return False + + nibble3 = (port_b_output << 2) | port_a_output + + return self._send_ir_nibbles(nibble1, nibble2, nibble3) + + def send_ir_extended(self, mode): + """ + Send an IR message via Power Functions RC Protocol in Extended mode + + PF IR RC Protocol documented at https://www.philohome.com/pf/pf.htm + + Valid values for the mode are: + 0x0: Brake Port A (timeout) + 0x1: Increment Speed on Port A + 0x2: Decrement Speed on Port A + + 0x4: Toggle Forward/Clockwise/Float on Port B + + 0x6: Toggle Address bit + 0x7: Align toggle bit + + :param mode: 0-2,4,6-7 + """ + escape_modeselect = 0x0 + escape = escape_modeselect + + ir_mode_extended = 0x0 + ir_mode = ir_mode_extended + + nibble1 = (self._ir_toggle << 3) | (escape << 2) | self._ir_channel + nibble2 = (self._ir_address << 3) | ir_mode + + if mode < 0x0 or mode == 0x3 or mode == 0x5 or mode > 0x7: + return False + + return self._send_ir_nibbles(nibble1, nibble2, mode) + + def send_ir_single_pin(self, port, pin, mode, timeout): + """ + Send an IR message via Power Functions RC Protocol in Single Pin mode + + PF IR RC Protocol documented at https://www.philohome.com/pf/pf.htm + + Valid values for the mode are: + 0x0: No-op + 0x1: Clear + 0x2: Set + 0x3: Toggle + + Note: The unlabeled IR receiver (vs the one labeled V2) has a "firmware bug in Single Pin mode" + https://www.philohome.com/pfrec/pfrec.htm + + :param port: 'A' or 'B' + :param pin: 1 or 2 + :param mode: 0-3 indicating the pin's mode to set + :param timeout: True or False + """ + escape_mode = 0x0 + escape = escape_mode + + ir_mode_single_continuous = 0x2 + ir_mode_single_timeout = 0x3 + ir_mode = None + if timeout: + ir_mode = ir_mode_single_timeout + else: + ir_mode = ir_mode_single_continuous + + output_port_a = 0x0 + output_port_b = 0x1 + + output_port = None + if port == 'A' or port == 'a': + output_port = output_port_a + elif port == 'B' or port == 'b': + output_port = output_port_b + else: + return False + + if pin != 1 and pin != 2: + return False + pin_value = pin - 1 + + if mode > 0x3 or mode < 0x0: + return False + + nibble1 = (self._ir_toggle << 3) | (escape << 2) | self._ir_channel + nibble2 = (self._ir_address << 3) | ir_mode + nibble3 = (output_port << 3) | (pin_value << 3) | mode + + return self._send_ir_nibbles(nibble1, nibble2, nibble3) + + def _send_ir_nibbles(self, nibble1, nibble2, nibble3): + + # M7 IR Tx SI = N/A + # format count=1 type=1 chars=5 dp=0 + # RAW: 00000000 0000FFFF PCT: 00000000 00000064 SI: 00000000 0000FFFF + + mode = 7 + self.mode(mode) + + # The upper bits of data[2] are ignored + if nibble1 > 0xF or nibble2 > 0xF or nibble3 > 0xF: + return False + if nibble1 < 0x0 or nibble2 < 0x0 or nibble3 < 0x0: + return False + + byte_two = (nibble2 << 4) | nibble3 + + data = bytearray(3) + data[0] = (0xc << 4) | mode + data[1] = byte_two + data[2] = nibble1 + + # print(" ".join('{:04b}'.format(nibble1))) + # print(" ".join('{:04b}'.format(nibble2))) + # print(" ".join('{:04b}'.format(nibble3))) + # print(" ".join('{:08b}'.format(n) for n in data)) + + self._write1(data) + return True + def on(self): """Turn on the sensor and LED""" - self._write(f"port {self.port} ; plimit 1 ; set -1\r") + self.reverse() diff --git a/buildhat/data/firmware.bin b/buildhat/data/firmware.bin index d29a920..f6dcde3 100644 Binary files a/buildhat/data/firmware.bin and b/buildhat/data/firmware.bin differ diff --git a/buildhat/data/signature.bin b/buildhat/data/signature.bin index 72ba9a3..8f495fa 100644 --- a/buildhat/data/signature.bin +++ b/buildhat/data/signature.bin @@ -1,2 +1 @@ -ñcF½ŒÌ Ðm™ÛÄã°¶v t•?‚#‹îYÁFxQ -é¶H“|#9ÔJÒ‘49ÚÈÁ‚MàG˜ $dIÓ› \ No newline at end of file +ôãNÔ.2šwSm¢Q/I¦Ómï )io\~¦ŸI™Az4’ĘɅñPßã¼wê>\w ¡˜LòŸV™ \ No newline at end of file diff --git a/buildhat/data/version b/buildhat/data/version index f5df97f..1f3109b 100644 --- a/buildhat/data/version +++ b/buildhat/data/version @@ -1 +1 @@ -1643737936 +1737564117 diff --git a/buildhat/devices.py b/buildhat/devices.py index 3b433b2..80dec80 100644 --- a/buildhat/devices.py +++ b/buildhat/devices.py @@ -3,6 +3,7 @@ import os import sys import weakref +from concurrent.futures import Future from .exc import DeviceError from .serinterface import BuildHAT @@ -57,7 +58,9 @@ def __init__(self, port): Device._setup() self._simplemode = -1 self._combimode = -1 + self._modestr = "" self._typeid = self._conn.typeid + self._interval = 10 if ( self._typeid in Device._device_names and Device._device_names[self._typeid][0] != type(self).__name__ # noqa: W503 @@ -69,7 +72,15 @@ def __init__(self, port): def _setup(**kwargs): if Device._instance: return - data = os.path.join(os.path.dirname(sys.modules["buildhat"].__file__), "data/") + if ( + os.path.isdir(os.path.join(os.getcwd(), "data/")) + and os.path.isfile(os.path.join(os.getcwd(), "data", "firmware.bin")) + and os.path.isfile(os.path.join(os.getcwd(), "data", "signature.bin")) + and os.path.isfile(os.path.join(os.getcwd(), "data", "version")) + ): + data = os.path.join(os.getcwd(), "data/") + else: + data = os.path.join(os.path.dirname(sys.modules["buildhat"].__file__), "data/") firm = os.path.join(data, "firmware.bin") sig = os.path.join(data, "signature.bin") ver = os.path.join(data, "version") @@ -185,7 +196,7 @@ def isconnected(self): def reverse(self): """Reverse polarity""" - self._write(f"port {self.port} ; plimit 1 ; set -1\r") + self._write(f"port {self.port} ; port_plimit 1 ; set -1\r") def get(self): """Extract information from device @@ -194,18 +205,11 @@ def get(self): :raises DeviceError: Occurs if device not in valid mode """ self.isconnected() - idx = -1 - if self._simplemode != -1: - idx = self._simplemode - elif self._combimode != -1: - idx = self._combimode - else: + if self._simplemode == -1 and self._combimode == -1: raise DeviceError("Not in simple or combimode") - self._write(f"port {self.port} ; selonce {idx}\r") - # wait for data - with Device._instance.portcond[self.port]: - Device._instance.portcond[self.port].wait() - return self._conn.data + ftr = Future() + self._hat.portftr[self.port].append(ftr) + return ftr.result() def mode(self, modev): """Set combimode or simple mode @@ -214,18 +218,32 @@ def mode(self, modev): """ self.isconnected() if isinstance(modev, list): - self._combimode = 0 modestr = "" for t in modev: modestr += f"{t[0]} {t[1]} " - self._write(f"port {self.port} ; combi {self._combimode} {modestr}\r") + if self._simplemode == -1 and self._combimode == 0 and self._modestr == modestr: + return + self._write(f"port {self.port}; select\r") + self._combimode = 0 + self._write((f"port {self.port} ; combi {self._combimode} {modestr} ; " + f"select {self._combimode} ; " + f"selrate {self._interval}\r")) self._simplemode = -1 + self._modestr = modestr + self._conn.combimode = 0 + self._conn.simplemode = -1 else: + if self._combimode == -1 and self._simplemode == int(modev): + return # Remove combi mode if self._combimode != -1: self._write(f"port {self.port} ; combi {self._combimode}\r") + self._write(f"port {self.port}; select\r") self._combimode = -1 self._simplemode = int(modev) + self._write(f"port {self.port} ; select {int(modev)} ; selrate {self._interval}\r") + self._conn.combimode = -1 + self._conn.simplemode = int(modev) def select(self): """Request data from mode @@ -239,11 +257,11 @@ def select(self): idx = self._combimode else: raise DeviceError("Not in simple or combimode") - self._write(f"port {self.port} ; select {idx}\r") + self._write(f"port {self.port} ; select {idx} ; selrate {self._interval}\r") def on(self): """Turn on sensor""" - self._write(f"port {self.port} ; plimit 1 ; on\r") + self._write(f"port {self.port} ; port_plimit 1 ; on\r") def off(self): """Turn off sensor""" @@ -274,3 +292,28 @@ def callback(self, func): self._conn.callit = None else: self._conn.callit = weakref.WeakMethod(func) + + @property + def interval(self): + """Interval between data points in milliseconds + + :getter: Gets interval + :setter: Sets interval + :return: Device interval + :rtype: int + """ + return self._interval + + @interval.setter + def interval(self, value): + """Interval between data points in milliseconds + + :param value: Interval + :type value: int + :raises DeviceError: Occurs if invalid interval passed + """ + if isinstance(value, int) and value >= 0 and value <= 1000000000: + self._interval = value + self._write(f"port {self.port} ; selrate {self._interval}\r") + else: + raise DeviceError("Invalid interval") diff --git a/buildhat/hat.py b/buildhat/hat.py index 782a87f..0a277eb 100644 --- a/buildhat/hat.py +++ b/buildhat/hat.py @@ -1,5 +1,7 @@ """HAT handling functionality""" +from concurrent.futures import Future + from .devices import Device @@ -39,17 +41,24 @@ def get(self): "description": desc} return devices + def get_logfile(self): + """Get the filename of the debug log (If enabled, None otherwise) + + :return: Path of the debug logfile + :rtype: str or None + """ + return Device._instance.debug_filename + def get_vin(self): """Get the voltage present on the input power jack :return: Voltage on the input power jack :rtype: float """ + ftr = Future() + Device._instance.vinftr.append(ftr) Device._instance.write(b"vin\r") - with Device._instance.vincond: - Device._instance.vincond.wait() - - return Device._instance.vin + return ftr.result() def _set_led(self, intmode): if isinstance(intmode, int) and intmode >= -1 and intmode <= 3: diff --git a/buildhat/light.py b/buildhat/light.py index 7472f5c..1748e55 100644 --- a/buildhat/light.py +++ b/buildhat/light.py @@ -30,4 +30,12 @@ def brightness(self, brightness): """ if not (brightness >= 0 and brightness <= 100): raise LightError("Need brightness arg, of 0 to 100") - self._write(f"port {self.port} ; on ; plimit {brightness / 100.0}\r") + if brightness > 0: + self._write(f"port {self.port} ; on ; set {brightness / 100.0}\r") + else: + self.off() + + def off(self): + """Turn off lights""" + # Using coast to turn off DIY lights completely + self._write(f"port {self.port} ; coast\r") diff --git a/buildhat/motors.py b/buildhat/motors.py index 62f80ea..38786c0 100644 --- a/buildhat/motors.py +++ b/buildhat/motors.py @@ -3,6 +3,7 @@ import threading import time from collections import deque +from concurrent.futures import Future from enum import Enum from threading import Condition @@ -26,7 +27,6 @@ def __init__(self, port): self._default_speed = 20 self._currentspeed = 0 self.plimit(0.7) - self.bias(0.3) def set_default_speed(self, default_speed): """Set the default speed of the motor @@ -71,17 +71,17 @@ def plimit(self, plimit): """ if not (plimit >= 0 and plimit <= 1): raise MotorError("plimit should be 0 to 1") - self._write(f"port {self.port} ; plimit {plimit}\r") + self._write(f"port {self.port} ; port_plimit {plimit}\r") def bias(self, bias): """Bias motor :param bias: Value 0 to 1 :raises MotorError: Occurs if invalid bias value passed - """ - if not (bias >= 0 and bias <= 1): - raise MotorError("bias should be 0 to 1") - self._write(f"port {self.port} ; bias {bias}\r") + + .. deprecated:: 0.6.0 + """ # noqa: RST303 + raise MotorError("Bias no longer available") class MotorRunmode(Enum): @@ -108,9 +108,17 @@ def __init__(self, port): super().__init__(port) self.default_speed = 20 self._currentspeed = 0 - self.mode([(1, 0), (2, 0), (3, 0)]) + if self._typeid in {38}: + self.mode([(1, 0), (2, 0)]) + self._combi = "1 0 2 0" + self._noapos = True + else: + self.mode([(1, 0), (2, 0), (3, 0)]) + self._combi = "1 0 2 0 3 0" + self._noapos = False self.plimit(0.7) - self.bias(0.3) + self.pwmparams(0.65, 0.01) + self._rpm = False self._release = True self._bqueue = deque(maxlen=5) self._cvqueue = Condition() @@ -118,6 +126,13 @@ def __init__(self, port): self._oldpos = None self._runmode = MotorRunmode.NONE + def set_speed_unit_rpm(self, rpm=False): + """Set whether to use RPM for speed units or not + + :param rpm: Boolean to determine whether to use RPM for units + """ + self._rpm = rpm + def set_default_speed(self, default_speed): """Set the default speed of the motor @@ -160,7 +175,10 @@ def _run_to_position(self, degrees, speed, direction): self._runmode = MotorRunmode.DEGREES data = self.get() pos = data[1] - apos = data[2] + if self._noapos: + apos = pos + else: + apos = data[2] diff = (degrees - apos + 180) % 360 - 180 newpos = (pos + diff) / 360 v1 = (degrees - apos) % 360 @@ -189,14 +207,18 @@ def _run_positional_ramp(self, pos, newpos, speed): :param newpos: New motor postion in decimal rotations (from preset position) :param speed: -100 to 100 """ - # Collapse speed range to -5 to 5 - speed *= 0.05 + if self._rpm: + speed = self._speed_process(speed) + else: + speed *= 0.05 # Collapse speed range to -5 to 5 dur = abs((newpos - pos) / speed) - cmd = (f"port {self.port}; combi 0 1 0 2 0 3 0 ; select 0 ; pid {self.port} 0 1 s4 0.0027777778 0 5 0 .1 3 ; " + cmd = (f"port {self.port}; select 0 ; selrate {self._interval}; " + f"pid {self.port} 0 1 s4 0.0027777778 0 5 0 .1 3 0.01; " f"set ramp {pos} {newpos} {dur} 0\r") + ftr = Future() + self._hat.rampftr[self.port].append(ftr) self._write(cmd) - with self._hat.rampcond[self.port]: - self._hat.rampcond[self.port].wait() + ftr.result() if self._release: time.sleep(0.2) self.coast() @@ -217,10 +239,9 @@ def run_for_degrees(self, degrees, speed=None, blocking=True): if not (speed >= -100 and speed <= 100): raise MotorError("Invalid Speed") if not blocking: - th = threading.Thread(target=self._run_for_degrees, args=(degrees, speed)) - th.daemon = True - th.start() + self._queue((self._run_for_degrees, (degrees, speed))) else: + self._wait_for_nonblocking() self._run_for_degrees(degrees, speed) def run_to_position(self, degrees, speed=None, blocking=True, direction="shortest"): @@ -240,19 +261,25 @@ def run_to_position(self, degrees, speed=None, blocking=True, direction="shortes if degrees < -180 or degrees > 180: raise MotorError("Invalid angle") if not blocking: - th = threading.Thread(target=self._run_to_position, args=(degrees, speed, direction)) - th.daemon = True - th.start() + self._queue((self._run_to_position, (degrees, speed, direction))) else: + self._wait_for_nonblocking() self._run_to_position(degrees, speed, direction) def _run_for_seconds(self, seconds, speed): + speed = self._speed_process(speed) self._runmode = MotorRunmode.SECONDS - cmd = (f"port {self.port} ; combi 0 1 0 2 0 3 0 ; select 0 ; pid {self.port} 0 0 s1 1 0 0.003 0.01 0 100; " + if self._rpm: + pid = f"pid_diff {self.port} 0 5 s2 0.0027777778 1 0 2.5 0 .4 0.01; " + else: + pid = f"pid {self.port} 0 0 s1 1 0 0.003 0.01 0 100 0.01;" + cmd = (f"port {self.port} ; select 0 ; selrate {self._interval}; " + f"{pid}" f"set pulse {speed} 0.0 {seconds} 0\r") + ftr = Future() + self._hat.pulseftr[self.port].append(ftr) self._write(cmd) - with self._hat.pulsecond[self.port]: - self._hat.pulsecond[self.port].wait() + ftr.result() if self._release: self.coast() self._runmode = MotorRunmode.NONE @@ -271,10 +298,9 @@ def run_for_seconds(self, seconds, speed=None, blocking=True): if not (speed >= -100 and speed <= 100): raise MotorError("Invalid Speed") if not blocking: - th = threading.Thread(target=self._run_for_seconds, args=(seconds, speed)) - th.daemon = True - th.start() + self._queue((self._run_for_seconds, (seconds, speed))) else: + self._wait_for_nonblocking() self._run_for_seconds(seconds, speed) def start(self, speed=None): @@ -283,6 +309,7 @@ def start(self, speed=None): :param speed: Speed ranging from -100 to 100 :raises MotorError: Occurs when invalid speed specified """ + self._wait_for_nonblocking() if self._runmode == MotorRunmode.FREE: if self._currentspeed == speed: # Already running at this speed, do nothing @@ -296,9 +323,15 @@ def start(self, speed=None): else: if not (speed >= -100 and speed <= 100): raise MotorError("Invalid Speed") + speed = self._speed_process(speed) cmd = f"port {self.port} ; set {speed}\r" if self._runmode == MotorRunmode.NONE: - cmd = (f"port {self.port} ; combi 0 1 0 2 0 3 0 ; select 0 ; pid {self.port} 0 0 s1 1 0 0.003 0.01 0 100; " + if self._rpm: + pid = f"pid_diff {self.port} 0 5 s2 0.0027777778 1 0 2.5 0 .4 0.01; " + else: + pid = f"pid {self.port} 0 0 s1 1 0 0.003 0.01 0 100 0.01; " + cmd = (f"port {self.port} ; select 0 ; selrate {self._interval}; " + f"{pid}" f"set {speed}\r") self._runmode = MotorRunmode.FREE self._currentspeed = speed @@ -306,6 +339,7 @@ def start(self, speed=None): def stop(self): """Stop motor""" + self._wait_for_nonblocking() self._runmode = MotorRunmode.NONE self._currentspeed = 0 self.coast() @@ -324,7 +358,10 @@ def get_aposition(self): :return: Absolute position of motor from -180 to 180 :rtype: int """ - return self.get()[2] + if self._noapos: + raise MotorError("No absolute position with this motor") + else: + return self.get()[2] def get_speed(self): """Get speed of motor @@ -346,7 +383,11 @@ def when_rotated(self): return self._when_rotated def _intermediate(self, data): - speed, pos, apos = data + if self._noapos: + speed, pos = data + apos = None + else: + speed, pos, apos = data if self._oldpos is None: self._oldpos = pos return @@ -372,17 +413,30 @@ def plimit(self, plimit): """ if not (plimit >= 0 and plimit <= 1): raise MotorError("plimit should be 0 to 1") - self._write(f"port {self.port} ; plimit {plimit}\r") + self._write(f"port {self.port} ; port_plimit {plimit}\r") def bias(self, bias): """Bias motor :param bias: Value 0 to 1 :raises MotorError: Occurs if invalid bias value passed + + .. deprecated:: 0.6.0 + """ # noqa: RST303 + raise MotorError("Bias no longer available") + + def pwmparams(self, pwmthresh, minpwm): + """PWM thresholds + + :param pwmthresh: Value 0 to 1, threshold below, will switch from fast to slow, PWM + :param minpwm: Value 0 to 1, threshold below which it switches off the drive altogether + :raises MotorError: Occurs if invalid values are passed """ - if not (bias >= 0 and bias <= 1): - raise MotorError("bias should be 0 to 1") - self._write(f"port {self.port} ; bias {bias}\r") + if not (pwmthresh >= 0 and pwmthresh <= 1): + raise MotorError("pwmthresh should be 0 to 1") + if not (minpwm >= 0 and minpwm <= 1): + raise MotorError("minpwm should be 0 to 1") + self._write(f"port {self.port} ; pwmparams {pwmthresh} {minpwm}\r") def pwm(self, pwmv): """PWM motor @@ -402,6 +456,42 @@ def float(self): """Float motor""" self.pwm(0) + @property + def release(self): + """Determine if motor is released after running, so can be turned by hand + + :getter: Returns whether motor is released, so can be turned by hand + :setter: Sets whether motor is released, so can be turned by hand + :return: Whether motor is released, so can be turned by hand + :rtype: bool + """ + return self._release + + @release.setter + def release(self, value): + """Determine if the motor is released after running, so can be turned by hand + + :param value: Whether motor should be released, so can be turned by hand + :type value: bool + """ + if not isinstance(value, bool): + raise MotorError("Must pass boolean") + self._release = value + + def _queue(self, cmd): + Device._instance.motorqueue[self.port].put(cmd) + + def _wait_for_nonblocking(self): + """Wait for nonblocking commands to finish""" + Device._instance.motorqueue[self.port].join() + + def _speed_process(self, speed): + """Lower speed value""" + if self._rpm: + return speed / 60 + else: + return speed + class MotorPair: """Pair of motors @@ -421,6 +511,8 @@ def __init__(self, leftport, rightport): self._leftmotor = Motor(leftport) self._rightmotor = Motor(rightport) self.default_speed = 20 + self._release = True + self._rpm = False def set_default_speed(self, default_speed): """Set the default speed of the motor @@ -429,6 +521,15 @@ def set_default_speed(self, default_speed): """ self.default_speed = default_speed + def set_speed_unit_rpm(self, rpm=False): + """Set whether to use RPM for speed units or not + + :param rpm: Boolean to determine whether to use RPM for units + """ + self._rpm = rpm + self._leftmotor.set_speed_unit_rpm(rpm) + self._rightmotor.set_speed_unit_rpm(rpm) + def run_for_rotations(self, rotations, speedl=None, speedr=None): """Run pair of motors for N rotations @@ -520,3 +621,27 @@ def run_to_position(self, degreesl, degreesr, speed=None, direction="shortest"): th2.start() th1.join() th2.join() + + @property + def release(self): + """Determine if motors are released after running, so can be turned by hand + + :getter: Returns whether motors are released, so can be turned by hand + :setter: Sets whether motors are released, so can be turned by hand + :return: Whether motors are released, so can be turned by hand + :rtype: bool + """ + return self._release + + @release.setter + def release(self, value): + """Determine if motors are released after running, so can be turned by hand + + :param value: Whether motors should be released, so can be turned by hand + :type value: bool + """ + if not isinstance(value, bool): + raise MotorError("Must pass boolean") + self._release = value + self._leftmotor.release = value + self._rightmotor.release = value diff --git a/buildhat/serinterface.py b/buildhat/serinterface.py index 14e6803..901dc2e 100644 --- a/buildhat/serinterface.py +++ b/buildhat/serinterface.py @@ -1,6 +1,7 @@ """Build HAT handling functionality""" import logging +import os import queue import tempfile import threading @@ -31,6 +32,8 @@ def __init__(self): self.typeid = -1 self.connected = False self.callit = None + self.simplemode = -1 + self.combimode = -1 def update(self, typeid, connected, callit=None): """Update connection information for port @@ -84,39 +87,39 @@ def __init__(self, firmware, signature, version, device="/dev/serial0", debug=Fa self.cond = Condition() self.state = HatState.OTHER self.connections = [] - self.portcond = [] - self.pulsecond = [] - self.rampcond = [] + self.portftr = [] + self.pulseftr = [] + self.rampftr = [] + self.vinftr = [] + self.motorqueue = [] self.fin = False self.running = True - self.vincond = Condition() - self.vin = None + self.debug_filename = None if debug: tmp = tempfile.NamedTemporaryFile(suffix=".log", prefix="buildhat-", delete=False) + self.debug_filename = tmp.name logging.basicConfig(filename=tmp.name, format='%(asctime)s %(message)s', - level=logging.INFO) + level=logging.DEBUG) for _ in range(4): self.connections.append(Connection()) - self.portcond.append(Condition()) - self.pulsecond.append(Condition()) - self.rampcond.append(Condition()) - + self.portftr.append([]) + self.pulseftr.append([]) + self.rampftr.append([]) + self.motorqueue.append(queue.Queue()) + + # On a Pi 5 /dev/serial0 will point to /dev/ttyAMA10 (which *only* + # exists on a Pi 5, and is the 3-pin debug UART connector) + # The UART on the Pi 5 GPIO header is /dev/ttyAMA0 + if device == "/dev/serial0" and os.readlink(device) == "ttyAMA10": + device = "/dev/ttyAMA0" self.ser = serial.Serial(device, 115200, timeout=5) # Check if we're in the bootloader or the firmware self.write(b"version\r") - emptydata = 0 incdata = 0 while True: line = self.read() - if len(line) == 0: - # Didn't receive any data - emptydata += 1 - if emptydata > 3: - break - else: - continue if cmp(line, BuildHAT.FIRMWARE): self.state = HatState.FIRMWARE ver = line[len(BuildHAT.FIRMWARE):].split(' ') @@ -150,15 +153,22 @@ def __init__(self, firmware, signature, version, device="/dev/serial0", debug=Fa self.cb.daemon = True self.cb.start() + for q in self.motorqueue: + ml = threading.Thread(target=self.motorloop, args=(q,)) + ml.daemon = True + ml.start() + # Drop timeout value to 1s + listevt = threading.Event() self.ser.timeout = 1 - self.th = threading.Thread(target=self.loop, args=(self.cond, self.state == HatState.FIRMWARE, self.cbqueue)) + self.th = threading.Thread(target=self.loop, args=(self.cond, self.state == HatState.FIRMWARE, self.cbqueue, listevt)) self.th.daemon = True self.th.start() if self.state == HatState.FIRMWARE: self.write(b"port 0 ; select ; port 1 ; select ; port 2 ; select ; port 3 ; select ; echo 0\r") self.write(b"list\r") + listevt.set() elif self.state == HatState.NEEDNEWFIRMWARE or self.state == HatState.BOOTLOADER: self.write(b"reboot\r") @@ -239,9 +249,9 @@ def write(self, data, log=True, replace=""): self.ser.write(data) if not self.fin and log: if replace != "": - logging.info(f"> {replace}") + logging.debug(f"> {replace}") else: - logging.info(f"> {data.decode('utf-8', 'ignore').strip()}") + logging.debug(f"> {data.decode('utf-8', 'ignore').strip()}") def read(self): """Read data from the serial port of Build HAT @@ -254,7 +264,7 @@ def read(self): except serial.SerialException: pass if line != "": - logging.info(f"< {line}") + logging.debug(f"< {line}") return line def shutdown(self): @@ -264,6 +274,8 @@ def shutdown(self): self.running = False self.th.join() self.cbqueue.put(()) + for q in self.motorqueue: + q.put((None, None)) self.cb.join() turnoff = "" for p in range(4): @@ -276,6 +288,21 @@ def shutdown(self): self.write(f"{turnoff}\r".encode()) self.write(b"port 0 ; select ; port 1 ; select ; port 2 ; select ; port 3 ; select ; echo 0\r") + def motorloop(self, q): + """Event handling for non-blocking motor commands + + :param q: Queue of motor functions + """ + while self.running: + func, data = q.get() + if func is None: + break + else: + func(*data) + func = None # Necessary for 'del' to function correctly on motor object + data = None + q.task_done() + def callbackloop(self, q): """Event handling for callbacks @@ -292,7 +319,7 @@ def callbackloop(self, q): cb[0]()(cb[1]) q.task_done() - def loop(self, cond, uselist, q): + def loop(self, cond, uselist, q, listevt): """Event handling for Build HAT :param cond: Condition used to block user's script till we're ready @@ -312,12 +339,12 @@ def loop(self, cond, uselist, q): self.connections[portid].update(typeid, True) if typeid == 64: self.write(f"port {portid} ; on\r".encode()) - if uselist: + if uselist and listevt.is_set(): count += 1 elif cmp(msg, BuildHAT.CONNECTEDPASSIVE): typeid = int(line[2 + len(BuildHAT.CONNECTEDPASSIVE):], 16) self.connections[portid].update(typeid, True) - if uselist: + if uselist and listevt.is_set(): count += 1 elif cmp(msg, BuildHAT.DISCONNECTED): self.connections[portid].update(-1, False) @@ -325,14 +352,14 @@ def loop(self, cond, uselist, q): self.connections[portid].update(-1, False) elif cmp(msg, BuildHAT.NOTCONNECTED): self.connections[portid].update(-1, False) - if uselist: + if uselist and listevt.is_set(): count += 1 elif cmp(msg, BuildHAT.RAMPDONE): - with self.rampcond[portid]: - self.rampcond[portid].notify() + ftr = self.rampftr[portid].pop() + ftr.set_result(True) elif cmp(msg, BuildHAT.PULSEDONE): - with self.pulsecond[portid]: - self.pulsecond[portid].notify() + ftr = self.pulseftr[portid].pop() + ftr.set_result(True) if uselist and count == 4: with cond: @@ -343,7 +370,7 @@ def loop(self, cond, uselist, q): def runit(): with cond: cond.notify() - t = Timer(8.0, runit) + t = Timer(11.0, runit) t.start() if line[0] == "P" and (line[2] == "C" or line[2] == "M"): @@ -356,15 +383,22 @@ def runit(): else: if d != "": newdata.append(int(d)) + # Check data was for our current mode + if line[2] == "M" and self.connections[portid].simplemode != int(line[3]): + continue + elif line[2] == "C" and self.connections[portid].combimode != int(line[3]): + continue callit = self.connections[portid].callit if callit is not None: q.put((callit, newdata)) self.connections[portid].data = newdata - with self.portcond[portid]: - self.portcond[portid].notify() + try: + ftr = self.portftr[portid].pop() + ftr.set_result(newdata) + except IndexError: + pass if len(line) >= 5 and line[1] == "." and line.endswith(" V"): vin = float(line.split(" ")[0]) - self.vin = vin - with self.vincond: - self.vincond.notify() + ftr = self.vinftr.pop() + ftr.set_result(vin) diff --git a/buildhat/wedo.py b/buildhat/wedo.py index df99dd6..372524e 100644 --- a/buildhat/wedo.py +++ b/buildhat/wedo.py @@ -36,6 +36,8 @@ class MotionSensor(Device): :raises DeviceError: Occurs if there is no motion sensor attached to port """ + default_mode = 0 + def __init__(self, port): """ Initialise motion sensor @@ -43,7 +45,19 @@ def __init__(self, port): :param port: Port of device """ super().__init__(port) - self.mode(0) + self.mode(self.default_mode) + + def set_default_data_mode(self, mode): + """ + Set the mode most often queried from this device. + + This significantly improves performance when repeatedly accessing data + + :param mode: 0 for distance (default), 1 for movement count + """ + if mode == 1 or mode == 0: + self.default_mode = mode + self.mode(mode) def get_distance(self): """ @@ -52,4 +66,26 @@ def get_distance(self): :return: Distance from motion sensor :rtype: int """ - return self.get()[0] + return self._get_data_from_mode(0) + + def get_movement_count(self): + """ + Return the movement counter + + This is the count of how many times the sensor has detected an object + that moved within 4 blocks of the sensor since the sensor has been + plugged in or the BuildHAT reset + + :return: Count of objects detected + :rtype: int + """ + return self._get_data_from_mode(1) + + def _get_data_from_mode(self, mode): + if self.default_mode == mode: + return self.get()[0] + else: + self.mode(mode) + retval = self.get()[0] + self.mode(self.default_mode) + return retval diff --git a/docs/buildhat/colordistance.py b/docs/buildhat/colordistance.py index 8a73964..d8a3466 100755 --- a/docs/buildhat/colordistance.py +++ b/docs/buildhat/colordistance.py @@ -4,6 +4,7 @@ color = ColorDistanceSensor('C') +print("Distance", color.get_distance()) print("RGBI", color.get_color_rgb()) print("Ambient", color.get_ambient_light()) print("Reflected", color.get_reflected_light()) diff --git a/docs/buildhat/index.rst b/docs/buildhat/index.rst index 9ac86b4..f4f7fc4 100644 --- a/docs/buildhat/index.rst +++ b/docs/buildhat/index.rst @@ -28,10 +28,51 @@ power supply. For best results, use the `official Raspberry Pi Build HAT power s .. _official Raspberry Pi Build HAT power supply: http://raspberrypi.com/products/build-hat-power-supply +It is now possible to use custom firmware with the library. To do this you can follow the steps below. + +.. code-block:: + :caption: Using custom firmware + + sudo apt install cmake python3 build-essential gcc-arm-none-eabi libnewlib-arm-none-eabi libstdc++-arm-none-eabi-newlib + + git clone https://github.com/raspberrypi/pico-sdk.git --recursive + git clone https://github.com/raspberrypi/buildhat.git --recursive + + cd buildhat + export PICO_SDK_PATH="$(pwd)/../pico-sdk/" + make + + cd .. + mkdir test + cd test + mkdir data + + cp ../buildhat/firmware-pico/build/main.bin data/firmware.bin + cp ../buildhat/bhbl-pico/signature.bin data/signature.bin + cat ../buildhat/firmware-pico/version.h | sed 's/#define FWVERSION "//g; s/ .*//g' > data/version + +Then place your script, such as the following, within the test/ directory. + +.. code-block:: + :caption: Create test.py in test directory + + import time + from buildhat import Motor + + m = Motor('A') + m.start() + + time.sleep(5) + +Then use: ``python test.py`` in the test directory, to run your script with your custom firmware. + +Note if you want python to always reload the firmware from your **data/** directory each time +you run your script, simply write the value: -1 to **data/version**. + .. warning:: The API for the Build HAT is undergoing active development and is subject - to change. + to change. .. toctree:: :maxdepth: 2 @@ -40,9 +81,11 @@ power supply. For best results, use the `official Raspberry Pi Build HAT power s colordistancesensor.rst distancesensor.rst forcesensor.rst + light.rst matrix.rst + motionsensor.rst motor.rst motorpair.rst passivemotor.rst + tiltsensor.rst hat.rst - light.rst diff --git a/docs/buildhat/motion.py b/docs/buildhat/motion.py new file mode 100755 index 0000000..cf48829 --- /dev/null +++ b/docs/buildhat/motion.py @@ -0,0 +1,11 @@ +"""Example using motion sensor""" + +from time import sleep + +from buildhat import MotionSensor + +motion = MotionSensor('A') + +for _ in range(50): + print(motion.get_distance()) + sleep(0.1) diff --git a/docs/buildhat/motionsensor.rst b/docs/buildhat/motionsensor.rst new file mode 100644 index 0000000..e9d0f5f --- /dev/null +++ b/docs/buildhat/motionsensor.rst @@ -0,0 +1,19 @@ +Motion Sensor +============= + +|location_link| + +.. |location_link| raw:: html + + BrickLink item + +.. autoclass:: buildhat.MotionSensor + :members: + :inherited-members: + +Example +------- + +.. literalinclude:: motion.py + + diff --git a/docs/buildhat/passivemotor.rst b/docs/buildhat/passivemotor.rst index 2b57b20..357055e 100644 --- a/docs/buildhat/passivemotor.rst +++ b/docs/buildhat/passivemotor.rst @@ -6,7 +6,7 @@ LEGO® TECHNICâ„¢ motors which do not have an integrated rotation sensor (encode .. |location_link1| raw:: html - LEGO® Train motor 88011 + LEGO® Train motor 88011 .. |location_link1b| raw:: html diff --git a/docs/buildhat/tilt.py b/docs/buildhat/tilt.py new file mode 100755 index 0000000..36ec8d1 --- /dev/null +++ b/docs/buildhat/tilt.py @@ -0,0 +1,11 @@ +"""Example using tilt sensor""" + +from time import sleep + +from buildhat import TiltSensor + +tilt = TiltSensor('A') + +for _ in range(50): + print(tilt.get_tilt()) + sleep(0.1) diff --git a/docs/buildhat/tiltsensor.rst b/docs/buildhat/tiltsensor.rst new file mode 100644 index 0000000..e428882 --- /dev/null +++ b/docs/buildhat/tiltsensor.rst @@ -0,0 +1,19 @@ +Tilt Sensor +=========== + +|location_link| + +.. |location_link| raw:: html + + BrickLink item + +.. autoclass:: buildhat.TiltSensor + :members: + :inherited-members: + +Example +------- + +.. literalinclude:: tilt.py + + diff --git a/docs/conf.py b/docs/conf.py index e17f1fd..aae1e0d 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -59,7 +59,7 @@ # General information about the project. project = 'Raspberry Pi Build HAT' -copyright = '''2020-2021 - Raspberry Pi Foundation; +copyright = '''2020-2025 - Raspberry Pi Foundation; 2017-2020 - LEGO System A/S - Aastvej 1, 7190 Billund, DK.''' # The version info for the project you're documenting, acts as replacement for diff --git a/docs/license.rst b/docs/license.rst index 2ad53fd..af5c040 100644 --- a/docs/license.rst +++ b/docs/license.rst @@ -3,7 +3,7 @@ License information The MIT License (MIT) -Copyright (c) 2020-2021 - Raspberry Pi Foundation +Copyright (c) 2020-2025 - Raspberry Pi Foundation Copyright (c) 2017-2021 - LEGO System A/S - Aastvej 1, 7190 Billund, DK diff --git a/requirements-test.txt b/requirements-test.txt index b6d3acc..8d7b912 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,16 +1,10 @@ -flake8 -flake8-bandit -flake8-broken-line -flake8-bugbear -flake8-commas -flake8-comprehensions +flake8>=6.0.0 +flake8-bugbear>=22.10.27 +flake8-comprehensions>=3.10 flake8-debugger -flake8-docstrings -flake8-eradicate -flake8-isort -flake8-isort +flake8-docstrings>=1.6.0 +flake8-isort>=5.0 flake8-pylint -flake8-quotes flake8-rst-docstrings flake8-string-format gpiozero diff --git a/setup.py b/setup.py index fe788ea..38ae4f1 100755 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ """Setup file""" -# Copyright (c) 2020-2021 Raspberry Pi Foundation +# Copyright (c) 2020-2025 Raspberry Pi Foundation # # SPDX-License-Identifier: MIT diff --git a/test/color.py b/test/color.py new file mode 100644 index 0000000..ba23ff2 --- /dev/null +++ b/test/color.py @@ -0,0 +1,47 @@ +"""Test Color Sensor functionality""" +import time +import unittest + +from buildhat import ColorSensor + + +class TestColor(unittest.TestCase): + """Test color sensor functions""" + + def test_color_interval(self): + """Test color sensor interval""" + color = ColorSensor('A') + color.avg_reads = 1 + color.interval = 10 + count = 1000 + expected_dur = count * color.interval * 1e-3 + + start = time.time() + for _ in range(count): + color.get_ambient_light() + end = time.time() + diff = abs((end - start) - expected_dur) + self.assertLess(diff, 0.25) + + start = time.time() + for _ in range(count): + color.get_color_rgbi() + end = time.time() + diff = abs((end - start) - expected_dur) + self.assertLess(diff, 0.25) + + def test_caching(self): + """Test to make sure we're not reading cached data""" + color = ColorSensor('A') + color.avg_reads = 1 + color.interval = 10 + + for _ in range(100): + color.mode(2) + self.assertEqual(len(color.get()), 1) + color.mode(5) + self.assertEqual(len(color.get()), 4) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/motors.py b/test/motors.py index 4d3275f..46d01ec 100644 --- a/test/motors.py +++ b/test/motors.py @@ -3,13 +3,15 @@ import time import unittest -from buildhat import Motor +from buildhat import Hat, Motor from buildhat.exc import DeviceError, MotorError class TestMotor(unittest.TestCase): """Test motors""" + THRESHOLD_DISTANCE = 15 + def test_rotations(self): """Test motor rotating""" m = Motor('A') @@ -19,18 +21,83 @@ def test_rotations(self): rotated = (pos2 - pos1) / 360 self.assertLess(abs(rotated - 2), 0.5) + def test_nonblocking(self): + """Test motor nonblocking mode""" + m = Motor('A') + m.set_default_speed(10) + last = 0 + for delay in [1, 0]: + for _ in range(3): + m.run_to_position(90, blocking=False) + time.sleep(delay) + m.run_to_position(90, blocking=False) + time.sleep(delay) + m.run_to_position(90, blocking=False) + time.sleep(delay) + m.run_to_position(last, blocking=False) + time.sleep(delay) + # Wait for a bit, before reading last position + time.sleep(7) + pos1 = m.get_aposition() + diff = abs((last - pos1 + 180) % 360 - 180) + self.assertLess(diff, self.THRESHOLD_DISTANCE) + + def test_nonblocking_multiple(self): + """Test motor nonblocking mode""" + m1 = Motor('A') + m1.set_default_speed(10) + m2 = Motor('B') + m2.set_default_speed(10) + last = 0 + for delay in [1, 0]: + for _ in range(3): + m1.run_to_position(90, blocking=False) + m2.run_to_position(90, blocking=False) + time.sleep(delay) + m1.run_to_position(90, blocking=False) + m2.run_to_position(90, blocking=False) + time.sleep(delay) + m1.run_to_position(90, blocking=False) + m2.run_to_position(90, blocking=False) + time.sleep(delay) + m1.run_to_position(last, blocking=False) + m2.run_to_position(last, blocking=False) + time.sleep(delay) + # Wait for a bit, before reading last position + time.sleep(7) + pos1 = m1.get_aposition() + diff = abs((last - pos1 + 180) % 360 - 180) + self.assertLess(diff, self.THRESHOLD_DISTANCE) + pos2 = m2.get_aposition() + diff = abs((last - pos2 + 180) % 360 - 180) + self.assertLess(diff, self.THRESHOLD_DISTANCE) + + def test_nonblocking_mixed(self): + """Test motor nonblocking mode mixed with blocking mode""" + m = Motor('A') + m.run_for_seconds(5, blocking=False) + m.run_for_degrees(360) + m.run_for_seconds(5, blocking=False) + m.run_to_position(180) + m.run_for_seconds(5, blocking=False) + m.run_for_seconds(5) + m.run_for_seconds(5, blocking=False) + m.start() + m.run_for_seconds(5, blocking=False) + m.stop() + def test_position(self): """Test motor goes to desired position""" m = Motor('A') m.run_to_position(0) pos1 = m.get_aposition() diff = abs((0 - pos1 + 180) % 360 - 180) - self.assertLess(diff, 10) + self.assertLess(diff, self.THRESHOLD_DISTANCE) m.run_to_position(180) pos1 = m.get_aposition() diff = abs((180 - pos1 + 180) % 360 - 180) - self.assertLess(diff, 10) + self.assertLess(diff, self.THRESHOLD_DISTANCE) def test_time(self): """Test motor runs for correct duration""" @@ -54,13 +121,6 @@ def test_plimit(self): self.assertRaises(MotorError, m.plimit, -1) self.assertRaises(MotorError, m.plimit, 2) - def test_bias(self): - """Test setting motor bias""" - m = Motor('A') - m.bias(0.5) - self.assertRaises(MotorError, m.bias, -1) - self.assertRaises(MotorError, m.bias, 2) - def test_pwm(self): """Test PWMing motor""" m = Motor('A') @@ -81,6 +141,18 @@ def handle_motor(speed, pos, apos): m.run_for_seconds(1) self.assertGreater(handle_motor.evt, 0) + def test_callback_interval(self): + """Test setting callback and interval""" + m = Motor('A') + m.interval = 10 + + def handle_motor(speed, pos, apos): + handle_motor.evt += 1 + handle_motor.evt = 0 + m.when_rotated = handle_motor + m.run_for_seconds(5) + self.assertGreater(handle_motor.evt, 0.8 * ((1 / ((m.interval) * 1e-3)) * 5)) + def test_none_callback(self): """Test setting empty callback""" m = Motor('A') @@ -104,22 +176,67 @@ def test_continuous_start(self): """Test starting motor for 5mins""" t = time.time() + (60 * 5) m = Motor('A') + toggle = 0 while time.time() < t: - m.start(0) + m.start(toggle) + toggle ^= 1 def test_continuous_degrees(self): """Test setting degrees for 5mins""" t = time.time() + (60 * 5) m = Motor('A') + toggle = 0 while time.time() < t: - m.run_for_degrees(0) + m.run_for_degrees(toggle) + toggle ^= 1 def test_continuous_position(self): """Test setting position of motor for 5mins""" t = time.time() + (60 * 5) m = Motor('A') + toggle = 0 while time.time() < t: - m.run_to_position(0) + m.run_to_position(toggle) + toggle ^= 1 + + def test_continuous_feedback(self): + """Test feedback of motor for 30mins""" + Hat(debug=True) + t = time.time() + (60 * 30) + m = Motor('A') + m.start(40) + while time.time() < t: + _ = (m.get_speed(), m.get_position(), m.get_aposition()) + + def test_interval(self): + """Test motor interval""" + m = Motor('A') + m.interval = 10 + count = 1000 + expected_dur = count * m.interval * 1e-3 + start = time.time() + for _ in range(count): + m.get_position() + end = time.time() + diff = abs((end - start) - expected_dur) + self.assertLess(diff, expected_dur * 0.1) + + def test_dual_interval(self): + """Test dual motor interval""" + m1 = Motor('A') + m2 = Motor('B') + for interval in [20, 10]: + m1.interval = interval + m2.interval = interval + count = 1000 + expected_dur = count * m1.interval * 1e-3 + start = time.time() + for _ in range(count): + m1.get_position() + m2.get_position() + end = time.time() + diff = abs((end - start) - expected_dur) + self.assertLess(diff, expected_dur * 0.1) if __name__ == '__main__':