diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..8f58749 --- /dev/null +++ b/.flake8 @@ -0,0 +1,7 @@ +[flake8] +docstring_style=sphinx +max-line-length = 127 +ignore = D400, Q000, S311, W503, PLW, PLC, PLR +per-file-ignores = + buildhat/__init__.py:F401 +exclude = docs/conf.py, docs/sphinxcontrib/cmtinc-buildhat.py, docs/sphinx_selective_exclude/*.py diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml new file mode 100644 index 0000000..57a67fb --- /dev/null +++ b/.github/workflows/python-app.yml @@ -0,0 +1,36 @@ +# This workflow will install Python dependencies, run tests and lint with a single version of Python +# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions + +name: Python application + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + - name: Set up Python 3.9 + uses: actions/setup-python@v2 + with: + python-version: 3.9 + - name: Install dependencies + run: | + python -m pip install --upgrade pip + - name: Install dependencies for testing + run: | + if [ -f requirements-test.txt ]; then pip install -r requirements-test.txt; fi + - name: set pythonpath + run: | + echo "PYTHONPATH=." >> $GITHUB_ENV + - name: Lint with flake8 + run: | + flake8 . --version + # stop the build if there are Python syntax errors or undefined names + flake8 . --count --show-source --statistics diff --git a/.gitignore b/.gitignore index 1219d4e..7112796 100644 --- a/.gitignore +++ b/.gitignore @@ -34,6 +34,7 @@ share/python-wheels/ .installed.cfg *.egg MANIFEST +.pypirc # Installer logs pip-log.txt @@ -68,3 +69,7 @@ venv.bak/ # pytype static type analyzer .pytype/ + +# ide +.idea/ +*.iml diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..bb16389 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,17 @@ +repos: +- repo: https://github.com/PyCQA/flake8 + rev: 6.0.0 + hooks: + - id: flake8 + additional_dependencies: [ + 'flake8-bugbear>=22.10.27', + 'flake8-comprehensions>=3.10', + 'flake8-debugger', + 'flake8-docstrings>=1.6.0', + 'flake8-isort>=5.0', + 'flake8-pylint', + 'flake8-rst-docstrings', + 'flake8-string-format', + 'gpiozero', + 'pyserial' + ] diff --git a/.readthedocs.yaml b/.readthedocs.yaml index 3f5c924..121204f 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -6,10 +6,11 @@ version: 2 build: - image: latest + os: ubuntu-22.04 + tools: + python: "3.8" python: - version: 3.8 install: - method: setuptools path: . diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ca599c..8bf1d81 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,201 @@ # Change Log +## 0.9.0 + +Fixes: + +* Resend 'version' if empty data received from the Build HAT + +## 0.8.0 + +Adds: + +* Support for custom firmware (see: https://www.raspberrypi.com/news/build-hat-firmware-now-fully-open-source/) +* Add W503 to Flake8 ignore +* Increase interval for sensor while testing - avoids error that occurs when handling large amount of sensor data. In future, might be interesting to see if we can support faster UART baud rate. + +## 0.7.0 + +Adds: + +* Mode 7 IR transmission to ColorDistanceSensor https://github.com/RaspberryPiFoundation/python-build-hat/pull/205 +* Debug log filename access https://github.com/RaspberryPiFoundation/python-build-hat/pull/204 +* Movement counter to WeDo 2.0 Motion Sensor https://github.com/RaspberryPiFoundation/python-build-hat/pull/201 + +## 0.6.0 + +### Added + +* Support for Raspberry Pi 5 (https://github.com/RaspberryPiFoundation/python-build-hat/pull/203) + +## 0.5.12 + +### Added + +* Ability to set custom device interval for callbacks + +### Changed + +* New firmware with mitigation for motor disconnecting and selrate command + +### Removed + +n/a + +## 0.5.11 + +### Added + +* Expose release property to allow motor to hold position +* Updated documentation + +### Changed + +n/a + +### Removed + +n/a + +## 0.5.10 + +### Added + +* Support for 88008 motor +* get_distance support for ColourDistanceSensor + +### Changed + +* Use debug log level for logging + +### Removed + +n/a + +## 0.5.9 + +### Added + +* Allow the BuildHAT LEDs to be turned on and off +* Allow for White in set\_pixels +* Prevent using same port multiple times +* Add support for checking maximum force sensed + +### Changed + +* Linting of code +* Renamed exceptions to conform to linter's style + +### Removed + +n/a + +## 0.5.8 + +### Added + +* LED Matrix transitions +* Expose feature to measure voltage of hat +* Function to set brightness of LEDs + +### Changed + +* New firmware to fix passive devices in hardware revision + +### Removed + +n/a + +## 0.5.7 + +### Added + +* Support for light +* Allow alternative serial device to be used +* Passive motor support +* WeDo sensor support + +### Changed + +n/a + +### Removed + +n/a + +## 0.5.6 + +### Added + + +### Changed + +n/a + +### Removed + +n/a + +## 0.5.5 + +### Added + + +### Changed + +n/a + +### Removed + +n/a + +## 0.5.4 + +### Added + +Documentation copy updates + +### Changed + +n/a + +### Removed + +n/a + +## 0.5.3 + +### Added + +* Force sensor now supports force threshold +* Simplify list of colours supported by colour sensor +* Fix distance sensor firing bug +* Simplify coast call + +### Changed + +n/a + +### Removed + +n/a + +## 0.5.2 + +### Added + +* Fixes force sensor callback only firing one +* Pass distance to distance sensor callback +* Make sure motors move simultaneously for a motor pair + +### Changed + +n/a + +### Removed + +n/a + ## 0.5.1 ### Added diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 634adbe..0000000 --- a/Dockerfile +++ /dev/null @@ -1,10 +0,0 @@ -FROM ubuntu:18.04 -MAINTAINER Max Conway - -RUN apt-get update && \ - apt-get install -y make build-essential python3 software-properties-common git python3-venv python3-dev libi2c-dev - -COPY .git /data/.git -RUN git -C /data/ reset --hard - -WORKDIR /data/ \ No newline at end of file diff --git a/Jenkinsfile b/Jenkinsfile deleted file mode 100644 index 87247c7..0000000 --- a/Jenkinsfile +++ /dev/null @@ -1,59 +0,0 @@ -pipeline { - agent { dockerfile true } - - stages { - stage('Build') { - steps { - echo 'Building..' - sh 'bash ./build.sh' - echo 'Finished building.' - } - } - stage('Test') { - steps { - echo 'Testing..' - sh 'bash ./test.sh' - echo 'Finished testing.' - } - } - } - post{ - always{ - sh 'apt list --installed > current_packages.txt' - archiveArtifacts artifacts: 'current_packages.txt', fingerprint: true - - sh './hat_env/bin/pip3 freeze > requirements.txt' - archiveArtifacts artifacts: 'requirements.txt', fingerprint: true - - echo 'Archive your test results here' - //archiveArtifacts artifacts: 'build/tests/rsa/release/bin/rsa.elf', fingerprint: true - //archiveArtifacts artifacts: 'build/tests/sha256/release/bin/sha256.elf', fingerprint: true - } - success{ - echo 'Archive your build results here' - //archiveArtifacts artifacts: 'build/apps/firmware/release/bin/firmware-1.elf', fingerprint: true - //archiveArtifacts artifacts: 'build/apps/firmware/release/bin/firmware-0.elf', fingerprint: true - //archiveArtifacts artifacts: 'build/apps/bootloader/release/bin/bootloader.elf', fingerprint: true - } - fixed{ - script { - if (env.BRANCH_NAME in ['master']) { - slackSend (color: '#00FF00', channel: "#jenkins", message: "FIXED: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]' (${env.RUN_DISPLAY_URL})") - } - } - } - regression{ - script { - if (env.BRANCH_NAME in ['master']) { - slackSend (color: '#FF0000', channel: "#jenkins", message: "REGRESSED: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]' (${env.RUN_DISPLAY_URL})") - emailext ( - subject: "REGRESSED: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]'", - body: """REGRESSED: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]': - Check console output at ${env.BUILD_URL}""", - recipientProviders: [[$class: 'CulpritsRecipientProvider']] - ) - } - } - } - } -} \ No newline at end of file diff --git a/LICENSE b/LICENSE index 78d99a6..a43a2e8 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2020-2021 - Raspberry Pi Foundation +Copyright (c) 2020-2025 - Raspberry Pi Foundation Copyright (c) 2017-2021 - LEGO System A/S - Aastvej 1, 7190 Billund, DK Permission is hereby granted, free of charge, to any person obtaining a copy diff --git a/MANIFEST.in b/MANIFEST.in index 21e6bc5..cda2998 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,5 +1,3 @@ -include src/*.c -include include/*.h include LICENSE recursive-include docs *.rst Makefile *.md *.py include docs/templates/*.html diff --git a/README.md b/README.md index a20d9d5..0b5968a 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ Projects and inspiration: https://projects.raspberrypi.org/en/pathways/lego-intr To install the Build HAT library, enter the following commands in a terminal: - sudo pip3 install buildhat + pip3 install buildhat ## Usage diff --git a/VERSION b/VERSION index d3532a1..ac39a10 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.5.7 +0.9.0 diff --git a/build.sh b/build.sh deleted file mode 100755 index f04cfdc..0000000 --- a/build.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/bash - -python3 -m venv hat_env -source hat_env/bin/activate -./setup.py build -./setup.py install diff --git a/buildhat/__init__.py b/buildhat/__init__.py index 206d8c9..da3a147 100644 --- a/buildhat/__init__.py +++ b/buildhat/__init__.py @@ -1,11 +1,13 @@ -from .motors import Motor, MotorPair, PassiveMotor -from .distance import DistanceSensor -from .force import ForceSensor +"""Provide all the classes we need for build HAT""" + from .color import ColorSensor from .colordistance import ColorDistanceSensor -from .matrix import Matrix -from .wedo import TiltSensor, MotionSensor -from .light import Light +from .distance import DistanceSensor +from .exc import * # noqa: F403 +from .force import ForceSensor from .hat import Hat +from .light import Light +from .matrix import Matrix +from .motors import Motor, MotorPair, PassiveMotor from .serinterface import BuildHAT -from .exc import * +from .wedo import MotionSensor, TiltSensor diff --git a/buildhat/color.py b/buildhat/color.py index 7cd0263..5959263 100644 --- a/buildhat/color.py +++ b/buildhat/color.py @@ -1,38 +1,48 @@ -from .devices import Device -from .exc import DeviceInvalid -from threading import Condition -from collections import deque +"""Color sensor handling functionality""" + import math +from collections import deque +from threading import Condition + +from .devices import Device + class ColorSensor(Device): """Color sensor :param port: Port of device - :raises DeviceInvalid: Occurs if there is no color sensor attached to port + :raises DeviceError: Occurs if there is no color sensor attached to port """ + def __init__(self, port): + """ + Initialise color sensor + + :param port: Port of device + """ super().__init__(port) - if self.typeid != 61: - raise DeviceInvalid('There is not a color sensor connected to port %s (Found %s)' % (port, self.name)) self.reverse() self.mode(6) self.avg_reads = 4 self._old_color = None def segment_color(self, r, g, b): - """Returns the color name from RGB + """Return the color name from RGB + :param r: Red + :param g: Green + :param b: Blue :return: Name of the color as a string :rtype: str """ - table = [("black",(0,0,0)), - ("violet",(127,0,255)), - ("blue",(0,0,255)), - ("cyan",(0,183,235)), - ("green",(0,128,0)), - ("yellow",(255,255,0)), - ("red",(255,0,0)), - ("white",(255,255,255))] + table = [("black", (0, 0, 0)), + ("violet", (127, 0, 255)), + ("blue", (0, 0, 255)), + ("cyan", (0, 183, 235)), + ("green", (0, 128, 0)), + ("yellow", (255, 255, 0)), + ("red", (255, 0, 0)), + ("white", (255, 255, 255))] near = "" euc = math.inf for itm in table: @@ -41,16 +51,19 @@ def segment_color(self, r, g, b): near = itm[0] euc = cur return near - + def rgb_to_hsv(self, r, g, b): """Convert RGB to HSV Based on https://www.rapidtables.com/convert/color/rgb-to-hsv.html algorithm + :param r: Red + :param g: Green + :param b: Blue :return: HSV representation of color :rtype: tuple """ - r, g, b = r/255.0, g/255.0, b/255.0 + r, g, b = r / 255.0, g / 255.0, b / 255.0 cmax = max(r, g, b) cmin = min(r, g, b) delt = cmax - cmin @@ -59,7 +72,7 @@ def rgb_to_hsv(self, r, g, b): elif cmax == r: h = 60 * (((g - b) / delt) % 6) elif cmax == g: - h = 60 * ((((b - r) / delt)) + 2) + h = 60 * ((((b - r) / delt)) + 2) elif cmax == b: h = 60 * ((((r - g) / delt)) + 4) if cmax == 0: @@ -67,10 +80,10 @@ def rgb_to_hsv(self, r, g, b): else: s = delt / cmax v = cmax - return int(h), int(s*100), int(v*100) + return int(h), int(s * 100), int(v * 100) def get_color(self): - """Returns the color + """Return the color :return: Name of the color as a string :rtype: str @@ -79,33 +92,36 @@ def get_color(self): return self.segment_color(r, g, b) def get_ambient_light(self): - """Returns the ambient light + """Return the ambient light :return: Ambient light :rtype: int """ self.mode(2) readings = [] - for i in range(self.avg_reads): + for _ in range(self.avg_reads): readings.append(self.get()[0]) - return int(sum(readings)/len(readings)) - + return int(sum(readings) / len(readings)) + def get_reflected_light(self): - """Returns the reflected light + """Return the reflected light :return: Reflected light :rtype: int """ self.mode(1) readings = [] - for i in range(self.avg_reads): + for _ in range(self.avg_reads): readings.append(self.get()[0]) - return int(sum(readings)/len(readings)) + return int(sum(readings) / len(readings)) def _avgrgbi(self, reads): readings = [] for read in reads: - read = [int((read[0]/1024)*255), int((read[1]/1024)*255), int((read[2]/1024)*255), int((read[3]/1024)*255)] + read = [int((read[0] / 1024) * 255), + int((read[1] / 1024) * 255), + int((read[2] / 1024) * 255), + int((read[3] / 1024) * 255)] readings.append(read) rgbi = [] for i in range(4): @@ -113,87 +129,85 @@ def _avgrgbi(self, reads): return rgbi def get_color_rgbi(self): - """Returns the color + """Return the color - :return: RGBI representation + :return: RGBI representation :rtype: list """ self.mode(5) reads = [] - for i in range(self.avg_reads): + for _ in range(self.avg_reads): reads.append(self.get()) return self._avgrgbi(reads) def get_color_hsv(self): - """Returns the color + """Return the color - :return: HSV representation + :return: HSV representation :rtype: tuple """ self.mode(6) readings = [] - for i in range(self.avg_reads): + for _ in range(self.avg_reads): read = self.get() - read = [read[0], int((read[1]/1024)*100), int((read[2]/1024)*100)] + read = [read[0], int((read[1] / 1024) * 100), int((read[2] / 1024) * 100)] readings.append(read) s = c = 0 for hsv in readings: hue = hsv[0] s += math.sin(math.radians(hue)) - c += math.cos(math.radians(hue)) + c += math.cos(math.radians(hue)) - hue = int((math.degrees((math.atan2(s,c))) + 360) % 360) + hue = int((math.degrees((math.atan2(s, c))) + 360) % 360) sat = int(sum([hsv[1] for hsv in readings]) / len(readings)) val = int(sum([hsv[2] for hsv in readings]) / len(readings)) - return (hue, sat, val) + return (hue, sat, val) + + def _cb_handle(self, lst): + self._data.append(lst[:4]) + if len(self._data) == self.avg_reads: + r, g, b, _ = self._avgrgbi(self._data) + seg = self.segment_color(r, g, b) + if self._cmp(seg, self._color): + with self._cond: + self._old_color = seg + self._cond.notify() def wait_until_color(self, color): - """Waits until specific color + """Wait until specific color - :param color: Color to look for + :param color: Color to look for """ self.mode(5) - cond = Condition() - data = deque(maxlen=self.avg_reads) - def cb(lst): - data.append(lst[:4]) - if len(data) == self.avg_reads: - r, g, b, _ = self._avgrgbi(data) - seg = self.segment_color(r, g, b) - if seg == color: - with cond: - cond.notify() - self.callback(cb) - with cond: - cond.wait() + self._cond = Condition() + self._data = deque(maxlen=self.avg_reads) + self._color = color + self._cmp = lambda x, y: x == y + self.callback(self._cb_handle) + with self._cond: + self._cond.wait() self.callback(None) def wait_for_new_color(self): - """Waits for new color or returns immediately if first call + """Wait for new color or returns immediately if first call + + :return: Name of the color as a string + :rtype: str """ self.mode(5) if self._old_color is None: self._old_color = self.get_color() return self._old_color - cond = Condition() - data = deque(maxlen=self.avg_reads) - def cb(lst): - data.append(lst[:4]) - if len(data) == self.avg_reads: - r, g, b, _ = self._avgrgbi(data) - seg = self.segment_color(r, g, b) - if seg != self._old_color: - self._old_color = seg - with cond: - cond.notify() - self.callback(cb) - with cond: - cond.wait() + self._cond = Condition() + self._data = deque(maxlen=self.avg_reads) + self._color = self._old_color + self._cmp = lambda x, y: x != y + self.callback(self._cb_handle) + with self._cond: + self._cond.wait() self.callback(None) return self._old_color def on(self): - """ - Turns on the sensor and LED - """ - self._write("port {} ; plimit 1 ; set -1\r".format(self.port)) + """Turn on the sensor and LED""" + self.reverse() diff --git a/buildhat/colordistance.py b/buildhat/colordistance.py index 6c2cc63..3c722a3 100644 --- a/buildhat/colordistance.py +++ b/buildhat/colordistance.py @@ -1,56 +1,72 @@ -from .devices import Device -from .exc import DeviceInvalid -from threading import Condition -from collections import deque +"""Color distance sensor handling functionality""" + import math +from collections import deque +from threading import Condition + +from .devices import Device + class ColorDistanceSensor(Device): """Color Distance sensor :param port: Port of device - :raises DeviceInvalid: Occurs if there is no colordistance sensor attached to port + :raises DeviceError: Occurs if there is no colordistance sensor attached to port """ + def __init__(self, port): + """ + Initialise color distance sensor + + :param port: Port of device + """ super().__init__(port) - if self.typeid != 37: - raise DeviceInvalid('There is not a colordistance sensor connected to port %s (Found %s)' % (port, self.name)) self.on() self.mode(6) self.avg_reads = 4 self._old_color = None + self._ir_channel = 0x0 + self._ir_address = 0x0 + self._ir_toggle = 0x0 def segment_color(self, r, g, b): - """Returns the color name from HSV + """Return the color name from HSV + :param r: Red + :param g: Green + :param b: Blue :return: Name of the color as a string :rtype: str """ - table = [("black",(0,0,0)), - ("violet",(127,0,255)), - ("blue",(0,0,255)), - ("cyan",(0,183,235)), - ("green",(0,128,0)), - ("yellow",(255,255,0)), - ("red",(255,0,0)), - ("white",(255,255,255))] + table = [("black", (0, 0, 0)), + ("violet", (127, 0, 255)), + ("blue", (0, 0, 255)), + ("cyan", (0, 183, 235)), + ("green", (0, 128, 0)), + ("yellow", (255, 255, 0)), + ("red", (255, 0, 0)), + ("white", (255, 255, 255))] near = "" euc = math.inf for itm in table: - cur = math.sqrt((r - itm[1][0])**2 + (g - itm[1][1])**2 + (b - itm[1][2])**2) + cur = math.sqrt((r - itm[1][0]) ** 2 + (g - itm[1][1]) ** 2 + (b - itm[1][2]) ** 2) if cur < euc: near = itm[0] euc = cur return near - + def rgb_to_hsv(self, r, g, b): """Convert RGB to HSV Based on https://www.rapidtables.com/convert/color/rgb-to-hsv.html algorithm + :param r: Red + :param g: Green + :param b: Blue :return: HSV representation of color :rtype: tuple """ - r, g, b = r/255.0, g/255.0, b/255.0 + r, g, b = r / 255.0, g / 255.0, b / 255.0 cmax = max(r, g, b) cmin = min(r, g, b) delt = cmax - cmin @@ -59,7 +75,7 @@ def rgb_to_hsv(self, r, g, b): elif cmax == r: h = 60 * (((g - b) / delt) % 6) elif cmax == g: - h = 60 * ((((b - r) / delt)) + 2) + h = 60 * ((((b - r) / delt)) + 2) elif cmax == b: h = 60 * ((((r - g) / delt)) + 4) if cmax == 0: @@ -67,10 +83,10 @@ def rgb_to_hsv(self, r, g, b): else: s = delt / cmax v = cmax - return int(h), int(s*100), int(v*100) + return int(h), int(s * 100), int(v * 100) def get_color(self): - """Returns the color + """Return the color :return: Name of the color as a string :rtype: str @@ -79,28 +95,38 @@ def get_color(self): return self.segment_color(r, g, b) def get_ambient_light(self): - """Returns the ambient light + """Return the ambient light :return: Ambient light :rtype: int """ self.mode(4) readings = [] - for i in range(self.avg_reads): + for _ in range(self.avg_reads): readings.append(self.get()[0]) - return int(sum(readings)/len(readings)) - + return int(sum(readings) / len(readings)) + def get_reflected_light(self): - """Returns the reflected light + """Return the reflected light :return: Reflected light :rtype: int """ self.mode(3) readings = [] - for i in range(self.avg_reads): + for _ in range(self.avg_reads): readings.append(self.get()[0]) - return int(sum(readings)/len(readings)) + return int(sum(readings) / len(readings)) + + def get_distance(self): + """Return the distance + + :return: Distance + :rtype: int + """ + self.mode(1) + distance = self.get()[0] + return distance def _clamp(self, val, small, large): return max(small, min(val, large)) @@ -108,9 +134,9 @@ def _clamp(self, val, small, large): def _avgrgb(self, reads): readings = [] for read in reads: - read = [int((self._clamp(read[0], 0, 400)/400)*255), - int((self._clamp(read[1], 0, 400)/400)*255), - int((self._clamp(read[2], 0, 400)/400)*255)] + read = [int((self._clamp(read[0], 0, 400) / 400) * 255), + int((self._clamp(read[1], 0, 400) / 400) * 255), + int((self._clamp(read[2], 0, 400) / 400) * 255)] readings.append(read) rgb = [] for i in range(3): @@ -118,64 +144,393 @@ def _avgrgb(self, reads): return rgb def get_color_rgb(self): - """Returns the color + """Return the color - :return: RGBI representation + :return: RGBI representation :rtype: list """ self.mode(6) reads = [] - for i in range(self.avg_reads): + for _ in range(self.avg_reads): reads.append(self.get()) return self._avgrgb(reads) + def _cb_handle(self, lst): + self._data.append(lst) + if len(self._data) == self.avg_reads: + r, g, b = self._avgrgb(self._data) + seg = self.segment_color(r, g, b) + if self._cmp(seg, self._color): + with self._cond: + self._old_color = seg + self._cond.notify() + def wait_until_color(self, color): - """Waits until specific color + """Wait until specific color - :param color: Color to look for + :param color: Color to look for """ self.mode(6) - cond = Condition() - data = deque(maxlen=self.avg_reads) - def cb(lst): - data.append(lst) - if len(data) == self.avg_reads: - r, g, b = self._avgrgb(data) - seg = self.segment_color(r, g, b) - if seg == color: - with cond: - cond.notify() - self.callback(cb) - with cond: - cond.wait() + self._cond = Condition() + self._data = deque(maxlen=self.avg_reads) + self._color = color + self._cmp = lambda x, y: x == y + self.callback(self._cb_handle) + with self._cond: + self._cond.wait() self.callback(None) def wait_for_new_color(self): - """Waits for new color or returns immediately if first call + """Wait for new color or returns immediately if first call + + :return: Name of the color as a string + :rtype: str """ self.mode(6) if self._old_color is None: self._old_color = self.get_color() return self._old_color - cond = Condition() - data = deque(maxlen=self.avg_reads) - def cb(lst): - data.append(lst) - if len(data) == self.avg_reads: - r, g, b = self._avgrgb(data) - seg = self.segment_color(r, g, b) - if seg != self._old_color: - self._old_color = seg - with cond: - cond.notify() - self.callback(cb) - with cond: - cond.wait() + self._cond = Condition() + self._data = deque(maxlen=self.avg_reads) + self._color = self._old_color + self._cmp = lambda x, y: x != y + self.callback(self._cb_handle) + with self._cond: + self._cond.wait() self.callback(None) return self._old_color - def on(self): + @property + def ir_channel(self): + """Get the IR channel for message transmission""" + return self._ir_channel + + @ir_channel.setter + def ir_channel(self, channel=1): """ - Turns on the sensor and LED + Set the IR channel for RC Tx + + :param channel: 1-4 indicating the selected IR channel on the reciever """ - self._write("port {} ; plimit 1 ; set -1\r".format(self.port)) + check_chan = channel + if check_chan > 4: + check_chan = 4 + elif check_chan < 1: + check_chan = 1 + # Internally: 0-3 + self._ir_channel = int(check_chan) - 1 + + @property + def ir_address(self): + """IR Address space of 0x0 for default PoweredUp or 0x1 for extra space""" + return self._ir_address + + def toggle_ir_toggle(self): + """Toggle the IR toggle bit""" + # IYKYK, because the RC documents are not clear + if self._ir_toggle: + self._ir_toggle = 0x0 + else: + self._ir_toggle = 0x1 + return self._ir_toggle + + def send_ir_sop(self, port, mode): + """ + Send an IR message via Power Functions RC Protocol in Single Output PWM mode + + PF IR RC Protocol documented at https://www.philohome.com/pf/pf.htm + + Port B is blue + + Valid values for mode are: + 0x0: Float output + 0x1: Forward/Clockwise at speed 1 + 0x2: Forward/Clockwise at speed 2 + 0x3: Forward/Clockwise at speed 3 + 0x4: Forward/Clockwise at speed 4 + 0x5: Forward/Clockwise at speed 5 + 0x6: Forward/Clockwise at speed 6 + 0x7: Forward/Clockwise at speed 7 + 0x8: Brake (then float v1.20) + 0x9: Backwards/Counterclockwise at speed 7 + 0xA: Backwards/Counterclockwise at speed 6 + 0xB: Backwards/Counterclockwise at speed 5 + 0xC: Backwards/Counterclockwise at speed 4 + 0xD: Backwards/Counterclockwise at speed 3 + 0xE: Backwards/Counterclockwise at speed 2 + 0xF: Backwards/Counterclockwise at speed 1 + + :param port: 'A' or 'B' + :param mode: 0-15 indicating the port's mode to set + """ + escape_modeselect = 0x0 + escape = escape_modeselect + + ir_mode_single_output = 0x4 + ir_mode = ir_mode_single_output + + so_mode_pwm = 0x0 + so_mode = so_mode_pwm + + output_port_a = 0x0 + output_port_b = 0x1 + + output_port = None + if port == 'A' or port == 'a': + output_port = output_port_a + elif port == 'B' or port == 'b': + output_port = output_port_b + else: + return False + + ir_mode = ir_mode | (so_mode << 1) | output_port + + nibble1 = (self._ir_toggle << 3) | (escape << 2) | self._ir_channel + nibble2 = (self._ir_address << 3) | ir_mode + + # Mode range checked here + return self._send_ir_nibbles(nibble1, nibble2, mode) + + def send_ir_socstid(self, port, mode): + """ + Send an IR message via Power Functions RC Protocol in Single Output Clear/Set/Toggle/Increment/Decrement mode + + PF IR RC Protocol documented at https://www.philohome.com/pf/pf.htm + + Valid values for mode are: + 0x0: Toggle full Clockwise/Forward (Stop to Clockwise, Clockwise to Stop, Counterclockwise to Clockwise) + 0x1: Toggle direction + 0x2: Increment numerical PWM + 0x3: Decrement numerical PWM + 0x4: Increment PWM + 0x5: Decrement PWM + 0x6: Full Clockwise/Forward + 0x7: Full Counterclockwise/Backward + 0x8: Toggle full (defaults to Forward, first) + 0x9: Clear C1 (C1 to High) + 0xA: Set C1 (C1 to Low) + 0xB: Toggle C1 + 0xC: Clear C2 (C2 to High) + 0xD: Set C2 (C2 to Low) + 0xE: Toggle C2 + 0xF: Toggle full Counterclockwise/Backward (Stop to Clockwise, Counterclockwise to Stop, Clockwise to Counterclockwise) + + :param port: 'A' or 'B' + :param mode: 0-15 indicating the port's mode to set + """ + escape_modeselect = 0x0 + escape = escape_modeselect + + ir_mode_single_output = 0x4 + ir_mode = ir_mode_single_output + + so_mode_cstid = 0x1 + so_mode = so_mode_cstid + + output_port_a = 0x0 + output_port_b = 0x1 + + output_port = None + if port == 'A' or port == 'a': + output_port = output_port_a + elif port == 'B' or port == 'b': + output_port = output_port_b + else: + return False + + ir_mode = ir_mode | (so_mode << 1) | output_port + + nibble1 = (self._ir_toggle << 3) | (escape << 2) | self._ir_channel + nibble2 = (self._ir_address << 3) | ir_mode + + # Mode range checked here + return self._send_ir_nibbles(nibble1, nibble2, mode) + + def send_ir_combo_pwm(self, port_b_mode, port_a_mode): + """ + Send an IR message via Power Functions RC Protocol in Combo PWM mode + + PF IR RC Protocol documented at https://www.philohome.com/pf/pf.htm + + Valid values for the modes are: + 0x0 Float + 0x1 PWM Forward step 1 + 0x2 PWM Forward step 2 + 0x3 PWM Forward step 3 + 0x4 PWM Forward step 4 + 0x5 PWM Forward step 5 + 0x6 PWM Forward step 6 + 0x7 PWM Forward step 7 + 0x8 Brake (then float v1.20) + 0x9 PWM Backward step 7 + 0xA PWM Backward step 6 + 0xB PWM Backward step 5 + 0xC PWM Backward step 4 + 0xD PWM Backward step 3 + 0xE PWM Backward step 2 + 0xF PWM Backward step 1 + + :param port_b_mode: 0-15 indicating the command to send to port B + :param port_a_mode: 0-15 indicating the command to send to port A + """ + escape_combo_pwm = 0x1 + escape = escape_combo_pwm + + nibble1 = (self._ir_toggle << 3) | (escape << 2) | self._ir_channel + + # Port modes are range checked here + return self._send_ir_nibbles(nibble1, port_b_mode, port_a_mode) + + def send_ir_combo_direct(self, port_b_output, port_a_output): + """ + Send an IR message via Power Functions RC Protocol in Combo Direct mode + + PF IR RC Protocol documented at https://www.philohome.com/pf/pf.htm + + Valid values for the output variables are: + 0x0: Float output + 0x1: Clockwise/Forward + 0x2: Counterclockwise/Backwards + 0x3: Brake then float + + :param port_b_output: 0-3 indicating the output to send to port B + :param port_a_output: 0-3 indicating the output to send to port A + """ + escape_modeselect = 0x0 + escape = escape_modeselect + + ir_mode_combo_direct = 0x1 + ir_mode = ir_mode_combo_direct + + nibble1 = (self._ir_toggle << 3) | (escape << 2) | self._ir_channel + nibble2 = (self._ir_address << 3) | ir_mode + + if port_b_output > 0x3 or port_a_output > 0x3: + return False + if port_b_output < 0x0 or port_a_output < 0x0: + return False + + nibble3 = (port_b_output << 2) | port_a_output + + return self._send_ir_nibbles(nibble1, nibble2, nibble3) + + def send_ir_extended(self, mode): + """ + Send an IR message via Power Functions RC Protocol in Extended mode + + PF IR RC Protocol documented at https://www.philohome.com/pf/pf.htm + + Valid values for the mode are: + 0x0: Brake Port A (timeout) + 0x1: Increment Speed on Port A + 0x2: Decrement Speed on Port A + + 0x4: Toggle Forward/Clockwise/Float on Port B + + 0x6: Toggle Address bit + 0x7: Align toggle bit + + :param mode: 0-2,4,6-7 + """ + escape_modeselect = 0x0 + escape = escape_modeselect + + ir_mode_extended = 0x0 + ir_mode = ir_mode_extended + + nibble1 = (self._ir_toggle << 3) | (escape << 2) | self._ir_channel + nibble2 = (self._ir_address << 3) | ir_mode + + if mode < 0x0 or mode == 0x3 or mode == 0x5 or mode > 0x7: + return False + + return self._send_ir_nibbles(nibble1, nibble2, mode) + + def send_ir_single_pin(self, port, pin, mode, timeout): + """ + Send an IR message via Power Functions RC Protocol in Single Pin mode + + PF IR RC Protocol documented at https://www.philohome.com/pf/pf.htm + + Valid values for the mode are: + 0x0: No-op + 0x1: Clear + 0x2: Set + 0x3: Toggle + + Note: The unlabeled IR receiver (vs the one labeled V2) has a "firmware bug in Single Pin mode" + https://www.philohome.com/pfrec/pfrec.htm + + :param port: 'A' or 'B' + :param pin: 1 or 2 + :param mode: 0-3 indicating the pin's mode to set + :param timeout: True or False + """ + escape_mode = 0x0 + escape = escape_mode + + ir_mode_single_continuous = 0x2 + ir_mode_single_timeout = 0x3 + ir_mode = None + if timeout: + ir_mode = ir_mode_single_timeout + else: + ir_mode = ir_mode_single_continuous + + output_port_a = 0x0 + output_port_b = 0x1 + + output_port = None + if port == 'A' or port == 'a': + output_port = output_port_a + elif port == 'B' or port == 'b': + output_port = output_port_b + else: + return False + + if pin != 1 and pin != 2: + return False + pin_value = pin - 1 + + if mode > 0x3 or mode < 0x0: + return False + + nibble1 = (self._ir_toggle << 3) | (escape << 2) | self._ir_channel + nibble2 = (self._ir_address << 3) | ir_mode + nibble3 = (output_port << 3) | (pin_value << 3) | mode + + return self._send_ir_nibbles(nibble1, nibble2, nibble3) + + def _send_ir_nibbles(self, nibble1, nibble2, nibble3): + + # M7 IR Tx SI = N/A + # format count=1 type=1 chars=5 dp=0 + # RAW: 00000000 0000FFFF PCT: 00000000 00000064 SI: 00000000 0000FFFF + + mode = 7 + self.mode(mode) + + # The upper bits of data[2] are ignored + if nibble1 > 0xF or nibble2 > 0xF or nibble3 > 0xF: + return False + if nibble1 < 0x0 or nibble2 < 0x0 or nibble3 < 0x0: + return False + + byte_two = (nibble2 << 4) | nibble3 + + data = bytearray(3) + data[0] = (0xc << 4) | mode + data[1] = byte_two + data[2] = nibble1 + + # print(" ".join('{:04b}'.format(nibble1))) + # print(" ".join('{:04b}'.format(nibble2))) + # print(" ".join('{:04b}'.format(nibble3))) + # print(" ".join('{:08b}'.format(n) for n in data)) + + self._write1(data) + return True + + def on(self): + """Turn on the sensor and LED""" + self.reverse() diff --git a/buildhat/data/firmware.bin b/buildhat/data/firmware.bin index 6bd36ff..f6dcde3 100644 Binary files a/buildhat/data/firmware.bin and b/buildhat/data/firmware.bin differ diff --git a/buildhat/data/signature.bin b/buildhat/data/signature.bin index b99b88a..8f495fa 100644 --- a/buildhat/data/signature.bin +++ b/buildhat/data/signature.bin @@ -1 +1 @@ -; >g]mVV!Lٙ ]NF鴋X|[-;&X8\Son|$ \ No newline at end of file +N.2wSmQ/IӍm )io\~IAz4ĘɅPw>\w LV \ No newline at end of file diff --git a/buildhat/data/version b/buildhat/data/version index e77edc7..1f3109b 100644 --- a/buildhat/data/version +++ b/buildhat/data/version @@ -1 +1 @@ -1636109636 +1737564117 diff --git a/buildhat/devices.py b/buildhat/devices.py index b14f62a..80dec80 100644 --- a/buildhat/devices.py +++ b/buildhat/devices.py @@ -1,54 +1,126 @@ -from .serinterface import BuildHAT -from .exc import DeviceNotFound, DeviceChanged, DeviceInvalidMode -import weakref -import time +"""Functionality for handling Build HAT devices""" + import os import sys -import gc +import weakref +from concurrent.futures import Future + +from .exc import DeviceError +from .serinterface import BuildHAT + class Device: """Creates a single instance of the buildhat for all devices to use""" + _instance = None _started = 0 - _device_names = { 2: "PassiveMotor", - 8: "Light", - 34: "TiltSensor", - 35: "MotionSensor", - 37: "ColorDistanceSensor", - 61: "ColorSensor", - 62: "DistanceSensor", - 63: "ForceSensor", - 64: "Matrix", - 38: "Motor", - 46: "Motor", - 47: "Motor", - 48: "Motor", - 49: "Motor", - 65: "Motor", - 75: "Motor", - 76: "Motor" - } - + _device_names = {1: ("PassiveMotor", "PassiveMotor"), + 2: ("PassiveMotor", "PassiveMotor"), + 8: ("Light", "Light"), # 88005 + 34: ("TiltSensor", "WeDo 2.0 Tilt Sensor"), # 45305 + 35: ("MotionSensor", "MotionSensor"), # 45304 + 37: ("ColorDistanceSensor", "Color & Distance Sensor"), # 88007 + 61: ("ColorSensor", "Color Sensor"), # 45605 + 62: ("DistanceSensor", "Distance Sensor"), # 45604 + 63: ("ForceSensor", "Force Sensor"), # 45606 + 64: ("Matrix", "3x3 Color Light Matrix"), # 45608 + 38: ("Motor", "Medium Linear Motor"), # 88008 + 46: ("Motor", "Large Motor"), # 88013 + 47: ("Motor", "XL Motor"), # 88014 + 48: ("Motor", "Medium Angular Motor (Cyan)"), # 45603 + 49: ("Motor", "Large Angular Motor (Cyan)"), # 45602 + 65: ("Motor", "Small Angular Motor"), # 45607 + 75: ("Motor", "Medium Angular Motor (Grey)"), # 88018 + 76: ("Motor", "Large Angular Motor (Grey)")} # 88017 + + _used = {0: False, + 1: False, + 2: False, + 3: False} + + UNKNOWN_DEVICE = "Unknown" + DISCONNECTED_DEVICE = "Disconnected" + def __init__(self, port): + """Initialise device + + :param port: Port of device + :raises DeviceError: Occurs if incorrect port specified or port already used + """ if not isinstance(port, str) or len(port) != 1: - raise DeviceNotFound("Invalid port") + raise DeviceError("Invalid port") p = ord(port) - ord('A') if not (p >= 0 and p <= 3): - raise DeviceNotFound("Invalid port") + raise DeviceError("Invalid port") + if Device._used[p]: + raise DeviceError("Port already used") self.port = p - if not Device._instance: - data = os.path.join(os.path.dirname(sys.modules["buildhat"].__file__),"data/") - firm = os.path.join(data,"firmware.bin") - sig = os.path.join(data,"signature.bin") - ver = os.path.join(data,"version") - vfile = open(ver) - v = int(vfile.read()) - vfile.close() - Device._instance = BuildHAT(firm, sig, v) - weakref.finalize(self, self._close) + Device._setup() self._simplemode = -1 self._combimode = -1 + self._modestr = "" self._typeid = self._conn.typeid + self._interval = 10 + if ( + self._typeid in Device._device_names + and Device._device_names[self._typeid][0] != type(self).__name__ # noqa: W503 + ) or self._typeid == -1: + raise DeviceError(f'There is not a {type(self).__name__} connected to port {port} (Found {self.name})') + Device._used[p] = True + + @staticmethod + def _setup(**kwargs): + if Device._instance: + return + if ( + os.path.isdir(os.path.join(os.getcwd(), "data/")) + and os.path.isfile(os.path.join(os.getcwd(), "data", "firmware.bin")) + and os.path.isfile(os.path.join(os.getcwd(), "data", "signature.bin")) + and os.path.isfile(os.path.join(os.getcwd(), "data", "version")) + ): + data = os.path.join(os.getcwd(), "data/") + else: + data = os.path.join(os.path.dirname(sys.modules["buildhat"].__file__), "data/") + firm = os.path.join(data, "firmware.bin") + sig = os.path.join(data, "signature.bin") + ver = os.path.join(data, "version") + vfile = open(ver) + v = int(vfile.read()) + vfile.close() + Device._instance = BuildHAT(firm, sig, v, **kwargs) + weakref.finalize(Device._instance, Device._instance.shutdown) + + def __del__(self): + """Handle deletion of device""" + if hasattr(self, "port") and Device._used[self.port]: + Device._used[self.port] = False + self._conn.callit = None + self.deselect() + self.off() + + @staticmethod + def name_for_id(typeid): + """Translate integer type id to device name (python class) + + :param typeid: Type of device + :return: Name of device + """ + if typeid in Device._device_names: + return Device._device_names[typeid][0] + else: + return Device.UNKNOWN_DEVICE + + @staticmethod + def desc_for_id(typeid): + """Translate integer type id to something more descriptive than the device name + + :param typeid: Type of device + :return: Description of device + """ + if typeid in Device._device_names: + return Device._device_names[typeid][1] + else: + return Device.UNKNOWN_DEVICE @property def _conn(self): @@ -56,108 +128,192 @@ def _conn(self): @property def connected(self): + """Whether device is connected or not + + :return: Connection status + """ return self._conn.connected @property def typeid(self): + """Type ID of device + + :return: Type ID + """ return self._typeid @property def typeidcur(self): + """Type ID currently present + + :return: Type ID + """ return self._conn.typeid @property def _hat(self): + """Hat instance + + :return: Hat instance + """ return Device._instance @property def name(self): - """Determines name of device on port""" - if self.connected == False: - return "No device" + """Determine name of device on port + + :return: Device name + """ + if not self.connected: + return Device.DISCONNECTED_DEVICE elif self.typeidcur in self._device_names: - return self._device_names[self.typeidcur] + return self._device_names[self.typeidcur][0] else: - return "Unknown" + return Device.UNKNOWN_DEVICE - def _close(self): - Device._instance.shutdown() + @property + def description(self): + """Device on port info + + :return: Device description + """ + if not self.connected: + return Device.DISCONNECTED_DEVICE + elif self.typeidcur in self._device_names: + return self._device_names[self.typeidcur][1] + else: + return Device.UNKNOWN_DEVICE def isconnected(self): + """Whether it is connected or not + + :raises DeviceError: Occurs if device no longer the same + """ if not self.connected: - raise DeviceNotFound("No device found") + raise DeviceError("No device found") if self.typeid != self.typeidcur: - raise DeviceChanged("Device has changed") + raise DeviceError("Device has changed") def reverse(self): - self._write("port {} ; plimit 1 ; set -1\r".format(self.port)) + """Reverse polarity""" + self._write(f"port {self.port} ; port_plimit 1 ; set -1\r") def get(self): + """Extract information from device + + :return: Data from device + :raises DeviceError: Occurs if device not in valid mode + """ self.isconnected() - idx = -1 - if self._simplemode != -1: - idx = self._simplemode - elif self._combimode != -1: - idx = self._combimode - else: - raise DeviceInvalidMode("Not in simple or combimode") - self._write("port {} ; selonce {}\r".format(self.port, idx)) - # wait for data - with Device._instance.portcond[self.port]: - Device._instance.portcond[self.port].wait() - return self._conn.data + if self._simplemode == -1 and self._combimode == -1: + raise DeviceError("Not in simple or combimode") + ftr = Future() + self._hat.portftr[self.port].append(ftr) + return ftr.result() def mode(self, modev): + """Set combimode or simple mode + + :param modev: List of tuples for a combimode, or integer for simple mode + """ self.isconnected() if isinstance(modev, list): - self._combimode = 0 modestr = "" for t in modev: - modestr += "{} {} ".format(t[0],t[1]) - self._write("port {} ; combi {} {}\r".format(self.port,self._combimode, modestr)) + modestr += f"{t[0]} {t[1]} " + if self._simplemode == -1 and self._combimode == 0 and self._modestr == modestr: + return + self._write(f"port {self.port}; select\r") + self._combimode = 0 + self._write((f"port {self.port} ; combi {self._combimode} {modestr} ; " + f"select {self._combimode} ; " + f"selrate {self._interval}\r")) self._simplemode = -1 + self._modestr = modestr + self._conn.combimode = 0 + self._conn.simplemode = -1 else: + if self._combimode == -1 and self._simplemode == int(modev): + return # Remove combi mode if self._combimode != -1: - self._write("port {} ; combi {}\r".format(self.port,self._combimode)) + self._write(f"port {self.port} ; combi {self._combimode}\r") + self._write(f"port {self.port}; select\r") self._combimode = -1 self._simplemode = int(modev) + self._write(f"port {self.port} ; select {int(modev)} ; selrate {self._interval}\r") + self._conn.combimode = -1 + self._conn.simplemode = int(modev) def select(self): + """Request data from mode + + :raises DeviceError: Occurs if device not in valid mode + """ self.isconnected() if self._simplemode != -1: idx = self._simplemode elif self._combimode != -1: idx = self._combimode else: - raise DeviceInvalidMode("Not in simple or combimode") - self._write("port {} ; select {}\r".format(self.port,idx)) + raise DeviceError("Not in simple or combimode") + self._write(f"port {self.port} ; select {idx} ; selrate {self._interval}\r") def on(self): - """ - Turns on sensor - """ - self._write("port {} ; plimit 1 ; on\r".format(self.port)) + """Turn on sensor""" + self._write(f"port {self.port} ; port_plimit 1 ; on\r") def off(self): - """ - Turns off sensor - """ - self._write("port {} ; off\r".format(self.port)) + """Turn off sensor""" + self._write(f"port {self.port} ; off\r") def deselect(self): - self._write("port {} ; select\r".format(self.port)) + """Unselect data from mode""" + self._write(f"port {self.port} ; select\r") def _write(self, cmd): self.isconnected() Device._instance.write(cmd.encode()) def _write1(self, data): - self._write("port {} ; write1 {}\r".format(self.port, ' '.join('{:x}'.format(h) for h in data))) + hexstr = ' '.join(f'{h:x}' for h in data) + self._write(f"port {self.port} ; write1 {hexstr}\r") def callback(self, func): + """Set callback function + + :param func: Callback function + """ if func is not None: self.select() else: self.deselect() - self._conn.callit = func + if func is None: + self._conn.callit = None + else: + self._conn.callit = weakref.WeakMethod(func) + + @property + def interval(self): + """Interval between data points in milliseconds + + :getter: Gets interval + :setter: Sets interval + :return: Device interval + :rtype: int + """ + return self._interval + + @interval.setter + def interval(self, value): + """Interval between data points in milliseconds + + :param value: Interval + :type value: int + :raises DeviceError: Occurs if invalid interval passed + """ + if isinstance(value, int) and value >= 0 and value <= 1000000000: + self._interval = value + self._write(f"port {self.port} ; selrate {self._interval}\r") + else: + raise DeviceError("Invalid interval") diff --git a/buildhat/distance.py b/buildhat/distance.py index 0e35b72..8a32caa 100644 --- a/buildhat/distance.py +++ b/buildhat/distance.py @@ -1,18 +1,26 @@ -from .devices import Device -from .exc import DeviceInvalid, DistanceSensorException +"""Distance sensor handling functionality""" + from threading import Condition -import threading + +from .devices import Device +from .exc import DistanceSensorError + class DistanceSensor(Device): """Distance sensor :param port: Port of device - :raises DeviceInvalid: Occurs if there is no distance sensor attached to port + :raises DeviceError: Occurs if there is no distance sensor attached to port """ + def __init__(self, port, threshold_distance=100): + """ + Initialise distance sensor + + :param port: Port of device + :param threshold_distance: Optional + """ super().__init__(port) - if self.typeid != 62: - raise DeviceInvalid('There is not a distance sensor connected to port %s (Found %s)' % (port, self.name)) self.on() self.mode(0) self._cond_data = Condition() @@ -42,15 +50,21 @@ def _intermediate(self, data): @property def distance(self): """ + Obtain previously stored distance + :getter: Returns distance + :return: Stored distance """ return self._distance @property def threshold_distance(self): """ + Threshold distance value + :getter: Returns threshold distance :setter: Sets threshold distance + :return: Threshold distance """ return self._threshold_distance @@ -60,7 +74,7 @@ def threshold_distance(self, value): def get_distance(self): """ - Returns the distance from ultrasonic sensor to object + Return the distance from ultrasonic sensor to object :return: Distance from ultrasonic sensor :rtype: int @@ -71,37 +85,45 @@ def get_distance(self): @property def when_in_range(self): """ - Handles motion events + Handle motion events :getter: Returns function to be called when in range :setter: Sets function to be called when in range + :return: In range callback """ return self._when_in_range @when_in_range.setter def when_in_range(self, value): - """Calls back, when distance in range""" + """Call back, when distance in range + + :param value: In range callback + """ self._when_in_range = value self.callback(self._intermediate) @property def when_out_of_range(self): """ - Handles motion events + Handle motion events :getter: Returns function to be called when out of range :setter: Sets function to be called when out of range + :return: Out of range callback """ return self._when_out_of_range @when_out_of_range.setter def when_out_of_range(self, value): - """Calls back, when distance out of range""" + """Call back, when distance out of range + + :param value: Out of range callback + """ self._when_out_of_range = value self.callback(self._intermediate) def wait_for_out_of_range(self, distance): - """Waits until object is farther than specified distance + """Wait until object is farther than specified distance :param distance: Distance """ @@ -110,9 +132,9 @@ def wait_for_out_of_range(self, distance): self._cond_data.wait() while self._data < distance: self._cond_data.wait() - + def wait_for_in_range(self, distance): - """Waits until object is closer than specified distance + """Wait until object is closer than specified distance :param distance: Distance """ @@ -125,21 +147,21 @@ def wait_for_in_range(self, distance): def eyes(self, *args): """ Brightness of LEDs on sensor + (Sensor Right Upper, Sensor Left Upper, Sensor Right Lower, Sensor Left Lower) - :param \*args: Four brightness arguments of 0 to 100 + :param args: Four brightness arguments of 0 to 100 + :raises DistanceSensorError: Occurs if invalid brightness passed """ out = [0xc5] if len(args) != 4: - raise DistanceSensorException("Need 4 brightness args, of 0 to 100") + raise DistanceSensorError("Need 4 brightness args, of 0 to 100") for v in args: if not (v >= 0 and v <= 100): - raise DistanceSensorException("Need 4 brightness args, of 0 to 100") + raise DistanceSensorError("Need 4 brightness args, of 0 to 100") out += [v] self._write1(out) def on(self): - """ - Turns on the sensor - """ - self._write("port {} ; set -1\r".format(self.port)) + """Turn on the sensor""" + self._write(f"port {self.port} ; set -1\r") diff --git a/buildhat/exc.py b/buildhat/exc.py index 8267015..2c0d3f9 100644 --- a/buildhat/exc.py +++ b/buildhat/exc.py @@ -1,26 +1,25 @@ -class MatrixInvalidPixel(Exception): - pass +"""Exceptions for all build HAT classes""" -class MotorException(Exception): - pass -class HatNotFound(Exception): - pass +class DistanceSensorError(Exception): + """Error raised when invalid arguments passed to distance sensor functions""" -class DeviceInvalidMode(Exception): - pass -class DeviceInvalid(Exception): - pass +class MatrixError(Exception): + """Error raised when invalid arguments passed to matrix functions""" -class DeviceNotFound(Exception): - pass -class DeviceChanged(Exception): - pass +class LightError(Exception): + """Error raised when invalid arguments passed to light functions""" -class DistanceSensorException(Exception): - pass -class DirectionInvalid(Exception): - pass +class MotorError(Exception): + """Error raised when invalid arguments passed to motor functions""" + + +class BuildHATError(Exception): + """Error raised when HAT not found""" + + +class DeviceError(Exception): + """Error raised when there is a Device issue""" diff --git a/buildhat/force.py b/buildhat/force.py index aef305f..c7378db 100644 --- a/buildhat/force.py +++ b/buildhat/force.py @@ -1,19 +1,25 @@ -from .devices import Device -from .exc import DeviceInvalid +"""Force sensor handling functionality""" + from threading import Condition -import threading + +from .devices import Device + class ForceSensor(Device): """Force sensor :param port: Port of device - :raises DeviceInvalid: Occurs if there is no force sensor attached to port + :raises DeviceError: Occurs if there is no force sensor attached to port """ + def __init__(self, port, threshold_force=1): + """Initialise force sensor + + :param port: Port of device + :param threshold_force: Optional + """ super().__init__(port) - if self.typeid != 63: - raise DeviceInvalid('There is not a force sensor connected to port %s (Found %s)' % (port, self.name)) - self.mode([(0,0),(1,0)]) + self.mode([(0, 0), (1, 0), (3, 0)]) self._when_pressed = None self._when_released = None self._fired_pressed = False @@ -38,9 +44,11 @@ def _intermediate(self, data): @property def threshold_force(self): - """ + """Threshold force + :getter: Returns threshold force :setter: Sets threshold force + :return: Threshold force """ return self._threshold_force @@ -49,15 +57,25 @@ def threshold_force(self, value): self._threshold_force = value def get_force(self): - """Returns the force in (N) + """Return the force in (N) :return: The force exerted on the button :rtype: int """ return self.get()[0] + def get_peak_force(self): + """Get the maximum force registered since the sensor was reset + + (The sensor gets reset when the firmware is reloaded) + + :return: 0 - 100 + :rtype: int + """ + return self.get()[2] + def is_pressed(self): - """Gets whether the button is pressed + """Get whether the button is pressed :return: If button is pressed :rtype: bool @@ -66,36 +84,46 @@ def is_pressed(self): @property def when_pressed(self): - """Handles force events + """Handle force events :getter: Returns function to be called when pressed :setter: Sets function to be called when pressed + :return: Callback function """ return self._when_pressed @when_pressed.setter def when_pressed(self, value): - """Calls back, when button is has pressed""" + """Call back, when button is has pressed + + :param value: Callback function + """ self._when_pressed = value self.callback(self._intermediate) @property def when_released(self): - """Handles force events + """Handle force events :getter: Returns function to be called when released :setter: Sets function to be called when released + :return: Callback function """ return self._when_pressed @when_released.setter def when_released(self, value): - """Calls back, when button is has released""" + """Call back, when button is has released + + :param value: Callback function + """ self._when_released = value self.callback(self._intermediate) def wait_until_pressed(self, force=1): - """Waits until the button is pressed + """Wait until the button is pressed + + :param force: Optional """ self.callback(self._intermediate) with self._cond_force: @@ -104,7 +132,9 @@ def wait_until_pressed(self, force=1): self._cond_force.wait() def wait_until_released(self, force=0): - """Waits until the button is released + """Wait until the button is released + + :param force: Optional """ self.callback(self._intermediate) with self._cond_force: diff --git a/buildhat/hat.py b/buildhat/hat.py index 4afe3f7..0a277eb 100644 --- a/buildhat/hat.py +++ b/buildhat/hat.py @@ -1,41 +1,130 @@ -from .serinterface import BuildHAT +"""HAT handling functionality""" + +from concurrent.futures import Future + from .devices import Device -import os -import sys -import weakref + class Hat: - """Allows enumeration of devices which are connected to the hat - """ - def __init__(self, device="/dev/serial0"): - if not Device._instance: - data = os.path.join(os.path.dirname(sys.modules["buildhat"].__file__),"data/") - firm = os.path.join(data,"firmware.bin") - sig = os.path.join(data,"signature.bin") - ver = os.path.join(data,"version") - vfile = open(ver) - v = int(vfile.read()) - vfile.close() - Device._instance = BuildHAT(firm, sig, v, device=device) - weakref.finalize(self, self._close) + """Allows enumeration of devices which are connected to the hat""" + + def __init__(self, device=None, debug=False): + """Hat + + :param device: Optional string containing path to Build HAT serial device + :param debug: Optional boolean to log debug information + """ + self.led_status = -1 + if device is None: + Device._setup(debug=debug) + else: + Device._setup(device=device, debug=debug) def get(self): - """Gets devices which are connected or disconnected + """Get devices which are connected or disconnected :return: Dictionary of devices :rtype: dict """ devices = {} for i in range(4): - name = "Other" + name = Device.UNKNOWN_DEVICE if Device._instance.connections[i].typeid in Device._device_names: - name = Device._device_names[Device._instance.connections[i].typeid] + name = Device._device_names[Device._instance.connections[i].typeid][0] + desc = Device._device_names[Device._instance.connections[i].typeid][1] elif Device._instance.connections[i].typeid == -1: - name = "Disconnected" - devices[chr(ord('A')+i)] = {"typeid" : Device._instance.connections[i].typeid, - "connected" : Device._instance.connections[i].connected, - "name" : name} + name = Device.DISCONNECTED_DEVICE + desc = '' + devices[chr(ord('A') + i)] = {"typeid": Device._instance.connections[i].typeid, + "connected": Device._instance.connections[i].connected, + "name": name, + "description": desc} return devices + def get_logfile(self): + """Get the filename of the debug log (If enabled, None otherwise) + + :return: Path of the debug logfile + :rtype: str or None + """ + return Device._instance.debug_filename + + def get_vin(self): + """Get the voltage present on the input power jack + + :return: Voltage on the input power jack + :rtype: float + """ + ftr = Future() + Device._instance.vinftr.append(ftr) + Device._instance.write(b"vin\r") + return ftr.result() + + def _set_led(self, intmode): + if isinstance(intmode, int) and intmode >= -1 and intmode <= 3: + self.led_status = intmode + Device._instance.write(f"ledmode {intmode}\r".encode()) + + def set_leds(self, color="voltage"): + """Set the two LEDs on or off on the BuildHAT. + + By default the color depends on the input voltage with green being nominal at around 8V + (The fastest time the LEDs can be perceptually toggled is around 0.025 seconds) + + :param color: orange, green, both, off, or voltage (default) + """ + if color == "orange": + self._set_led(1) + elif color == "green": + self._set_led(2) + elif color == "both": + self._set_led(3) + elif color == "off": + self._set_led(0) + elif color == "voltage": + self._set_led(-1) + else: + return + + def orange_led(self, status=True): + """Turn the BuildHAT's orange LED on or off + + :param status: True to turn it on, False to turn it off + """ + if status: + if self.led_status == 3 or self.led_status == 1: + # already on + return + elif self.led_status == 2: + self._set_led(3) + # off or default + else: + self._set_led(1) + else: + if self.led_status == 1 or self.led_status == -1: + self._set_led(0) + elif self.led_status == 3: + self._set_led(2) + + def green_led(self, status=True): + """Turn the BuildHAT's green LED on or off + + :param status: True to turn it on, False to turn it off + """ + if status: + if self.led_status == 3 or self.led_status == 2: + # already on + return + elif self.led_status == 1: + self._set_led(3) + # off or default + else: + self._set_led(2) + else: + if self.led_status == 2 or self.led_status == -1: + self._set_led(0) + elif self.led_status == 3: + self._set_led(1) + def _close(self): Device._instance.shutdown() diff --git a/buildhat/light.py b/buildhat/light.py index 9874649..1748e55 100644 --- a/buildhat/light.py +++ b/buildhat/light.py @@ -1,5 +1,8 @@ +"""Light device handling functionality""" + from .devices import Device -from .exc import DeviceInvalid +from .exc import LightError + class Light(Device): """Light @@ -7,9 +10,32 @@ class Light(Device): Use on()/off() functions to turn lights on/off :param port: Port of device - :raises DeviceInvalid: Occurs if there is no light attached to port + :raises DeviceError: Occurs if there is no light attached to port """ + def __init__(self, port): + """ + Initialise light + + :param port: Port of device + """ super().__init__(port) - if self.typeid != 8: - raise DeviceInvalid('There is not a light connected to port %s (Found %s)' % (port, self.name)) + + def brightness(self, brightness): + """ + Brightness of LEDs + + :param brightness: Brightness argument 0 to 100 + :raises LightError: Occurs if invalid brightness passed + """ + if not (brightness >= 0 and brightness <= 100): + raise LightError("Need brightness arg, of 0 to 100") + if brightness > 0: + self._write(f"port {self.port} ; on ; set {brightness / 100.0}\r") + else: + self.off() + + def off(self): + """Turn off lights""" + # Using coast to turn off DIY lights completely + self._write(f"port {self.port} ; coast\r") diff --git a/buildhat/matrix.py b/buildhat/matrix.py index fb12900..78a637b 100644 --- a/buildhat/matrix.py +++ b/buildhat/matrix.py @@ -1,36 +1,43 @@ +"""Matrix device handling functionality""" + from .devices import Device -from .exc import DeviceInvalid -import threading -import time +from .exc import MatrixError + class Matrix(Device): """LED Matrix :param port: Port of device - :raises DeviceInvalid: Occurs if there is no LED matrix attached to port + :raises DeviceError: Occurs if there is no LED matrix attached to port """ + def __init__(self, port): + """Initialise matrix + + :param port: Port of device + """ super().__init__(port) - if self.typeid != 64: - raise DeviceInvalid('There is no LED matrix connected to port %s (Found %s)' % (port, self.name)) self.on() self.mode(2) - self._matrix = [[(0,0) for x in range(3)] for y in range(3)] + self._matrix = [[(0, 0) for x in range(3)] for y in range(3)] - def set_pixels(self, matrix): + def set_pixels(self, matrix, display=True): """Write pixel data to LED matrix - :param pixels: 3x3 list of tuples, with colour (0–9) and brightness (0–10) (see example for more detail) + :param matrix: 3x3 list of tuples, with colour (0–10) and brightness (0–10) (see example for more detail) + :param display: Whether to update matrix or not + :raises MatrixError: Occurs if invalid matrix height/width provided """ + if len(matrix) != 3: + raise MatrixError("Incorrect matrix height") for x in range(3): + if len(matrix[x]) != 3: + raise MatrixError("Incorrect matrix width") for y in range(3): - color, brightness = matrix[x][y] - if not (brightness >= 0 and brightness <= 10): - raise MatrixInvalidPixel("Invalid brightness specified") - if not (color >= 0 and color <= 9): - raise MatrixInvalidPixel("Invalid pixel specified") + matrix[x][y] = Matrix.normalize_pixel(matrix[x][y]) # pylint: disable=too-many-function-args self._matrix = matrix - self._output() + if display: + self._output() def _output(self): out = [0xc2] @@ -41,7 +48,15 @@ def _output(self): self._write1(out) self.deselect() - def strtocolor(self, colorstr): + @staticmethod + def strtocolor(colorstr): + """Return the BuldHAT's integer representation of a color string + + :param colorstr: str of a valid color + :return: (0-10) representing the color + :rtype: int + :raises MatrixError: Occurs if invalid color specified + """ if colorstr == "pink": return 1 elif colorstr == "lilac": @@ -60,54 +75,133 @@ def strtocolor(self, colorstr): return 8 elif colorstr == "red": return 9 - raise MatrixInvalidPixel("Invalid color specified") + elif colorstr == "white": + return 10 + elif colorstr == "": + return 0 + raise MatrixError("Invalid color specified") + + @staticmethod + def normalize_pixel(pixel): + """Validate a pixel tuple (color, brightness) and convert string colors to integers + + :param pixel: tuple of colour (0–10) or string (ie:"red") and brightness (0–10) + :return: (color, brightness) integers + :rtype: tuple + :raises MatrixError: Occurs if invalid pixel specified + """ + if isinstance(pixel, tuple): + c, brightness = pixel # pylint: disable=unpacking-non-sequence + if isinstance(c, str): + c = Matrix.strtocolor(c) # pylint: disable=too-many-function-args + if not (isinstance(brightness, int) and isinstance(c, int)): + raise MatrixError("Invalid pixel specified") + if not (brightness >= 0 and brightness <= 10): + raise MatrixError("Invalid brightness value specified") + if not (c >= 0 and c <= 10): + raise MatrixError("Invalid pixel color specified") + return (c, brightness) + else: + raise MatrixError("Invalid pixel specified") + + @staticmethod + def validate_coordinate(coord): + """Validate an x,y coordinate for the 3x3 Matrix + + :param coord: tuple of 0-2 for the X coordinate and 0-2 for the Y coordinate + :raises MatrixError: Occurs if invalid coordinate specified + """ + # pylint: disable=unsubscriptable-object + if isinstance(coord, tuple): + if not (isinstance(coord[0], int) and isinstance(coord[1], int)): + raise MatrixError("Invalid coord specified") + elif coord[0] > 2 or coord[0] < 0 or coord[1] > 2 or coord[1] < 0: + raise MatrixError("Invalid coord specified") + else: + raise MatrixError("Invalid coord specified") def clear(self, pixel=None): """Clear matrix or set all as the same pixel - :param pixel: tuple of colour (0–9) or string and brightness (0–10) + :param pixel: tuple of colour (0–10) or string and brightness (0–10) """ if pixel is None: - self._matrix = [[(0,0) for x in range(3)] for y in range(3)] + self._matrix = [[(0, 0) for x in range(3)] for y in range(3)] else: - color = () - if isinstance(pixel, tuple): - c, brightness = pixel - if isinstance(c, str): - c = self.strtocolor(c) - if not (brightness >= 0 and brightness <= 10): - raise MatrixInvalidPixel("Invalid brightness specified") - if not (c >= 0 and c <= 9): - raise MatrixInvalidPixel("Invalid pixel specified") - color = (c, brightness) - else: - raise MatrixInvalidPixel("Invalid pixel specified") + color = Matrix.normalize_pixel(pixel) # pylint: disable=too-many-function-args self._matrix = [[color for x in range(3)] for y in range(3)] self._output() + def off(self): + """Pretends to turn matrix off + + Never send the "off" command to the port a Matrix is connected to + Instead, just turn all the pixels off + """ + self.clear() + + def level(self, level): + """Use the matrix as a "level" meter from 0-9 + + (The level meter is expressed in green which seems to be unchangeable) + + :param level: The height of the bar graph, 0-9 + :raises MatrixError: Occurs if invalid level specified + """ + if not isinstance(level, int): + raise MatrixError("Invalid level, not integer") + if not (level >= 0 and level <= 9): + raise MatrixError("Invalid level specified") + self.mode(0) + self.select() + self._write1([0xc0, level]) + self.mode(2) # The rest of the Matrix code seems to expect this to be always set + self.deselect() + + def set_transition(self, transition): + """Set the transition mode between pixels + + Use display=False on set_pixel() or use set_pixels() to achieve desired + results with transitions. + + Setting a new transition mode will wipe the screen and interrupt any + running transition. + + Mode 0: No transition, immediate pixel drawing + + Mode 1: Right-to-left wipe in/out + + If the timing between writing new matrix pixels is less than one second + the transition will clip columns of pixels from the right. + + Mode 2: Fade-in/Fade-out + + The fade in and fade out take about 2.2 seconds for full fade effect. + Waiting less time between setting new pixels will result in a faster + fade which will cause the fade to "pop" in brightness. + + :param transition: Transition mode (0-2) + :raises MatrixError: Occurs if invalid transition + """ + if not isinstance(transition, int): + raise MatrixError("Invalid transition, not integer") + if not (transition >= 0 and transition <= 2): + raise MatrixError("Invalid transition specified") + self.mode(3) + self.select() + self._write1([0xc3, transition]) + self.mode(2) # The rest of the Matrix code seems to expect this to be always set + self.deselect() + def set_pixel(self, coord, pixel, display=True): """Write pixel to coordinate :param coord: (0,0) to (2,2) - :param pixel: tuple of colour (0–9) or string and brightness (0–10) + :param pixel: tuple of colour (0–10) or string and brightness (0–10) :param display: Whether to update matrix or not """ - if isinstance(pixel, tuple): - c, brightness = pixel - if isinstance(c, str): - c = self.strtocolor(c) - if not (brightness >= 0 and brightness <= 10): - raise MatrixInvalidPixel("Invalid brightness specified") - if not (c >= 0 and c <= 9): - raise MatrixInvalidPixel("Invalid pixel specified") - color = (c, brightness) - else: - raise MatrixInvalidPixel("Invalid pixel specified") - if isinstance(coord, tuple): - if not (isinstance(coord[0], int) and isinstance(coord[1], int)): - raise MatrixInvalidPixel("Invalid coord specified") - else: - raise MatrixInvalidPixel("Invalid coord specified") + color = Matrix.normalize_pixel(pixel) # pylint: disable=too-many-function-args + Matrix.validate_coordinate(coord) # pylint: disable=too-many-function-args x, y = coord self._matrix[x][y] = color if display: diff --git a/buildhat/motors.py b/buildhat/motors.py index aee7fe3..38786c0 100644 --- a/buildhat/motors.py +++ b/buildhat/motors.py @@ -1,140 +1,188 @@ -from .devices import Device -from .exc import DeviceInvalid, DirectionInvalid, MotorException -from threading import Condition -from collections import deque -from enum import Enum +"""Motor device handling functionality""" + import threading -import statistics import time +from collections import deque +from concurrent.futures import Future +from enum import Enum +from threading import Condition + +from .devices import Device +from .exc import MotorError + class PassiveMotor(Device): """Passive Motor device :param port: Port of device - :raises DeviceInvalid: Occurs if there is no passive motor attached to port + :raises DeviceError: Occurs if there is no passive motor attached to port """ - MOTOR_SET = set([2]) def __init__(self, port): + """Initialise motor + + :param port: Port of device + """ super().__init__(port) - if self.typeid not in PassiveMotor.MOTOR_SET: - raise DeviceInvalid('There is not a passive motor connected to port %s (Found %s)' % (port, self.name)) self._default_speed = 20 + self._currentspeed = 0 self.plimit(0.7) - self.bias(0.3) def set_default_speed(self, default_speed): - """Sets the default speed of the motor + """Set the default speed of the motor :param default_speed: Speed ranging from -100 to 100 + :raises MotorError: Occurs if invalid speed passed """ if not (default_speed >= -100 and default_speed <= 100): - raise MotorException("Invalid Speed") + raise MotorError("Invalid Speed") self._default_speed = default_speed def start(self, speed=None): """Start motor :param speed: Speed ranging from -100 to 100 + :raises MotorError: Occurs if invalid speed passed """ + if self._currentspeed == speed: + # Already running at this speed, do nothing + return + if speed is None: speed = self._default_speed else: if not (speed >= -100 and speed <= 100): - raise MotorException("Invalid Speed") - cmd = "port {} ; pwm ; set {}\r".format(self.port, speed/100) + raise MotorError("Invalid Speed") + self._currentspeed = speed + cmd = f"port {self.port} ; pwm ; set {speed / 100}\r" self._write(cmd) def stop(self): - """Stops motor""" - cmd = "port {} ; off\r".format(self.port) + """Stop motor""" + cmd = f"port {self.port} ; off\r" self._write(cmd) + self._currentspeed = 0 def plimit(self, plimit): + """Limit power + + :param plimit: Value 0 to 1 + :raises MotorError: Occurs if invalid plimit value passed + """ if not (plimit >= 0 and plimit <= 1): - raise MotorException("plimit should be 0 to 1") - self._write("port {} ; plimit {}\r".format(self.port, plimit)) + raise MotorError("plimit should be 0 to 1") + self._write(f"port {self.port} ; port_plimit {plimit}\r") def bias(self, bias): - if not (bias >= 0 and bias <= 1): - raise MotorException("bias should be 0 to 1") - self._write("port {} ; bias {}\r".format(self.port, bias)) + """Bias motor + + :param bias: Value 0 to 1 + :raises MotorError: Occurs if invalid bias value passed + + .. deprecated:: 0.6.0 + """ # noqa: RST303 + raise MotorError("Bias no longer available") + + +class MotorRunmode(Enum): + """Current mode motor is in""" + + NONE = 0 + FREE = 1 + DEGREES = 2 + SECONDS = 3 + class Motor(Device): """Motor device :param port: Port of device - :raises DeviceInvalid: Occurs if there is no motor attached to port + :raises DeviceError: Occurs if there is no motor attached to port """ - # See hub-python-module/drivers/m_sched_shortcake.h - MOTOR_SET = set([38, 46, 47, 48, 49, 65, 75, 76]) def __init__(self, port): + """Initialise motor + + :param port: Port of device + """ super().__init__(port) - if self.typeid not in Motor.MOTOR_SET: - raise DeviceInvalid('There is not a motor connected to port %s (Found %s)' % (port, self.name)) self.default_speed = 20 - self.mode([(1,0),(2,0),(3,0)]) + self._currentspeed = 0 + if self._typeid in {38}: + self.mode([(1, 0), (2, 0)]) + self._combi = "1 0 2 0" + self._noapos = True + else: + self.mode([(1, 0), (2, 0), (3, 0)]) + self._combi = "1 0 2 0 3 0" + self._noapos = False self.plimit(0.7) - self.bias(0.3) + self.pwmparams(0.65, 0.01) + self._rpm = False self._release = True self._bqueue = deque(maxlen=5) self._cvqueue = Condition() self.when_rotated = None self._oldpos = None + self._runmode = MotorRunmode.NONE + + def set_speed_unit_rpm(self, rpm=False): + """Set whether to use RPM for speed units or not + + :param rpm: Boolean to determine whether to use RPM for units + """ + self._rpm = rpm def set_default_speed(self, default_speed): - """Sets the default speed of the motor + """Set the default speed of the motor :param default_speed: Speed ranging from -100 to 100 + :raises MotorError: Occurs if invalid speed passed """ if not (default_speed >= -100 and default_speed <= 100): - raise MotorException("Invalid Speed") + raise MotorError("Invalid Speed") self.default_speed = default_speed - def _isfinishedcb(self, speed, pos, apos): - self._bqueue.append(pos) - def run_for_rotations(self, rotations, speed=None, blocking=True): - """Runs motor for N rotations + """Run motor for N rotations :param rotations: Number of rotations :param speed: Speed ranging from -100 to 100 + :param blocking: Whether call should block till finished + :raises MotorError: Occurs if invalid speed passed """ + self._runmode = MotorRunmode.DEGREES if speed is None: self.run_for_degrees(int(rotations * 360), self.default_speed, blocking) else: if not (speed >= -100 and speed <= 100): - raise MotorException("Invalid Speed") + raise MotorError("Invalid Speed") self.run_for_degrees(int(rotations * 360), speed, blocking) def _run_for_degrees(self, degrees, speed): + self._runmode = MotorRunmode.DEGREES mul = 1 if speed < 0: speed = abs(speed) mul = -1 pos = self.get_position() - newpos = ((degrees*mul)+pos)/360.0 + newpos = ((degrees * mul) + pos) / 360.0 pos /= 360.0 - speed *= 0.05 - dur = abs((newpos - pos) / speed) - cmd = "port {} ; combi 0 1 0 2 0 3 0 ; select 0 ; pid {} 0 1 s4 0.0027777778 0 5 0 .1 3 ; set ramp {} {} {} 0\r".format(self.port, - self.port, pos, newpos, dur) - self._write(cmd) - with self._hat.rampcond[self.port]: - self._hat.rampcond[self.port].wait() - if self._release: - time.sleep(0.2) - self.coast() + self._run_positional_ramp(pos, newpos, speed) + self._runmode = MotorRunmode.NONE def _run_to_position(self, degrees, speed, direction): + self._runmode = MotorRunmode.DEGREES data = self.get() pos = data[1] - apos = data[2] - diff = (degrees-apos+180) % 360 - 180 - newpos = (pos + diff)/360 - v1 = (degrees - apos)%360 - v2 = (apos - degrees)%360 + if self._noapos: + apos = pos + else: + apos = data[2] + diff = (degrees - apos + 180) % 360 - 180 + newpos = (pos + diff) / 360 + v1 = (degrees - apos) % 360 + v2 = (apos - degrees) % 360 mul = 1 if diff > 0: mul = -1 @@ -142,121 +190,181 @@ def _run_to_position(self, degrees, speed, direction): if direction == "shortest": pass elif direction == "clockwise": - newpos = (pos + diff[1])/360 + newpos = (pos + diff[1]) / 360 elif direction == "anticlockwise": - newpos = (pos + diff[0])/360 + newpos = (pos + diff[0]) / 360 else: - raise DirectionInvalid("Invalid direction, should be: shortest, clockwise or anticlockwise") + raise MotorError("Invalid direction, should be: shortest, clockwise or anticlockwise") + # Convert current motor position to decimal rotations from preset position to match newpos units pos /= 360.0 - speed *= 0.05 + self._run_positional_ramp(pos, newpos, speed) + self._runmode = MotorRunmode.NONE + + def _run_positional_ramp(self, pos, newpos, speed): + """Ramp motor + + :param pos: Current motor position in decimal rotations (from preset position) + :param newpos: New motor postion in decimal rotations (from preset position) + :param speed: -100 to 100 + """ + if self._rpm: + speed = self._speed_process(speed) + else: + speed *= 0.05 # Collapse speed range to -5 to 5 dur = abs((newpos - pos) / speed) - cmd = "port {} ; combi 0 1 0 2 0 3 0 ; select 0 ; pid {} 0 1 s4 0.0027777778 0 5 0 .1 3 ; set ramp {} {} {} 0\r".format(self.port, - self.port, pos, newpos, dur) + cmd = (f"port {self.port}; select 0 ; selrate {self._interval}; " + f"pid {self.port} 0 1 s4 0.0027777778 0 5 0 .1 3 0.01; " + f"set ramp {pos} {newpos} {dur} 0\r") + ftr = Future() + self._hat.rampftr[self.port].append(ftr) self._write(cmd) - with self._hat.rampcond[self.port]: - self._hat.rampcond[self.port].wait() + ftr.result() if self._release: time.sleep(0.2) self.coast() def run_for_degrees(self, degrees, speed=None, blocking=True): - """Runs motor for N degrees + """Run motor for N degrees Speed of 1 means 1 revolution / second :param degrees: Number of degrees to rotate :param speed: Speed ranging from -100 to 100 + :param blocking: Whether call should block till finished + :raises MotorError: Occurs if invalid speed passed """ + self._runmode = MotorRunmode.DEGREES if speed is None: speed = self.default_speed if not (speed >= -100 and speed <= 100): - raise MotorException("Invalid Speed") + raise MotorError("Invalid Speed") if not blocking: - th = threading.Thread(target=self._run_for_degrees, args=(degrees, speed)) - th.daemon = True - th.start() + self._queue((self._run_for_degrees, (degrees, speed))) else: + self._wait_for_nonblocking() self._run_for_degrees(degrees, speed) def run_to_position(self, degrees, speed=None, blocking=True, direction="shortest"): - """Runs motor to position (in degrees) + """Run motor to position (in degrees) - :param degrees: Position in degrees + :param degrees: Position in degrees from -180 to 180 :param speed: Speed ranging from 0 to 100 + :param blocking: Whether call should block till finished + :param direction: shortest (default)/clockwise/anticlockwise + :raises MotorError: Occurs if invalid speed or angle passed """ + self._runmode = MotorRunmode.DEGREES if speed is None: speed = self.default_speed if not (speed >= 0 and speed <= 100): - raise MotorException("Invalid Speed") + raise MotorError("Invalid Speed") if degrees < -180 or degrees > 180: - raise MotorException("Invalid angle") + raise MotorError("Invalid angle") if not blocking: - th = threading.Thread(target=self._run_to_position, args=(degrees, speed, direction)) - th.daemon = True - th.start() + self._queue((self._run_to_position, (degrees, speed, direction))) else: + self._wait_for_nonblocking() self._run_to_position(degrees, speed, direction) def _run_for_seconds(self, seconds, speed): - cmd = "port {} ; combi 0 1 0 2 0 3 0 ; select 0 ; pid {} 0 0 s1 1 0 0.003 0.01 0 100; set pulse {} 0.0 {} 0\r".format(self.port, self.port, speed, seconds); - self._write(cmd); - with self._hat.pulsecond[self.port]: - self._hat.pulsecond[self.port].wait() + speed = self._speed_process(speed) + self._runmode = MotorRunmode.SECONDS + if self._rpm: + pid = f"pid_diff {self.port} 0 5 s2 0.0027777778 1 0 2.5 0 .4 0.01; " + else: + pid = f"pid {self.port} 0 0 s1 1 0 0.003 0.01 0 100 0.01;" + cmd = (f"port {self.port} ; select 0 ; selrate {self._interval}; " + f"{pid}" + f"set pulse {speed} 0.0 {seconds} 0\r") + ftr = Future() + self._hat.pulseftr[self.port].append(ftr) + self._write(cmd) + ftr.result() if self._release: self.coast() + self._runmode = MotorRunmode.NONE def run_for_seconds(self, seconds, speed=None, blocking=True): - """Runs motor for N seconds + """Run motor for N seconds :param seconds: Time in seconds :param speed: Speed ranging from -100 to 100 + :param blocking: Whether call should block till finished + :raises MotorError: Occurs when invalid speed specified """ + self._runmode = MotorRunmode.SECONDS if speed is None: speed = self.default_speed if not (speed >= -100 and speed <= 100): - raise MotorException("Invalid Speed") + raise MotorError("Invalid Speed") if not blocking: - th = threading.Thread(target=self._run_for_seconds, args=(seconds, speed)) - th.daemon = True - th.start() + self._queue((self._run_for_seconds, (seconds, speed))) else: + self._wait_for_nonblocking() self._run_for_seconds(seconds, speed) def start(self, speed=None): """Start motor :param speed: Speed ranging from -100 to 100 + :raises MotorError: Occurs when invalid speed specified """ + self._wait_for_nonblocking() + if self._runmode == MotorRunmode.FREE: + if self._currentspeed == speed: + # Already running at this speed, do nothing + return + elif self._runmode != MotorRunmode.NONE: + # Motor is running some other mode, wait for it to stop or stop() it yourself + return + if speed is None: speed = self.default_speed else: if not (speed >= -100 and speed <= 100): - raise MotorException("Invalid Speed") - cmd = "port {} ; combi 0 1 0 2 0 3 0 ; select 0 ; pid {} 0 0 s1 1 0 0.003 0.01 0 100; set {}\r".format(self.port, self.port, speed) + raise MotorError("Invalid Speed") + speed = self._speed_process(speed) + cmd = f"port {self.port} ; set {speed}\r" + if self._runmode == MotorRunmode.NONE: + if self._rpm: + pid = f"pid_diff {self.port} 0 5 s2 0.0027777778 1 0 2.5 0 .4 0.01; " + else: + pid = f"pid {self.port} 0 0 s1 1 0 0.003 0.01 0 100 0.01; " + cmd = (f"port {self.port} ; select 0 ; selrate {self._interval}; " + f"{pid}" + f"set {speed}\r") + self._runmode = MotorRunmode.FREE + self._currentspeed = speed self._write(cmd) def stop(self): - """Stops motor""" + """Stop motor""" + self._wait_for_nonblocking() + self._runmode = MotorRunmode.NONE + self._currentspeed = 0 self.coast() def get_position(self): - """Gets position of motor with relation to preset position (can be negative or positive) + """Get position of motor with relation to preset position (can be negative or positive) - :return: Position of motor + :return: Position of motor in degrees from preset position :rtype: int """ return self.get()[1] def get_aposition(self): - """Gets absolute position of motor + """Get absolute position of motor - :return: Absolute position of motor + :return: Absolute position of motor from -180 to 180 :rtype: int """ - return self.get()[2] + if self._noapos: + raise MotorError("No absolute position with this motor") + else: + return self.get()[2] def get_speed(self): - """Gets speed of motor + """Get speed of motor :return: Speed of motor :rtype: int @@ -266,73 +374,164 @@ def get_speed(self): @property def when_rotated(self): """ - Handles rotation events + Handle rotation events :getter: Returns function to be called when rotated :setter: Sets function to be called when rotated + :return: Callback function """ return self._when_rotated - def _intermediate(self, value, speed, pos, apos): + def _intermediate(self, data): + if self._noapos: + speed, pos = data + apos = None + else: + speed, pos, apos = data if self._oldpos is None: self._oldpos = pos return if abs(pos - self._oldpos) >= 1: - value(speed, pos, apos) + if self._when_rotated is not None: + self._when_rotated(speed, pos, apos) self._oldpos = pos @when_rotated.setter def when_rotated(self, value): - """Calls back, when motor has been rotated""" - if value is not None: - self._when_rotated = lambda lst: [self._intermediate(value, lst[0], lst[1], lst[2]),self._isfinishedcb(lst[0], lst[1], lst[2])] - else: - self._when_rotated = lambda lst: self._isfinishedcb(lst[0], lst[1], lst[2]) - self.callback(self._when_rotated) + """Call back, when motor has been rotated + + :param value: Callback function + """ + self._when_rotated = value + self.callback(self._intermediate) def plimit(self, plimit): + """Limit power + + :param plimit: Value 0 to 1 + :raises MotorError: Occurs if invalid plimit value passed + """ if not (plimit >= 0 and plimit <= 1): - raise MotorException("plimit should be 0 to 1") - self._write("port {} ; plimit {}\r".format(self.port, plimit)) + raise MotorError("plimit should be 0 to 1") + self._write(f"port {self.port} ; port_plimit {plimit}\r") def bias(self, bias): - if not (bias >= 0 and bias <= 1): - raise MotorException("bias should be 0 to 1") - self._write("port {} ; bias {}\r".format(self.port, bias)) + """Bias motor + + :param bias: Value 0 to 1 + :raises MotorError: Occurs if invalid bias value passed + + .. deprecated:: 0.6.0 + """ # noqa: RST303 + raise MotorError("Bias no longer available") + + def pwmparams(self, pwmthresh, minpwm): + """PWM thresholds + + :param pwmthresh: Value 0 to 1, threshold below, will switch from fast to slow, PWM + :param minpwm: Value 0 to 1, threshold below which it switches off the drive altogether + :raises MotorError: Occurs if invalid values are passed + """ + if not (pwmthresh >= 0 and pwmthresh <= 1): + raise MotorError("pwmthresh should be 0 to 1") + if not (minpwm >= 0 and minpwm <= 1): + raise MotorError("minpwm should be 0 to 1") + self._write(f"port {self.port} ; pwmparams {pwmthresh} {minpwm}\r") def pwm(self, pwmv): + """PWM motor + + :param pwmv: Value -1 to 1 + :raises MotorError: Occurs if invalid pwm value passed + """ if not (pwmv >= -1 and pwmv <= 1): - raise MotorException("pwm should be -1 to 1") - self._write("port {} ; pwm ; set {}\r".format(self.port, pwmv)) + raise MotorError("pwm should be -1 to 1") + self._write(f"port {self.port} ; pwm ; set {pwmv}\r") def coast(self): - self._write("port {} ; coast\r".format(self.port)) + """Coast motor""" + self._write(f"port {self.port} ; coast\r") def float(self): + """Float motor""" self.pwm(0) + @property + def release(self): + """Determine if motor is released after running, so can be turned by hand + + :getter: Returns whether motor is released, so can be turned by hand + :setter: Sets whether motor is released, so can be turned by hand + :return: Whether motor is released, so can be turned by hand + :rtype: bool + """ + return self._release + + @release.setter + def release(self, value): + """Determine if the motor is released after running, so can be turned by hand + + :param value: Whether motor should be released, so can be turned by hand + :type value: bool + """ + if not isinstance(value, bool): + raise MotorError("Must pass boolean") + self._release = value + + def _queue(self, cmd): + Device._instance.motorqueue[self.port].put(cmd) + + def _wait_for_nonblocking(self): + """Wait for nonblocking commands to finish""" + Device._instance.motorqueue[self.port].join() + + def _speed_process(self, speed): + """Lower speed value""" + if self._rpm: + return speed / 60 + else: + return speed + + class MotorPair: """Pair of motors :param motora: One of the motors to drive :param motorb: Other motor in pair to drive - :raises DeviceInvalid: Occurs if there is no motor attached to port + :raises DeviceError: Occurs if there is no motor attached to port """ + def __init__(self, leftport, rightport): + """Initialise pair of motors + + :param leftport: Left motor port + :param rightport: Right motor port + """ super().__init__() self._leftmotor = Motor(leftport) self._rightmotor = Motor(rightport) self.default_speed = 20 + self._release = True + self._rpm = False def set_default_speed(self, default_speed): - """Sets the default speed of the motor + """Set the default speed of the motor :param default_speed: Speed ranging from -100 to 100 """ self.default_speed = default_speed + def set_speed_unit_rpm(self, rpm=False): + """Set whether to use RPM for speed units or not + + :param rpm: Boolean to determine whether to use RPM for units + """ + self._rpm = rpm + self._leftmotor.set_speed_unit_rpm(rpm) + self._rightmotor.set_speed_unit_rpm(rpm) + def run_for_rotations(self, rotations, speedl=None, speedr=None): - """Runs pair of motors for N rotations + """Run pair of motors for N rotations :param rotations: Number of rotations :param speedl: Speed ranging from -100 to 100 @@ -345,7 +544,7 @@ def run_for_rotations(self, rotations, speedl=None, speedr=None): self.run_for_degrees(int(rotations * 360), speedl, speedr) def run_for_degrees(self, degrees, speedl=None, speedr=None): - """Runs pair of motors for degrees + """Run pair of motors for degrees :param degrees: Number of degrees :param speedl: Speed ranging from -100 to 100 @@ -365,7 +564,7 @@ def run_for_degrees(self, degrees, speedl=None, speedr=None): th2.join() def run_for_seconds(self, seconds, speedl=None, speedr=None): - """Runs pair for N seconds + """Run pair for N seconds :param seconds: Time in seconds :param speedl: Speed ranging from -100 to 100 @@ -387,7 +586,8 @@ def run_for_seconds(self, seconds, speedl=None, speedr=None): def start(self, speedl=None, speedr=None): """Start motors - :param speed: Speed ranging from -100 to 100 + :param speedl: Speed ranging from -100 to 100 + :param speedr: Speed ranging from -100 to 100 """ if speedl is None: speedl = self.default_speed @@ -402,11 +602,12 @@ def stop(self): self._rightmotor.stop() def run_to_position(self, degreesl, degreesr, speed=None, direction="shortest"): - """Runs pair to position (in degrees) + """Run pair to position (in degrees) - :param degreesl: Position in degrees for left motor - :param degreesr: Position in degrees for right motor - :param speed: Speed ranging from -100 to 100 + :param degreesl: Position in degrees for left motor + :param degreesr: Position in degrees for right motor + :param speed: Speed ranging from -100 to 100 + :param direction: shortest (default)/clockwise/anticlockwise """ if speed is None: th1 = threading.Thread(target=self._leftmotor._run_to_position, args=(degreesl, self.default_speed, direction)) @@ -420,3 +621,27 @@ def run_to_position(self, degreesl, degreesr, speed=None, direction="shortest"): th2.start() th1.join() th2.join() + + @property + def release(self): + """Determine if motors are released after running, so can be turned by hand + + :getter: Returns whether motors are released, so can be turned by hand + :setter: Sets whether motors are released, so can be turned by hand + :return: Whether motors are released, so can be turned by hand + :rtype: bool + """ + return self._release + + @release.setter + def release(self, value): + """Determine if motors are released after running, so can be turned by hand + + :param value: Whether motors should be released, so can be turned by hand + :type value: bool + """ + if not isinstance(value, bool): + raise MotorError("Must pass boolean") + self._release = value + self._leftmotor.release = value + self._rightmotor.release = value diff --git a/buildhat/serinterface.py b/buildhat/serinterface.py index 404870f..901dc2e 100644 --- a/buildhat/serinterface.py +++ b/buildhat/serinterface.py @@ -1,85 +1,135 @@ -from .exc import DeviceNotFound, DeviceChanged, DeviceInvalid, HatNotFound -import importlib +"""Build HAT handling functionality""" + +import logging +import os +import queue +import tempfile import threading -import gpiozero -import serial import time -import queue +from enum import Enum from threading import Condition, Timer + +import serial from gpiozero import DigitalOutputDevice -from enum import Enum + +from .exc import BuildHATError + class HatState(Enum): + """Current state that hat is in""" + OTHER = 0 FIRMWARE = 1 NEEDNEWFIRMWARE = 2 BOOTLOADER = 3 + class Connection: + """Connection information for a port""" + def __init__(self): + """Initialise connection""" self.typeid = -1 self.connected = False self.callit = None + self.simplemode = -1 + self.combimode = -1 def update(self, typeid, connected, callit=None): + """Update connection information for port + + :param typeid: Type ID of device on port + :param connected: Whether device is connected or not + :param callit: Callback function + """ self.typeid = typeid self.connected = connected self.callit = callit + def cmp(str1, str2): + """Look for str2 in str1 + + :param str1: String to look in + :param str2: String to look for + :return: Whether str2 exists + """ return str1[:len(str2)] == str2 + class BuildHAT: - CONNECTED=": connected to active ID" - CONNECTEDPASSIVE=": connected to passive ID" - DISCONNECTED=": disconnected" - DEVTIMEOUT=": timeout during data phase: disconnecting" - NOTCONNECTED=": no device detected" - PULSEDONE=": pulse done" - RAMPDONE=": ramp done" - FIRMWARE="Firmware version: " - BOOTLOADER="BuildHAT bootloader version" - DONE="Done initialising ports" - - def __init__(self, firmware, signature, version, device="/dev/serial0"): + """Interacts with Build HAT via UART interface""" + + CONNECTED = ": connected to active ID" + CONNECTEDPASSIVE = ": connected to passive ID" + DISCONNECTED = ": disconnected" + DEVTIMEOUT = ": timeout during data phase: disconnecting" + NOTCONNECTED = ": no device detected" + PULSEDONE = ": pulse done" + RAMPDONE = ": ramp done" + FIRMWARE = "Firmware version: " + BOOTLOADER = "BuildHAT bootloader version" + DONE = "Done initialising ports" + PROMPT = "BHBL>" + RESET_GPIO_NUMBER = 4 + BOOT0_GPIO_NUMBER = 22 + + def __init__(self, firmware, signature, version, device="/dev/serial0", debug=False): + """Interact with Build HAT + + :param firmware: Firmware file + :param signature: Signature file + :param version: Firmware version + :param device: Serial device to use + :param debug: Optional boolean to log debug information + :raises BuildHATError: Occurs if can't find HAT + """ self.cond = Condition() self.state = HatState.OTHER self.connections = [] - self.portcond = [] - self.pulsecond = [] - self.rampcond = [] + self.portftr = [] + self.pulseftr = [] + self.rampftr = [] + self.vinftr = [] + self.motorqueue = [] self.fin = False self.running = True + self.debug_filename = None + if debug: + tmp = tempfile.NamedTemporaryFile(suffix=".log", prefix="buildhat-", delete=False) + self.debug_filename = tmp.name + logging.basicConfig(filename=tmp.name, format='%(asctime)s %(message)s', + level=logging.DEBUG) - for i in range(4): + for _ in range(4): self.connections.append(Connection()) - self.portcond.append(Condition()) - self.pulsecond.append(Condition()) - self.rampcond.append(Condition()) + self.portftr.append([]) + self.pulseftr.append([]) + self.rampftr.append([]) + self.motorqueue.append(queue.Queue()) + # On a Pi 5 /dev/serial0 will point to /dev/ttyAMA10 (which *only* + # exists on a Pi 5, and is the 3-pin debug UART connector) + # The UART on the Pi 5 GPIO header is /dev/ttyAMA0 + if device == "/dev/serial0" and os.readlink(device) == "ttyAMA10": + device = "/dev/ttyAMA0" self.ser = serial.Serial(device, 115200, timeout=5) # Check if we're in the bootloader or the firmware self.write(b"version\r") incdata = 0 while True: - try: - line = self.ser.readline().decode('utf-8', 'ignore') - except serial.SerialException: - pass - if len(line) == 0: - # Didn't recieve any data - break - if line[:len(BuildHAT.FIRMWARE)] == BuildHAT.FIRMWARE: + line = self.read() + if cmp(line, BuildHAT.FIRMWARE): self.state = HatState.FIRMWARE - ver = line[len(BuildHAT.FIRMWARE):].split(' ') + ver = line[len(BuildHAT.FIRMWARE):].split(' ') if int(ver[0]) == version: self.state = HatState.FIRMWARE break else: self.state = HatState.NEEDNEWFIRMWARE break - elif line[:len(BuildHAT.BOOTLOADER)] == BuildHAT.BOOTLOADER: + elif cmp(line, BuildHAT.BOOTLOADER): self.state = HatState.BOOTLOADER break else: @@ -89,30 +139,36 @@ def __init__(self, firmware, signature, version, device="/dev/serial0"): break else: self.write(b"version\r") - # Use to force hat reset - #self.state = HatState.NEEDNEWFIRMWARE + if self.state == HatState.NEEDNEWFIRMWARE: self.resethat() self.loadfirmware(firmware, signature) elif self.state == HatState.BOOTLOADER: self.loadfirmware(firmware, signature) elif self.state == HatState.OTHER: - raise HatNotFound() + raise BuildHATError("HAT not found") self.cbqueue = queue.Queue() self.cb = threading.Thread(target=self.callbackloop, args=(self.cbqueue,)) self.cb.daemon = True self.cb.start() + for q in self.motorqueue: + ml = threading.Thread(target=self.motorloop, args=(q,)) + ml.daemon = True + ml.start() + # Drop timeout value to 1s + listevt = threading.Event() self.ser.timeout = 1 - self.th = threading.Thread(target=self.loop, args=(self.cond, self.state == HatState.FIRMWARE, self.cbqueue)) + self.th = threading.Thread(target=self.loop, args=(self.cond, self.state == HatState.FIRMWARE, self.cbqueue, listevt)) self.th.daemon = True self.th.start() if self.state == HatState.FIRMWARE: self.write(b"port 0 ; select ; port 1 ; select ; port 2 ; select ; port 3 ; select ; echo 0\r") self.write(b"list\r") + listevt.set() elif self.state == HatState.NEEDNEWFIRMWARE or self.state == HatState.BOOTLOADER: self.write(b"reboot\r") @@ -121,10 +177,9 @@ def __init__(self, firmware, signature, version, device="/dev/serial0"): self.cond.wait() def resethat(self): - RESET_GPIO_NUMBER = 4 - BOOT0_GPIO_NUMBER = 22 - reset = DigitalOutputDevice(RESET_GPIO_NUMBER) - boot0 = DigitalOutputDevice(BOOT0_GPIO_NUMBER) + """Reset the HAT""" + reset = DigitalOutputDevice(BuildHAT.RESET_GPIO_NUMBER) + boot0 = DigitalOutputDevice(BuildHAT.BOOT0_GPIO_NUMBER) boot0.off() reset.off() time.sleep(0.01) @@ -135,38 +190,46 @@ def resethat(self): time.sleep(0.5) def loadfirmware(self, firmware, signature): + """Load firmware + + :param firmware: Firmware to load + :param signature: Signature to load + """ with open(firmware, "rb") as f: firm = f.read() with open(signature, "rb") as f: sig = f.read() self.write(b"clear\r") self.getprompt() - self.write("load {} {}\r".format(len(firm), self.checksum(firm)).encode()) + self.write(f"load {len(firm)} {self.checksum(firm)}\r".encode()) time.sleep(0.1) - self.write(b"\x02") - self.write(firm) - self.write(b"\x03\r") + self.write(b"\x02", replace="0x02") + self.write(firm, replace="--firmware file--") + self.write(b"\x03\r", replace="0x03") self.getprompt() - self.write("signature {}\r".format(len(sig)).encode()) + self.write(f"signature {len(sig)}\r".encode()) time.sleep(0.1) - self.write(b"\x02") - self.write(sig) - self.write(b"\x03\r") + self.write(b"\x02", replace="0x02") + self.write(sig, replace="--signature file--") + self.write(b"\x03\r", replace="0x03") self.getprompt() def getprompt(self): - # Need to decide what we will do, when no prompt - PROMPT="BHBL>" + """Loop until prompt is found + + Need to decide what we will do, when no prompt + """ while True: - line = b"" - try: - line = self.ser.readline().decode('utf-8', 'ignore') - except serial.SerialException: - pass - if line[:len(PROMPT)] == PROMPT: + line = self.read() + if cmp(line, BuildHAT.PROMPT): break def checksum(self, data): + """Calculate checksum from data + + :param data: Data to calculate the checksum from + :return: Checksum that has been calculated + """ u = 1 for i in range(0, len(data)): if (u & 0x80000000) != 0: @@ -176,54 +239,112 @@ def checksum(self, data): u = (u ^ data[i]) & 0xFFFFFFFF return u - def write(self, data): + def write(self, data, log=True, replace=""): + """Write data to the serial port of Build HAT + + :param data: Data to write to Build HAT + :param log: Whether to log line or not + :param replace: Whether to log an alternative string + """ self.ser.write(data) + if not self.fin and log: + if replace != "": + logging.debug(f"> {replace}") + else: + logging.debug(f"> {data.decode('utf-8', 'ignore').strip()}") + + def read(self): + """Read data from the serial port of Build HAT + + :return: Line that has been read + """ + line = "" + try: + line = self.ser.readline().decode('utf-8', 'ignore').strip() + except serial.SerialException: + pass + if line != "": + logging.debug(f"< {line}") + return line def shutdown(self): + """Turn off the Build HAT devices""" if not self.fin: self.fin = True self.running = False self.th.join() + self.cbqueue.put(()) + for q in self.motorqueue: + q.put((None, None)) + self.cb.join() turnoff = "" for p in range(4): conn = self.connections[p] if conn.typeid != 64: - turnoff += "port {} ; pwm ; coast ; off ".format(p) + turnoff += f"port {p} ; pwm ; coast ; off ;" else: - self.write("port {} ; write1 {}\r".format(p, ' '.join('{:x}'.format(h) for h in [0xc2,0,0,0,0,0,0,0,0,0])).encode()) - self.write("{}\r".format(turnoff).encode()) + hexstr = ' '.join(f'{h:x}' for h in [0xc2, 0, 0, 0, 0, 0, 0, 0, 0, 0]) + self.write(f"port {p} ; write1 {hexstr}\r".encode()) + self.write(f"{turnoff}\r".encode()) self.write(b"port 0 ; select ; port 1 ; select ; port 2 ; select ; port 3 ; select ; echo 0\r") + def motorloop(self, q): + """Event handling for non-blocking motor commands + + :param q: Queue of motor functions + """ + while self.running: + func, data = q.get() + if func is None: + break + else: + func(*data) + func = None # Necessary for 'del' to function correctly on motor object + data = None + q.task_done() + def callbackloop(self, q): + """Event handling for callbacks + + :param q: Queue of callback events + """ while self.running: cb = q.get() - cb[0](cb[1]) + # Test for empty tuple, which should only be passed when + # we're shutting down + if len(cb) == 0: + continue + if not cb[0]._alive: + continue + cb[0]()(cb[1]) q.task_done() - def loop(self, cond, uselist, q): + def loop(self, cond, uselist, q, listevt): + """Event handling for Build HAT + + :param cond: Condition used to block user's script till we're ready + :param uselist: Whether we're using the HATs 'list' function or not + :param q: Queue for callback events + """ count = 0 while self.running: - line = b"" - try: - line = self.ser.readline().decode('utf-8', 'ignore') - except serial.SerialException: - pass + line = self.read() if len(line) == 0: continue if line[0] == "P" and line[2] == ":": portid = int(line[1]) msg = line[2:] if cmp(msg, BuildHAT.CONNECTED): - typeid = int(line[2+len(BuildHAT.CONNECTED):],16) + typeid = int(line[2 + len(BuildHAT.CONNECTED):], 16) self.connections[portid].update(typeid, True) if typeid == 64: - self.write("port {} ; on\r".format(portid).encode()) - if uselist: + self.write(f"port {portid} ; on\r".encode()) + if uselist and listevt.is_set(): count += 1 elif cmp(msg, BuildHAT.CONNECTEDPASSIVE): - typeid = int(line[2+len(BuildHAT.CONNECTEDPASSIVE):],16) + typeid = int(line[2 + len(BuildHAT.CONNECTEDPASSIVE):], 16) self.connections[portid].update(typeid, True) - if uselist: + if uselist and listevt.is_set(): count += 1 elif cmp(msg, BuildHAT.DISCONNECTED): self.connections[portid].update(-1, False) @@ -231,29 +352,30 @@ def loop(self, cond, uselist, q): self.connections[portid].update(-1, False) elif cmp(msg, BuildHAT.NOTCONNECTED): self.connections[portid].update(-1, False) - if uselist: + if uselist and listevt.is_set(): count += 1 elif cmp(msg, BuildHAT.RAMPDONE): - with self.rampcond[portid]: - self.rampcond[portid].notify() + ftr = self.rampftr[portid].pop() + ftr.set_result(True) elif cmp(msg, BuildHAT.PULSEDONE): - with self.pulsecond[portid]: - self.pulsecond[portid].notify() + ftr = self.pulseftr[portid].pop() + ftr.set_result(True) if uselist and count == 4: with cond: + uselist = False cond.notify() if not uselist and cmp(line, BuildHAT.DONE): def runit(): with cond: cond.notify() - t = Timer(8.0, runit) + t = Timer(11.0, runit) t.start() if line[0] == "P" and (line[2] == "C" or line[2] == "M"): portid = int(line[1]) - data = line[5:].strip().split(" ") + data = line[5:].split(" ") newdata = [] for d in data: if "." in d: @@ -261,9 +383,22 @@ def runit(): else: if d != "": newdata.append(int(d)) + # Check data was for our current mode + if line[2] == "M" and self.connections[portid].simplemode != int(line[3]): + continue + elif line[2] == "C" and self.connections[portid].combimode != int(line[3]): + continue callit = self.connections[portid].callit if callit is not None: q.put((callit, newdata)) self.connections[portid].data = newdata - with self.portcond[portid]: - self.portcond[portid].notify() + try: + ftr = self.portftr[portid].pop() + ftr.set_result(newdata) + except IndexError: + pass + + if len(line) >= 5 and line[1] == "." and line.endswith(" V"): + vin = float(line.split(" ")[0]) + ftr = self.vinftr.pop() + ftr.set_result(vin) diff --git a/buildhat/wedo.py b/buildhat/wedo.py index 1abdb5b..372524e 100644 --- a/buildhat/wedo.py +++ b/buildhat/wedo.py @@ -1,44 +1,91 @@ +"""WeDo sensor handling functionality""" + from .devices import Device -from .exc import DeviceInvalid + class TiltSensor(Device): """Tilt sensor :param port: Port of device - :raises DeviceInvalid: Occurs if there is no tilt sensor attached to port + :raises DeviceError: Occurs if there is no tilt sensor attached to port """ + def __init__(self, port): + """ + Initialise tilt sensor + + :param port: Port of device + """ super().__init__(port) - if self.typeid != 34: - raise DeviceInvalid('There is not a tilt sensor connected to port %s (Found %s)' % (port, self.name)) self.mode(0) def get_tilt(self): """ - Returns the tilt from tilt sensor to object + Return the tilt from tilt sensor :return: Tilt from tilt sensor :rtype: tuple """ return tuple(self.get()) + class MotionSensor(Device): """Motion sensor :param port: Port of device - :raises DeviceInvalid: Occurs if there is no motion sensor attached to port + :raises DeviceError: Occurs if there is no motion sensor attached to port """ + + default_mode = 0 + def __init__(self, port): + """ + Initialise motion sensor + + :param port: Port of device + """ super().__init__(port) - if self.typeid != 35: - raise DeviceInvalid('There is not a motion sensor connected to port %s (Found %s)' % (port, self.name)) - self.mode(0) + self.mode(self.default_mode) + + def set_default_data_mode(self, mode): + """ + Set the mode most often queried from this device. + + This significantly improves performance when repeatedly accessing data + + :param mode: 0 for distance (default), 1 for movement count + """ + if mode == 1 or mode == 0: + self.default_mode = mode + self.mode(mode) def get_distance(self): """ - Returns the distance from motion sensor to object + Return the distance from motion sensor :return: Distance from motion sensor :rtype: int """ - return self.get()[0] + return self._get_data_from_mode(0) + + def get_movement_count(self): + """ + Return the movement counter + + This is the count of how many times the sensor has detected an object + that moved within 4 blocks of the sensor since the sensor has been + plugged in or the BuildHAT reset + + :return: Count of objects detected + :rtype: int + """ + return self._get_data_from_mode(1) + + def _get_data_from_mode(self, mode): + if self.default_mode == mode: + return self.get()[0] + else: + self.mode(mode) + retval = self.get()[0] + self.mode(self.default_mode) + return retval diff --git a/docs/buildhat/color.py b/docs/buildhat/color.py index 913cc2d..5f63812 100755 --- a/docs/buildhat/color.py +++ b/docs/buildhat/color.py @@ -1,3 +1,5 @@ +"""Example getting color values from color sensor""" + from buildhat import ColorSensor color = ColorSensor('C') diff --git a/docs/buildhat/colordistance.py b/docs/buildhat/colordistance.py index cf783cd..d8a3466 100755 --- a/docs/buildhat/colordistance.py +++ b/docs/buildhat/colordistance.py @@ -1,7 +1,10 @@ +"""Example for color distance sensor""" + from buildhat import ColorDistanceSensor color = ColorDistanceSensor('C') +print("Distance", color.get_distance()) print("RGBI", color.get_color_rgb()) print("Ambient", color.get_ambient_light()) print("Reflected", color.get_reflected_light()) diff --git a/docs/buildhat/distance.py b/docs/buildhat/distance.py index d70df97..19f1963 100755 --- a/docs/buildhat/distance.py +++ b/docs/buildhat/distance.py @@ -1,23 +1,37 @@ +"""Example for distance sensor""" + from signal import pause -from buildhat import Motor, DistanceSensor + +from buildhat import DistanceSensor, Motor motor = Motor('A') dist = DistanceSensor('D', threshold_distance=100) print("Wait for in range") dist.wait_for_in_range(50) -motor.run_for_rotations(1) +motor.run_for_rotations(1) print("Wait for out of range") dist.wait_for_out_of_range(100) motor.run_for_rotations(2) + def handle_in(distance): + """Within range + + :param distance: Distance + """ print("in range", distance) + def handle_out(distance): + """Out of range + + :param distance: Distance + """ print("out of range", distance) + dist.when_in_range = handle_in dist.when_out_of_range = handle_out pause() diff --git a/docs/buildhat/force.py b/docs/buildhat/force.py index cd73acf..1c8f3cd 100755 --- a/docs/buildhat/force.py +++ b/docs/buildhat/force.py @@ -1,5 +1,8 @@ +"""Example using force sensor""" + from signal import pause -from buildhat import Motor, ForceSensor + +from buildhat import ForceSensor, Motor motor = Motor('A') button = ForceSensor('D', threshold_force=1) @@ -14,14 +17,25 @@ print("Wait for button to be pressed") button.wait_until_pressed() -motor.run_for_rotations(2) +motor.run_for_rotations(2) + def handle_pressed(force): + """Force sensor pressed + + :param force: Force value + """ print("pressed", force) + def handle_released(force): + """Force sensor released + + :param force: Force value + """ print("released", force) + button.when_pressed = handle_pressed button.when_released = handle_released pause() diff --git a/docs/buildhat/hat.py b/docs/buildhat/hat.py index 4a67ed8..2f95ad2 100755 --- a/docs/buildhat/hat.py +++ b/docs/buildhat/hat.py @@ -1,3 +1,5 @@ +"""Example to print devices attached to hat""" + from buildhat import Hat hat = Hat() diff --git a/docs/buildhat/images/lights.jpg b/docs/buildhat/images/lights.jpg new file mode 100644 index 0000000..af89c18 Binary files /dev/null and b/docs/buildhat/images/lights.jpg differ diff --git a/docs/buildhat/images/train_motor.jpg b/docs/buildhat/images/train_motor.jpg new file mode 100644 index 0000000..97e8124 Binary files /dev/null and b/docs/buildhat/images/train_motor.jpg differ diff --git a/docs/buildhat/index.rst b/docs/buildhat/index.rst index 2ce842b..f4f7fc4 100644 --- a/docs/buildhat/index.rst +++ b/docs/buildhat/index.rst @@ -28,10 +28,51 @@ power supply. For best results, use the `official Raspberry Pi Build HAT power s .. _official Raspberry Pi Build HAT power supply: http://raspberrypi.com/products/build-hat-power-supply +It is now possible to use custom firmware with the library. To do this you can follow the steps below. + +.. code-block:: + :caption: Using custom firmware + + sudo apt install cmake python3 build-essential gcc-arm-none-eabi libnewlib-arm-none-eabi libstdc++-arm-none-eabi-newlib + + git clone https://github.com/raspberrypi/pico-sdk.git --recursive + git clone https://github.com/raspberrypi/buildhat.git --recursive + + cd buildhat + export PICO_SDK_PATH="$(pwd)/../pico-sdk/" + make + + cd .. + mkdir test + cd test + mkdir data + + cp ../buildhat/firmware-pico/build/main.bin data/firmware.bin + cp ../buildhat/bhbl-pico/signature.bin data/signature.bin + cat ../buildhat/firmware-pico/version.h | sed 's/#define FWVERSION "//g; s/ .*//g' > data/version + +Then place your script, such as the following, within the test/ directory. + +.. code-block:: + :caption: Create test.py in test directory + + import time + from buildhat import Motor + + m = Motor('A') + m.start() + + time.sleep(5) + +Then use: ``python test.py`` in the test directory, to run your script with your custom firmware. + +Note if you want python to always reload the firmware from your **data/** directory each time +you run your script, simply write the value: -1 to **data/version**. + .. warning:: The API for the Build HAT is undergoing active development and is subject - to change. + to change. .. toctree:: :maxdepth: 2 @@ -40,7 +81,11 @@ power supply. For best results, use the `official Raspberry Pi Build HAT power s colordistancesensor.rst distancesensor.rst forcesensor.rst + light.rst matrix.rst + motionsensor.rst motor.rst motorpair.rst + passivemotor.rst + tiltsensor.rst hat.rst diff --git a/docs/buildhat/light.py b/docs/buildhat/light.py new file mode 100644 index 0000000..7a0e5bc --- /dev/null +++ b/docs/buildhat/light.py @@ -0,0 +1,11 @@ +"""Example turning on/off LED lights""" + +from time import sleep + +from buildhat import Light + +light = Light('A') + +light.on() +sleep(1) +light.off() diff --git a/docs/buildhat/light.rst b/docs/buildhat/light.rst new file mode 100644 index 0000000..69ec564 --- /dev/null +++ b/docs/buildhat/light.rst @@ -0,0 +1,33 @@ +Light +=========== + +.. image:: images/lights.jpg + :width: 200 + :alt: The LEGO Light + +|location_link| + +.. |location_link| raw:: html + + LEGO® Light 88005 + +|location_link2| + +.. |location_link2| raw:: html + + BrickLink item + + +The LEGO® 88005 Powered Up Light features 2 LED lights, connecting wire and connection point for LEGO® Powered Up components. + + +.. autoclass:: buildhat.Light + :members: + :inherited-members: + +Example +------- + +.. literalinclude:: light.py + + diff --git a/docs/buildhat/matrix.py b/docs/buildhat/matrix.py index 0b91461..63277e5 100755 --- a/docs/buildhat/matrix.py +++ b/docs/buildhat/matrix.py @@ -1,20 +1,23 @@ -from buildhat import Matrix -import time +"""Example driving LED matrix""" + import random +import time + +from buildhat import Matrix matrix = Matrix('C') -matrix.clear(("red",10)) +matrix.clear(("red", 10)) time.sleep(1) matrix.clear() time.sleep(1) -matrix.set_pixel((0,0), ("blue", 10)) -matrix.set_pixel((2,2), ("red", 10)) +matrix.set_pixel((0, 0), ("blue", 10)) +matrix.set_pixel((2, 2), ("red", 10)) time.sleep(1) while True: - out = [[(int(random.uniform(0,9)),10) for x in range(3)] for y in range(3)] + out = [[(int(random.uniform(0, 9)), 10) for x in range(3)] for y in range(3)] matrix.set_pixels(out) time.sleep(0.1) diff --git a/docs/buildhat/matrix.rst b/docs/buildhat/matrix.rst index d183680..9f9d942 100644 --- a/docs/buildhat/matrix.rst +++ b/docs/buildhat/matrix.rst @@ -35,6 +35,8 @@ Colours may be passed as string or integer parameters. - orange * - 9 - red + * - 10 + - white .. autoclass:: buildhat.Matrix :members: diff --git a/docs/buildhat/motion.py b/docs/buildhat/motion.py new file mode 100755 index 0000000..cf48829 --- /dev/null +++ b/docs/buildhat/motion.py @@ -0,0 +1,11 @@ +"""Example using motion sensor""" + +from time import sleep + +from buildhat import MotionSensor + +motion = MotionSensor('A') + +for _ in range(50): + print(motion.get_distance()) + sleep(0.1) diff --git a/docs/buildhat/motionsensor.rst b/docs/buildhat/motionsensor.rst new file mode 100644 index 0000000..e9d0f5f --- /dev/null +++ b/docs/buildhat/motionsensor.rst @@ -0,0 +1,19 @@ +Motion Sensor +============= + +|location_link| + +.. |location_link| raw:: html + + BrickLink item + +.. autoclass:: buildhat.MotionSensor + :members: + :inherited-members: + +Example +------- + +.. literalinclude:: motion.py + + diff --git a/docs/buildhat/motor.py b/docs/buildhat/motor.py index c9bc7a6..8cab2e8 100755 --- a/docs/buildhat/motor.py +++ b/docs/buildhat/motor.py @@ -1,13 +1,23 @@ -from signal import pause -from buildhat import Motor +"""Example driving motors""" + import time +from buildhat import Motor + motor = Motor('A') motorb = Motor('B') + def handle_motor(speed, pos, apos): + """Motor data + + :param speed: Speed of motor + :param pos: Position of motor + :param apos: Absolute position of motor + """ print("Motor", speed, pos, apos) + motor.when_rotated = handle_motor motor.set_default_speed(50) diff --git a/docs/buildhat/motorpair.py b/docs/buildhat/motorpair.py index e5a8858..d392cbf 100755 --- a/docs/buildhat/motorpair.py +++ b/docs/buildhat/motorpair.py @@ -1,3 +1,5 @@ +"""Example for pair of motors""" + from buildhat import MotorPair pair = MotorPair('C', 'D') diff --git a/docs/buildhat/passivemotor.py b/docs/buildhat/passivemotor.py new file mode 100755 index 0000000..68d0f26 --- /dev/null +++ b/docs/buildhat/passivemotor.py @@ -0,0 +1,13 @@ +"""Passive motor example""" + +import time + +from buildhat import PassiveMotor + +motor = PassiveMotor('A') + +print("Start motor") +motor.start() +time.sleep(3) +print("Stop motor") +motor.stop() diff --git a/docs/buildhat/passivemotor.rst b/docs/buildhat/passivemotor.rst new file mode 100644 index 0000000..357055e --- /dev/null +++ b/docs/buildhat/passivemotor.rst @@ -0,0 +1,40 @@ +PassiveMotor +============ + +LEGO® TECHNIC™ motors which do not have an integrated rotation sensor (encoder). + + +.. |location_link1| raw:: html + + LEGO® Train motor 88011 + +.. |location_link1b| raw:: html + + BrickLink item 88011 + + +.. |LTM| image:: images/train_motor.jpg + :width: 200 + :alt: The LEGO Train Motor + + +.. list-table:: Passive Motors + :widths: 50 50 200 + :header-rows: 0 + + + * - |location_link1| + - |location_link1b| + - |LTM| + + + +.. autoclass:: buildhat.PassiveMotor + :members: + :inherited-members: + +Example +------- + +.. literalinclude:: passivemotor.py + diff --git a/docs/buildhat/tilt.py b/docs/buildhat/tilt.py new file mode 100755 index 0000000..36ec8d1 --- /dev/null +++ b/docs/buildhat/tilt.py @@ -0,0 +1,11 @@ +"""Example using tilt sensor""" + +from time import sleep + +from buildhat import TiltSensor + +tilt = TiltSensor('A') + +for _ in range(50): + print(tilt.get_tilt()) + sleep(0.1) diff --git a/docs/buildhat/tiltsensor.rst b/docs/buildhat/tiltsensor.rst new file mode 100644 index 0000000..e428882 --- /dev/null +++ b/docs/buildhat/tiltsensor.rst @@ -0,0 +1,19 @@ +Tilt Sensor +=========== + +|location_link| + +.. |location_link| raw:: html + + BrickLink item + +.. autoclass:: buildhat.TiltSensor + :members: + :inherited-members: + +Example +------- + +.. literalinclude:: tilt.py + + diff --git a/docs/conf.py b/docs/conf.py index e17f1fd..aae1e0d 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -59,7 +59,7 @@ # General information about the project. project = 'Raspberry Pi Build HAT' -copyright = '''2020-2021 - Raspberry Pi Foundation; +copyright = '''2020-2025 - Raspberry Pi Foundation; 2017-2020 - LEGO System A/S - Aastvej 1, 7190 Billund, DK.''' # The version info for the project you're documenting, acts as replacement for diff --git a/docs/license.rst b/docs/license.rst index 2ad53fd..af5c040 100644 --- a/docs/license.rst +++ b/docs/license.rst @@ -3,7 +3,7 @@ License information The MIT License (MIT) -Copyright (c) 2020-2021 - Raspberry Pi Foundation +Copyright (c) 2020-2025 - Raspberry Pi Foundation Copyright (c) 2017-2021 - LEGO System A/S - Aastvej 1, 7190 Billund, DK diff --git a/requirements-test.txt b/requirements-test.txt new file mode 100644 index 0000000..8d7b912 --- /dev/null +++ b/requirements-test.txt @@ -0,0 +1,11 @@ +flake8>=6.0.0 +flake8-bugbear>=22.10.27 +flake8-comprehensions>=3.10 +flake8-debugger +flake8-docstrings>=1.6.0 +flake8-isort>=5.0 +flake8-pylint +flake8-rst-docstrings +flake8-string-format +gpiozero +pyserial diff --git a/setup.py b/setup.py index 407d9b8..38ae4f1 100755 --- a/setup.py +++ b/setup.py @@ -1,11 +1,12 @@ #! /usr/bin/env python3 -# Copyright (c) 2020-2021 Raspberry Pi Foundation +"""Setup file""" + +# Copyright (c) 2020-2025 Raspberry Pi Foundation # # SPDX-License-Identifier: MIT -from setuptools import setup, Extension -from os import getenv +from setuptools import setup with open("README.md") as readme: long_description = readme.read() @@ -16,17 +17,16 @@ setup(name='buildhat', version=version, description='Build HAT Python library', + license='MIT', long_description=long_description, long_description_content_type="text/markdown", author='Raspberry Pi Foundation', - author_email = 'web@raspberrypi.org', - url = 'https://github.com/RaspberryPiFoundation/python-build-hat', - project_urls = { - 'Bug Tracker': "https://github.com/RaspberryPiFoundation/python-build-hat/issues", - }, + author_email='web@raspberrypi.org', + url='https://github.com/RaspberryPiFoundation/python-build-hat', + project_urls={'Bug Tracker': "https://github.com/RaspberryPiFoundation/python-build-hat/issues"}, packages=['buildhat'], package_data={ "": ["data/firmware.bin", "data/signature.bin", "data/version"], }, - python_requires = '>=3.7', - install_requires=['gpiozero','pyserial']), + python_requires='>=3.7', + install_requires=['gpiozero', 'pyserial']) diff --git a/test/color.py b/test/color.py new file mode 100644 index 0000000..ba23ff2 --- /dev/null +++ b/test/color.py @@ -0,0 +1,47 @@ +"""Test Color Sensor functionality""" +import time +import unittest + +from buildhat import ColorSensor + + +class TestColor(unittest.TestCase): + """Test color sensor functions""" + + def test_color_interval(self): + """Test color sensor interval""" + color = ColorSensor('A') + color.avg_reads = 1 + color.interval = 10 + count = 1000 + expected_dur = count * color.interval * 1e-3 + + start = time.time() + for _ in range(count): + color.get_ambient_light() + end = time.time() + diff = abs((end - start) - expected_dur) + self.assertLess(diff, 0.25) + + start = time.time() + for _ in range(count): + color.get_color_rgbi() + end = time.time() + diff = abs((end - start) - expected_dur) + self.assertLess(diff, 0.25) + + def test_caching(self): + """Test to make sure we're not reading cached data""" + color = ColorSensor('A') + color.avg_reads = 1 + color.interval = 10 + + for _ in range(100): + color.mode(2) + self.assertEqual(len(color.get()), 1) + color.mode(5) + self.assertEqual(len(color.get()), 4) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/distance.py b/test/distance.py new file mode 100644 index 0000000..3348314 --- /dev/null +++ b/test/distance.py @@ -0,0 +1,41 @@ +"""Test distance sensor""" + +import unittest + +from buildhat import DistanceSensor +from buildhat.exc import DeviceError + + +class TestDistance(unittest.TestCase): + """Test distance sensor""" + + def test_properties(self): + """Test properties of sensor""" + d = DistanceSensor('A') + self.assertIsInstance(d.distance, int) + self.assertIsInstance(d.threshold_distance, int) + + def test_distance(self): + """Test obtaining distance""" + d = DistanceSensor('A') + self.assertIsInstance(d.get_distance(), int) + + def test_eyes(self): + """Test lighting LEDs on sensor""" + d = DistanceSensor('A') + d.eyes(100, 100, 100, 100) + + def test_duplicate_port(self): + """Test using same port""" + d = DistanceSensor('A') # noqa: F841 + self.assertRaises(DeviceError, DistanceSensor, 'A') + + def test_del(self): + """Test deleting sensor""" + d = DistanceSensor('A') + del d + DistanceSensor('A') + + +if __name__ == '__main__': + unittest.main() diff --git a/test/hat.py b/test/hat.py new file mode 100644 index 0000000..9538ede --- /dev/null +++ b/test/hat.py @@ -0,0 +1,29 @@ +"""Test hat functionality""" + +import unittest + +from buildhat import Hat + + +class TestHat(unittest.TestCase): + """Test hat functions""" + + def test_vin(self): + """Test voltage measure function""" + h = Hat() + vin = h.get_vin() + self.assertGreaterEqual(vin, 7.2) + self.assertLessEqual(vin, 8.5) + + def test_get(self): + """Test getting list of devices""" + h = Hat() + self.assertIsInstance(h.get(), dict) + + def test_serial(self): + """Test setting serial device""" + Hat(device="/dev/serial0") + + +if __name__ == '__main__': + unittest.main() diff --git a/test/light.py b/test/light.py new file mode 100644 index 0000000..d004bc1 --- /dev/null +++ b/test/light.py @@ -0,0 +1,28 @@ +"""Test light functionality""" + +import time +import unittest + +from buildhat import Light +from buildhat.exc import LightError + + +class TestLight(unittest.TestCase): + """Test light functions""" + + def test_light(self): + """Test light functions""" + light = Light('A') + light.on() + light.brightness(0) + light.brightness(100) + time.sleep(1) + light.brightness(25) + time.sleep(1) + self.assertRaises(LightError, light.brightness, -1) + self.assertRaises(LightError, light.brightness, 101) + light.off() + + +if __name__ == '__main__': + unittest.main() diff --git a/test/matrix.py b/test/matrix.py new file mode 100644 index 0000000..63d652d --- /dev/null +++ b/test/matrix.py @@ -0,0 +1,87 @@ +"""Test matrix functionality""" + +import time +import unittest + +from buildhat import Matrix +from buildhat.exc import MatrixError + + +class TestMatrix(unittest.TestCase): + """Test matrix functions""" + + def test_matrix(self): + """Test setting matrix pixels""" + matrix = Matrix('A') + matrix.set_pixels([[(10, 10) for x in range(3)] for y in range(3)]) + time.sleep(1) + self.assertRaises(MatrixError, matrix.set_pixels, [[(10, 10) for x in range(3)] for y in range(4)]) + self.assertRaises(MatrixError, matrix.set_pixels, [[(10, 10) for x in range(4)] for y in range(3)]) + self.assertRaises(MatrixError, matrix.set_pixels, [[(11, 10) for x in range(3)] for y in range(3)]) + self.assertRaises(MatrixError, matrix.set_pixels, [[(-1, 10) for x in range(3)] for y in range(3)]) + self.assertRaises(MatrixError, matrix.set_pixels, [[(10, 11) for x in range(3)] for y in range(3)]) + self.assertRaises(MatrixError, matrix.set_pixels, [[(10, -1) for x in range(3)] for y in range(3)]) + self.assertRaises(MatrixError, matrix.set_pixels, [[("gold", 10) for x in range(3)] for y in range(3)]) + self.assertRaises(MatrixError, matrix.set_pixels, [[(10, "test") for x in range(3)] for y in range(3)]) + matrix.set_pixels([[("pink", 10) for x in range(3)] for y in range(3)]) + time.sleep(1) + + def test_clear(self): + """Test clearing matrix""" + matrix = Matrix('A') + matrix.clear() + matrix.clear((10, 10)) + time.sleep(1) + matrix.clear(("yellow", 10)) + time.sleep(1) + self.assertRaises(MatrixError, matrix.clear, ("gold", 10)) + self.assertRaises(MatrixError, matrix.clear, (10, -1)) + self.assertRaises(MatrixError, matrix.clear, (10, 11)) + self.assertRaises(MatrixError, matrix.clear, (-1, 10)) + self.assertRaises(MatrixError, matrix.clear, (10, 11)) + + def test_transition(self): + """Test transitions""" + matrix = Matrix('A') + matrix.clear(("green", 10)) + time.sleep(1) + matrix.set_transition(1) + matrix.set_pixels([[("blue", 10) for x in range(3)] for y in range(3)]) + time.sleep(4) + matrix.set_transition(2) + matrix.set_pixels([[("red", 10) for x in range(3)] for y in range(3)]) + time.sleep(4) + matrix.set_transition(0) + self.assertRaises(MatrixError, matrix.set_transition, -1) + self.assertRaises(MatrixError, matrix.set_transition, 3) + self.assertRaises(MatrixError, matrix.set_transition, "test") + + def test_level(self): + """Test level""" + matrix = Matrix('A') + matrix.clear(("orange", 10)) + time.sleep(1) + matrix.level(5) + time.sleep(4) + self.assertRaises(MatrixError, matrix.level, -1) + self.assertRaises(MatrixError, matrix.level, 10) + self.assertRaises(MatrixError, matrix.level, "test") + + def test_pixel(self): + """Test pixel""" + matrix = Matrix('A') + matrix.clear() + matrix.set_pixel((0, 0), ("red", 10)) + matrix.set_pixel((2, 2), ("red", 10)) + time.sleep(1) + self.assertRaises(MatrixError, matrix.set_pixel, (-1, 0), ("red", 10)) + self.assertRaises(MatrixError, matrix.set_pixel, (0, -1), ("red", 10)) + self.assertRaises(MatrixError, matrix.set_pixel, (3, 0), ("red", 10)) + self.assertRaises(MatrixError, matrix.set_pixel, (0, 3), ("red", 10)) + self.assertRaises(MatrixError, matrix.set_pixel, (0, 0), ("gold", 10)) + self.assertRaises(MatrixError, matrix.set_pixel, (0, 0), ("red", -1)) + self.assertRaises(MatrixError, matrix.set_pixel, (0, 0), ("red", 11)) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/motors.py b/test/motors.py index 3f0196b..46d01ec 100644 --- a/test/motors.py +++ b/test/motors.py @@ -1,31 +1,106 @@ -import unittest +"""Test motors""" + import time -from buildhat.exc import DeviceInvalid, DirectionInvalid, MotorException -from buildhat import Motor +import unittest + +from buildhat import Hat, Motor +from buildhat.exc import DeviceError, MotorError + + +class TestMotor(unittest.TestCase): + """Test motors""" -class TestMotorMethods(unittest.TestCase): + THRESHOLD_DISTANCE = 15 def test_rotations(self): + """Test motor rotating""" m = Motor('A') pos1 = m.get_position() - m.run_for_rotations(2) + m.run_for_rotations(2) pos2 = m.get_position() - rotated = (pos2 - pos1)/360 + rotated = (pos2 - pos1) / 360 self.assertLess(abs(rotated - 2), 0.5) + def test_nonblocking(self): + """Test motor nonblocking mode""" + m = Motor('A') + m.set_default_speed(10) + last = 0 + for delay in [1, 0]: + for _ in range(3): + m.run_to_position(90, blocking=False) + time.sleep(delay) + m.run_to_position(90, blocking=False) + time.sleep(delay) + m.run_to_position(90, blocking=False) + time.sleep(delay) + m.run_to_position(last, blocking=False) + time.sleep(delay) + # Wait for a bit, before reading last position + time.sleep(7) + pos1 = m.get_aposition() + diff = abs((last - pos1 + 180) % 360 - 180) + self.assertLess(diff, self.THRESHOLD_DISTANCE) + + def test_nonblocking_multiple(self): + """Test motor nonblocking mode""" + m1 = Motor('A') + m1.set_default_speed(10) + m2 = Motor('B') + m2.set_default_speed(10) + last = 0 + for delay in [1, 0]: + for _ in range(3): + m1.run_to_position(90, blocking=False) + m2.run_to_position(90, blocking=False) + time.sleep(delay) + m1.run_to_position(90, blocking=False) + m2.run_to_position(90, blocking=False) + time.sleep(delay) + m1.run_to_position(90, blocking=False) + m2.run_to_position(90, blocking=False) + time.sleep(delay) + m1.run_to_position(last, blocking=False) + m2.run_to_position(last, blocking=False) + time.sleep(delay) + # Wait for a bit, before reading last position + time.sleep(7) + pos1 = m1.get_aposition() + diff = abs((last - pos1 + 180) % 360 - 180) + self.assertLess(diff, self.THRESHOLD_DISTANCE) + pos2 = m2.get_aposition() + diff = abs((last - pos2 + 180) % 360 - 180) + self.assertLess(diff, self.THRESHOLD_DISTANCE) + + def test_nonblocking_mixed(self): + """Test motor nonblocking mode mixed with blocking mode""" + m = Motor('A') + m.run_for_seconds(5, blocking=False) + m.run_for_degrees(360) + m.run_for_seconds(5, blocking=False) + m.run_to_position(180) + m.run_for_seconds(5, blocking=False) + m.run_for_seconds(5) + m.run_for_seconds(5, blocking=False) + m.start() + m.run_for_seconds(5, blocking=False) + m.stop() + def test_position(self): + """Test motor goes to desired position""" m = Motor('A') m.run_to_position(0) pos1 = m.get_aposition() - diff = abs((0-pos1+180) % 360 - 180) - self.assertLess(diff, 10) + diff = abs((0 - pos1 + 180) % 360 - 180) + self.assertLess(diff, self.THRESHOLD_DISTANCE) m.run_to_position(180) pos1 = m.get_aposition() - diff = abs((180-pos1+180) % 360 - 180) - self.assertLess(diff, 10) + diff = abs((180 - pos1 + 180) % 360 - 180) + self.assertLess(diff, self.THRESHOLD_DISTANCE) def test_time(self): + """Test motor runs for correct duration""" m = Motor('A') t1 = time.time() m.run_for_seconds(5) @@ -33,33 +108,32 @@ def test_time(self): self.assertEqual(int(t2 - t1), 5) def test_speed(self): + """Test setting motor speed""" m = Motor('A') m.set_default_speed(50) - self.assertRaises(MotorException, m.set_default_speed, -101) - self.assertRaises(MotorException, m.set_default_speed, 101) + self.assertRaises(MotorError, m.set_default_speed, -101) + self.assertRaises(MotorError, m.set_default_speed, 101) def test_plimit(self): + """Test altering power limit of motor""" m = Motor('A') m.plimit(0.5) - self.assertRaises(MotorException, m.plimit, -1) - self.assertRaises(MotorException, m.plimit, 2) - - def test_bias(self): - m = Motor('A') - m.bias(0.5) - self.assertRaises(MotorException, m.bias, -1) - self.assertRaises(MotorException, m.bias, 2) + self.assertRaises(MotorError, m.plimit, -1) + self.assertRaises(MotorError, m.plimit, 2) def test_pwm(self): + """Test PWMing motor""" m = Motor('A') m.pwm(0.3) time.sleep(0.5) m.pwm(0) - self.assertRaises(MotorException, m.pwm, -2) - self.assertRaises(MotorException, m.pwm, 2) + self.assertRaises(MotorError, m.pwm, -2) + self.assertRaises(MotorError, m.pwm, 2) def test_callback(self): + """Test setting callback""" m = Motor('A') + def handle_motor(speed, pos, apos): handle_motor.evt += 1 handle_motor.evt = 0 @@ -67,5 +141,103 @@ def handle_motor(speed, pos, apos): m.run_for_seconds(1) self.assertGreater(handle_motor.evt, 0) + def test_callback_interval(self): + """Test setting callback and interval""" + m = Motor('A') + m.interval = 10 + + def handle_motor(speed, pos, apos): + handle_motor.evt += 1 + handle_motor.evt = 0 + m.when_rotated = handle_motor + m.run_for_seconds(5) + self.assertGreater(handle_motor.evt, 0.8 * ((1 / ((m.interval) * 1e-3)) * 5)) + + def test_none_callback(self): + """Test setting empty callback""" + m = Motor('A') + m.when_rotated = None + m.start() + time.sleep(0.5) + m.stop() + + def test_duplicate_port(self): + """Test using same port for motor""" + m1 = Motor('A') # noqa: F841 + self.assertRaises(DeviceError, Motor, 'A') + + def test_del(self): + """Test deleting motor""" + m1 = Motor('A') + del m1 + Motor('A') + + def test_continuous_start(self): + """Test starting motor for 5mins""" + t = time.time() + (60 * 5) + m = Motor('A') + toggle = 0 + while time.time() < t: + m.start(toggle) + toggle ^= 1 + + def test_continuous_degrees(self): + """Test setting degrees for 5mins""" + t = time.time() + (60 * 5) + m = Motor('A') + toggle = 0 + while time.time() < t: + m.run_for_degrees(toggle) + toggle ^= 1 + + def test_continuous_position(self): + """Test setting position of motor for 5mins""" + t = time.time() + (60 * 5) + m = Motor('A') + toggle = 0 + while time.time() < t: + m.run_to_position(toggle) + toggle ^= 1 + + def test_continuous_feedback(self): + """Test feedback of motor for 30mins""" + Hat(debug=True) + t = time.time() + (60 * 30) + m = Motor('A') + m.start(40) + while time.time() < t: + _ = (m.get_speed(), m.get_position(), m.get_aposition()) + + def test_interval(self): + """Test motor interval""" + m = Motor('A') + m.interval = 10 + count = 1000 + expected_dur = count * m.interval * 1e-3 + start = time.time() + for _ in range(count): + m.get_position() + end = time.time() + diff = abs((end - start) - expected_dur) + self.assertLess(diff, expected_dur * 0.1) + + def test_dual_interval(self): + """Test dual motor interval""" + m1 = Motor('A') + m2 = Motor('B') + for interval in [20, 10]: + m1.interval = interval + m2.interval = interval + count = 1000 + expected_dur = count * m1.interval * 1e-3 + start = time.time() + for _ in range(count): + m1.get_position() + m2.get_position() + end = time.time() + diff = abs((end - start) - expected_dur) + self.assertLess(diff, expected_dur * 0.1) + + if __name__ == '__main__': unittest.main() diff --git a/test/wedo.py b/test/wedo.py new file mode 100644 index 0000000..a558e4e --- /dev/null +++ b/test/wedo.py @@ -0,0 +1,30 @@ +"""Test wedo sensor functionality""" + +import unittest + +from buildhat import MotionSensor, TiltSensor + + +class TestWeDo(unittest.TestCase): + """Test wedo sensor functions""" + + def test_motionsensor(self): + """Test motion sensor""" + motion = MotionSensor('A') + dist = motion.get_distance() + self.assertIsInstance(dist, int) + + def test_tiltsensor(self): + """Test tilt sensor + + ToDo - Test when I re-find this sensor + """ + tilt = TiltSensor('B') + tvalue = tilt.get_tilt() + self.assertIsInstance(tvalue, tuple) + self.assertIsInstance(tvalue[0], int) + self.assertIsInstance(tvalue[1], int) + + +if __name__ == '__main__': + unittest.main()