diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..21c125c --- /dev/null +++ b/.gitattributes @@ -0,0 +1,11 @@ +# SPDX-FileCopyrightText: 2024 Justin Myers for Adafruit Industries +# +# SPDX-License-Identifier: Unlicense + +.py text eol=lf +.rst text eol=lf +.txt text eol=lf +.yaml text eol=lf +.toml text eol=lf +.license text eol=lf +.md text eol=lf diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 70ade69..ff19dde 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,42 +1,21 @@ -# SPDX-FileCopyrightText: 2020 Diego Elio Pettenò +# SPDX-FileCopyrightText: 2024 Justin Myers for Adafruit Industries # # SPDX-License-Identifier: Unlicense repos: - - repo: https://github.com/python/black - rev: 23.3.0 - hooks: - - id: black - - repo: https://github.com/fsfe/reuse-tool - rev: v1.1.2 - hooks: - - id: reuse - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.4.0 + rev: v4.5.0 hooks: - id: check-yaml - id: end-of-file-fixer - id: trailing-whitespace - - repo: https://github.com/pycqa/pylint - rev: v2.17.4 + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.3.4 hooks: - - id: pylint - name: pylint (library code) - types: [python] - args: - - --disable=consider-using-f-string - exclude: "^(docs/|examples/|tests/|setup.py$)" - - id: pylint - name: pylint (example code) - description: Run pylint rules on "examples/*.py" files - types: [python] - files: "^examples/" - args: - - --disable=missing-docstring,invalid-name,consider-using-f-string,duplicate-code - - id: pylint - name: pylint (test code) - description: Run pylint rules on "tests/*.py" files - types: [python] - files: "^tests/" - args: - - --disable=missing-docstring,consider-using-f-string,duplicate-code + - id: ruff-format + - id: ruff + args: ["--fix"] + - repo: https://github.com/fsfe/reuse-tool + rev: v3.0.1 + hooks: + - id: reuse diff --git a/.pylintrc b/.pylintrc deleted file mode 100644 index 40208c3..0000000 --- a/.pylintrc +++ /dev/null @@ -1,399 +0,0 @@ -# SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries -# -# SPDX-License-Identifier: Unlicense - -[MASTER] - -# A comma-separated list of package or module names from where C extensions may -# be loaded. Extensions are loading into the active Python interpreter and may -# run arbitrary code -extension-pkg-whitelist= - -# Add files or directories to the ignore-list. They should be base names, not -# paths. -ignore=CVS - -# Add files or directories matching the regex patterns to the ignore-list. The -# regex matches against base names, not paths. -ignore-patterns= - -# Python code to execute, usually for sys.path manipulation such as -# pygtk.require(). -#init-hook= - -# Use multiple processes to speed up Pylint. -jobs=1 - -# List of plugins (as comma separated values of python modules names) to load, -# usually to register additional checkers. -load-plugins=pylint.extensions.no_self_use - -# Pickle collected data for later comparisons. -persistent=yes - -# Specify a configuration file. -#rcfile= - -# Allow loading of arbitrary C extensions. Extensions are imported into the -# active Python interpreter and may run arbitrary code. -unsafe-load-any-extension=no - - -[MESSAGES CONTROL] - -# Only show warnings with the listed confidence levels. Leave empty to show -# all. Valid levels: HIGH, INFERENCE, INFERENCE_FAILURE, UNDEFINED -confidence= - -# Disable the message, report, category or checker with the given id(s). You -# can either give multiple identifiers separated by comma (,) or put this -# option multiple times (only on the command line, not in the configuration -# file where it should appear only once).You can also use "--disable=all" to -# disable everything first and then reenable specific checks. For example, if -# you want to run only the similarities checker, you can use "--disable=all -# --enable=similarities". If you want to run only the classes checker, but have -# no Warning level messages displayed, use"--disable=all --enable=classes -# --disable=W" -# disable=import-error,raw-checker-failed,bad-inline-option,locally-disabled,file-ignored,suppressed-message,useless-suppression,deprecated-pragma,deprecated-str-translate-call -disable=raw-checker-failed,bad-inline-option,locally-disabled,file-ignored,suppressed-message,useless-suppression,deprecated-pragma,import-error,pointless-string-statement,unspecified-encoding - -# Enable the message, report, category or checker with the given id(s). You can -# either give multiple identifier separated by comma (,) or put this option -# multiple time (only on the command line, not in the configuration file where -# it should appear only once). See also the "--disable" option for examples. -enable= - - -[REPORTS] - -# Python expression which should return a note less than 10 (10 is the highest -# note). You have access to the variables errors warning, statement which -# respectively contain the number of errors / warnings messages and the total -# number of statements analyzed. This is used by the global evaluation report -# (RP0004). -evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10) - -# Template used to display messages. This is a python new-style format string -# used to format the message information. See doc for all details -#msg-template= - -# Set the output format. Available formats are text, parseable, colorized, json -# and msvs (visual studio).You can also give a reporter class, eg -# mypackage.mymodule.MyReporterClass. -output-format=text - -# Tells whether to display a full report or only the messages -reports=no - -# Activate the evaluation score. -score=yes - - -[REFACTORING] - -# Maximum number of nested blocks for function / method body -max-nested-blocks=5 - - -[LOGGING] - -# Logging modules to check that the string format arguments are in logging -# function parameter format -logging-modules=logging - - -[SPELLING] - -# Spelling dictionary name. Available dictionaries: none. To make it working -# install python-enchant package. -spelling-dict= - -# List of comma separated words that should not be checked. -spelling-ignore-words= - -# A path to a file that contains private dictionary; one word per line. -spelling-private-dict-file= - -# Tells whether to store unknown words to indicated private dictionary in -# --spelling-private-dict-file option instead of raising a message. -spelling-store-unknown-words=no - - -[MISCELLANEOUS] - -# List of note tags to take in consideration, separated by a comma. -# notes=FIXME,XXX,TODO -notes=FIXME,XXX - - -[TYPECHECK] - -# List of decorators that produce context managers, such as -# contextlib.contextmanager. Add to this list to register other decorators that -# produce valid context managers. -contextmanager-decorators=contextlib.contextmanager - -# List of members which are set dynamically and missed by pylint inference -# system, and so shouldn't trigger E1101 when accessed. Python regular -# expressions are accepted. -generated-members= - -# Tells whether missing members accessed in mixin class should be ignored. A -# mixin class is detected if its name ends with "mixin" (case insensitive). -ignore-mixin-members=yes - -# This flag controls whether pylint should warn about no-member and similar -# checks whenever an opaque object is returned when inferring. The inference -# can return multiple potential results while evaluating a Python object, but -# some branches might not be evaluated, which results in partial inference. In -# that case, it might be useful to still emit no-member and other checks for -# the rest of the inferred objects. -ignore-on-opaque-inference=yes - -# List of class names for which member attributes should not be checked (useful -# for classes with dynamically set attributes). This supports the use of -# qualified names. -ignored-classes=optparse.Values,thread._local,_thread._local - -# List of module names for which member attributes should not be checked -# (useful for modules/projects where namespaces are manipulated during runtime -# and thus existing member attributes cannot be deduced by static analysis. It -# supports qualified module names, as well as Unix pattern matching. -ignored-modules=board - -# Show a hint with possible names when a member name was not found. The aspect -# of finding the hint is based on edit distance. -missing-member-hint=yes - -# The minimum edit distance a name should have in order to be considered a -# similar match for a missing member name. -missing-member-hint-distance=1 - -# The total number of similar names that should be taken in consideration when -# showing a hint for a missing member. -missing-member-max-choices=1 - - -[VARIABLES] - -# List of additional names supposed to be defined in builtins. Remember that -# you should avoid to define new builtins when possible. -additional-builtins= - -# Tells whether unused global variables should be treated as a violation. -allow-global-unused-variables=yes - -# List of strings which can identify a callback function by name. A callback -# name must start or end with one of those strings. -callbacks=cb_,_cb - -# A regular expression matching the name of dummy variables (i.e. expectedly -# not used). -dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_ - -# Argument names that match this expression will be ignored. Default to name -# with leading underscore -ignored-argument-names=_.*|^ignored_|^unused_ - -# Tells whether we should check for unused import in __init__ files. -init-import=no - -# List of qualified module names which can have objects that can redefine -# builtins. -redefining-builtins-modules=six.moves,future.builtins - - -[FORMAT] - -# Expected format of line ending, e.g. empty (any line ending), LF or CRLF. -# expected-line-ending-format= -expected-line-ending-format=LF - -# Regexp for a line that is allowed to be longer than the limit. -ignore-long-lines=^\s*(# )??$ - -# Number of spaces of indent required inside a hanging or continued line. -indent-after-paren=4 - -# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1 -# tab). -indent-string=' ' - -# Maximum number of characters on a single line. -max-line-length=100 - -# Maximum number of lines in a module -max-module-lines=1000 - -# Allow the body of a class to be on the same line as the declaration if body -# contains single statement. -single-line-class-stmt=no - -# Allow the body of an if to be on the same line as the test if there is no -# else. -single-line-if-stmt=no - - -[SIMILARITIES] - -# Ignore comments when computing similarities. -ignore-comments=yes - -# Ignore docstrings when computing similarities. -ignore-docstrings=yes - -# Ignore imports when computing similarities. -ignore-imports=yes - -# Minimum lines number of a similarity. -min-similarity-lines=12 - - -[BASIC] - -# Regular expression matching correct argument names -argument-rgx=(([a-z][a-z0-9_]{2,30})|(_[a-z0-9_]*))$ - -# Regular expression matching correct attribute names -attr-rgx=(([a-z][a-z0-9_]{2,30})|(_[a-z0-9_]*))$ - -# Bad variable names which should always be refused, separated by a comma -bad-names=foo,bar,baz,toto,tutu,tata - -# Regular expression matching correct class attribute names -class-attribute-rgx=([A-Za-z_][A-Za-z0-9_]{2,30}|(__.*__))$ - -# Regular expression matching correct class names -# class-rgx=[A-Z_][a-zA-Z0-9]+$ -class-rgx=[A-Z_][a-zA-Z0-9_]+$ - -# Regular expression matching correct constant names -const-rgx=(([A-Z_][A-Z0-9_]*)|(__.*__))$ - -# Minimum line length for functions/classes that require docstrings, shorter -# ones are exempt. -docstring-min-length=-1 - -# Regular expression matching correct function names -function-rgx=(([a-z][a-z0-9_]{2,30})|(_[a-z0-9_]*))$ - -# Good variable names which should always be accepted, separated by a comma -# good-names=i,j,k,ex,Run,_ -good-names=r,g,b,w,i,j,k,n,x,y,z,ex,ok,Run,_ - -# Include a hint for the correct naming format with invalid-name -include-naming-hint=no - -# Regular expression matching correct inline iteration names -inlinevar-rgx=[A-Za-z_][A-Za-z0-9_]*$ - -# Regular expression matching correct method names -method-rgx=(([a-z][a-z0-9_]{2,30})|(_[a-z0-9_]*))$ - -# Regular expression matching correct module names -module-rgx=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$ - -# Colon-delimited sets of names that determine each other's naming style when -# the name regexes allow several styles. -name-group= - -# Regular expression which should only match function or class names that do -# not require a docstring. -no-docstring-rgx=^_ - -# List of decorators that produce properties, such as abc.abstractproperty. Add -# to this list to register other decorators that produce valid properties. -property-classes=abc.abstractproperty - -# Regular expression matching correct variable names -variable-rgx=(([a-z][a-z0-9_]{2,30})|(_[a-z0-9_]*))$ - - -[IMPORTS] - -# Allow wildcard imports from modules that define __all__. -allow-wildcard-with-all=no - -# Analyse import fallback blocks. This can be used to support both Python 2 and -# 3 compatible code, which means that the block might have code that exists -# only in one or another interpreter, leading to false positives when analysed. -analyse-fallback-blocks=no - -# Deprecated modules which should not be used, separated by a comma -deprecated-modules=optparse,tkinter.tix - -# Create a graph of external dependencies in the given file (report RP0402 must -# not be disabled) -ext-import-graph= - -# Create a graph of every (i.e. internal and external) dependencies in the -# given file (report RP0402 must not be disabled) -import-graph= - -# Create a graph of internal dependencies in the given file (report RP0402 must -# not be disabled) -int-import-graph= - -# Force import order to recognize a module as part of the standard -# compatibility libraries. -known-standard-library= - -# Force import order to recognize a module as part of a third party library. -known-third-party=enchant - - -[CLASSES] - -# List of method names used to declare (i.e. assign) instance attributes. -defining-attr-methods=__init__,__new__,setUp - -# List of member names, which should be excluded from the protected access -# warning. -exclude-protected=_asdict,_fields,_replace,_source,_make - -# List of valid names for the first argument in a class method. -valid-classmethod-first-arg=cls - -# List of valid names for the first argument in a metaclass class method. -valid-metaclass-classmethod-first-arg=mcs - - -[DESIGN] - -# Maximum number of arguments for function / method -max-args=5 - -# Maximum number of attributes for a class (see R0902). -# max-attributes=7 -max-attributes=11 - -# Maximum number of boolean expressions in a if statement -max-bool-expr=5 - -# Maximum number of branch for function / method body -max-branches=12 - -# Maximum number of locals for function / method body -max-locals=15 - -# Maximum number of parents for a class (see R0901). -max-parents=7 - -# Maximum number of public methods for a class (see R0904). -max-public-methods=20 - -# Maximum number of return / yield for function / method body -max-returns=6 - -# Maximum number of statements in function / method body -max-statements=50 - -# Minimum number of public methods for a class (see R0903). -min-public-methods=1 - - -[EXCEPTIONS] - -# Exceptions that will emit a warning when being caught. Defaults to -# "Exception" -overgeneral-exceptions=Exception diff --git a/.readthedocs.yaml b/.readthedocs.yaml index 33c2a61..255dafd 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -8,8 +8,11 @@ # Required version: 2 +sphinx: + configuration: docs/conf.py + build: - os: ubuntu-20.04 + os: ubuntu-lts-latest tools: python: "3" diff --git a/README.rst b/README.rst index 489a996..a679ee3 100644 --- a/README.rst +++ b/README.rst @@ -13,9 +13,9 @@ Introduction :target: https://github.com/adafruit/Adafruit_CircuitPython_ESP_ATcontrol/actions/ :alt: Build Status -.. image:: https://img.shields.io/badge/code%20style-black-000000.svg - :target: https://github.com/psf/black - :alt: Code Style: Black +.. image:: https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json + :target: https://github.com/astral-sh/ruff + :alt: Code Style: Ruff This library is no longer supported! The library is being left available for continued usage, however, Adafruit is no longer supporting it. =========================================================================================================================================== @@ -32,6 +32,8 @@ Dependencies This driver depends on: * `Adafruit CircuitPython `_ +* `Adafruit CircuitPython ConnectionManager `_ +* `Adafruit CircuitPython Requests `_ Please ensure all dependencies are available on the CircuitPython filesystem. This is easily achieved by downloading diff --git a/adafruit_espatcontrol/adafruit_espatcontrol.py b/adafruit_espatcontrol/adafruit_espatcontrol.py index cf8aed4..0bbcd06 100644 --- a/adafruit_espatcontrol/adafruit_espatcontrol.py +++ b/adafruit_espatcontrol/adafruit_espatcontrol.py @@ -34,10 +34,12 @@ import gc import time -from digitalio import Direction, DigitalInOut + +from digitalio import DigitalInOut, Direction try: - from typing import Optional, Dict, Union, List + from typing import Dict, List, Optional, Union + import busio except ImportError: pass @@ -56,7 +58,6 @@ class ESP_ATcontrol: AT command firmware, you can use esptool or our CircuitPython miniesptool to upload firmware""" - # pylint: disable=too-many-public-methods, too-many-instance-attributes MODE_STATION = 1 MODE_SOFTAP = 2 MODE_SOFTAPSTATION = 3 @@ -65,10 +66,16 @@ class ESP_ATcontrol: TYPE_UDP = "UDP" TYPE_SSL = "SSL" TLS_MODE = "SSL" - STATUS_APCONNECTED = 2 - STATUS_SOCKETOPEN = 3 - STATUS_SOCKETCLOSED = 4 - STATUS_NOTCONNECTED = 5 + STATUS_APCONNECTED = 2 # CIPSTATUS method + STATUS_WIFI_APCONNECTED = 2 # CWSTATE method + STATUS_SOCKETOPEN = 3 # CIPSTATUS method + STATUS_SOCKET_OPEN = 3 # CIPSTATE method + STATUS_SOCKETCLOSED = 4 # CIPSTATUS method + STATUS_SOCKET_CLOSED = 4 # CIPSTATE method + STATUS_NOTCONNECTED = 5 # CIPSTATUS method + STATUS_WIFI_NOTCONNECTED = 1 # CWSTATE method + STATUS_WIFI_DISCONNECTED = 4 # CWSTATE method + USER_AGENT = "esp-idf/1.0 esp32" def __init__( @@ -79,7 +86,8 @@ def __init__( run_baudrate: Optional[int] = None, rts_pin: Optional[DigitalInOut] = None, reset_pin: Optional[DigitalInOut] = None, - debug: bool = False + debug: bool = False, + use_cipstatus: bool = False, ): """This function doesn't try to do any sync'ing, just sets up # the hardware, that way nothing can unexpectedly fail!""" @@ -106,6 +114,7 @@ def __init__( self._ifconfig = [] self._initialized = False self._conntype = None + self._use_cipstatus = use_cipstatus def begin(self) -> None: """Initialize the module by syncing, resetting if necessary, setting up @@ -130,6 +139,15 @@ def begin(self) -> None: except OKError: # ESP32 doesnt use CIPSSLSIZE, its ok! self.at_response("AT+CIPSSLCCONF?") + + try: + self.at_response("AT+CWSTATE?", retries=1, timeout=3) + except OKError: + # ESP8285's use CIPSTATUS and have no CWSTATE or CWIPSTATUS functions + self._use_cipstatus = True + if self._debug: + print("No CWSTATE support, using CIPSTATUS, it's ok!") + self._initialized = True return except OKError: @@ -146,7 +164,7 @@ def connect( try: if not self._initialized: self.begin() - AP = self.remote_AP # pylint: disable=invalid-name + AP = self.remote_AP if AP[0] != secrets["ssid"]: self.join_AP( secrets["ssid"], @@ -169,6 +187,59 @@ def connect( print("Failed to connect\n", exp) raise + def connect_enterprise( + self, secrets: Dict[str, Union[str, int]], timeout: int = 15, retries: int = 3 + ) -> None: + """Repeatedly try to connect to an enterprise access point with the details in + the passed in 'secrets' dictionary. Be sure 'ssid','password','username','identity' + and 'method' are defined in the secrets dict! If 'timezone' is set, we'll also + configure SNTP""" + # Connect to WiFi if not already + retries = 3 + if self._debug: + print("In connect_enterprise()") + while True: + try: + if not self._initialized or retries == 0: + self.begin() + retries = 3 + AP = self.remote_AP + if AP[0] is not None: + print("Connected to", AP[0]) + if AP[0] != secrets["ssid"]: + if self._debug: + print("Doing Enterprise connection sequence") + self.join_AP_Enterprise( + secrets["ssid"], + secrets["username"], + secrets["identity"], + secrets["password"], + secrets["method"], + timeout=timeout, + retries=retries, + ) + if "timezone" in secrets: + tzone = secrets["timezone"] + ntp = None + if "ntp_server" in secrets: + ntp = secrets["ntp_server"] + self.sntp_config(True, tzone, ntp) + print("Connected to", self.remote_AP[0]) + print("My IP Address:", self.local_ip) + return # yay! + except (RuntimeError, OKError) as exp: + print("Failed to connect, retrying\n", exp) + retries -= 1 + continue + + def set_autoconnect(self, autoconnect: bool) -> None: + """Set the auto connection status if the wifi connects automatically on powerup""" + if autoconnect is True: + auto_flag = "1" + else: + auto_flag = "0" + self.at_response("AT+CWAUTOCONN=" + auto_flag) + # *************************** SOCKET SETUP **************************** @property @@ -180,14 +251,14 @@ def cipmux(self) -> int: return int(reply[8:]) raise RuntimeError("Bad response to CIPMUX?") - def socket_connect( # pylint: disable=too-many-branches + def socket_connect( self, conntype: str, remote: str, remote_port: int, *, keepalive: int = 10, - retries: int = 1 + retries: int = 1, ) -> bool: """Open a socket. conntype can be TYPE_TCP, TYPE_UDP, or TYPE_SSL. Remote can be an IP address or DNS (we'll do the lookup for you. Remote port @@ -222,13 +293,13 @@ def socket_connect( # pylint: disable=too-many-branches self.socket_disconnect() while True: stat = self.status - if stat in (self.STATUS_APCONNECTED, self.STATUS_SOCKETCLOSED): + if stat in {self.STATUS_APCONNECTED, self.STATUS_SOCKETCLOSED}: break if stat == self.STATUS_SOCKETOPEN: self.socket_disconnect() else: time.sleep(1) - if not conntype in (self.TYPE_TCP, self.TYPE_UDP, self.TYPE_SSL): + if conntype not in {self.TYPE_TCP, self.TYPE_UDP, self.TYPE_SSL}: raise RuntimeError("Connection type must be TCP, UDL or SSL") cmd = ( 'AT+CIPSTART="' @@ -240,15 +311,18 @@ def socket_connect( # pylint: disable=too-many-branches + "," + str(keepalive) ) + if self._debug is True: + print("socket_connect(): Going to send command") replies = self.at_response(cmd, timeout=10, retries=retries).split(b"\r\n") for reply in replies: if reply == b"CONNECT" and ( - conntype in (self.TYPE_TCP, self.TYPE_SSL) + conntype in {self.TYPE_TCP, self.TYPE_SSL} and self.status == self.STATUS_SOCKETOPEN or conntype == self.TYPE_UDP ): self._conntype = conntype return True + return False def socket_send(self, buffer: bytes, timeout: int = 1) -> bool: @@ -287,7 +361,6 @@ def socket_send(self, buffer: bytes, timeout: int = 1) -> bool: return True def socket_receive(self, timeout: int = 5) -> bytearray: - # pylint: disable=too-many-nested-blocks, too-many-branches """Check for incoming data over the open socket, returns bytes""" incoming_bytes = None bundle = [] @@ -308,22 +381,17 @@ def socket_receive(self, timeout: int = 5) -> bytearray: continue i += 1 # look for the IPD message - if (ipd_start in self._ipdpacket) and chr( - self._ipdpacket[i - 1] - ) == ":": + if (ipd_start in self._ipdpacket) and chr(self._ipdpacket[i - 1]) == ":": try: ipd = str(self._ipdpacket[5 : i - 1], "utf-8") incoming_bytes = int(ipd) if self._debug: print("Receiving:", incoming_bytes) except ValueError as err: - raise RuntimeError( - "Parsing error during receive", ipd - ) from err + raise RuntimeError("Parsing error during receive", ipd) from err i = 0 # reset the input buffer now that we know the size elif i > 20: i = 0 # Hmm we somehow didnt get a proper +IPD packet? start over - else: self.hw_flow(False) # stop the flow # read as much as we can! @@ -399,25 +467,110 @@ def is_connected(self) -> bool: self.echo(False) self.baudrate = self.baudrate stat = self.status - if stat in ( + if stat in { self.STATUS_APCONNECTED, self.STATUS_SOCKETOPEN, self.STATUS_SOCKETCLOSED, - ): + }: + if self._debug: + print("is_connected(): status says connected") return True except (OKError, RuntimeError): pass + if self._debug: + print("is_connected(): status says not connected") return False @property def status(self) -> Union[int, None]: """The IP connection status number (see AT+CIPSTATUS datasheet for meaning)""" - replies = self.at_response("AT+CIPSTATUS", timeout=5).split(b"\r\n") + if self._use_cipstatus: + replies = self.at_response("AT+CIPSTATUS", timeout=5).split(b"\r\n") + for reply in replies: + if reply.startswith(b"STATUS:"): + if self._debug: + print(f"CIPSTATUS state is {int(reply[7:8])}") + return int(reply[7:8]) + else: + status_w = self.status_wifi + status_s = self.status_socket + + # debug only, Check CIPSTATUS messages against CWSTATE/CIPSTATE + if self._debug: + replies = self.at_response("AT+CIPSTATUS", timeout=5).split(b"\r\n") + for reply in replies: + if reply.startswith(b"STATUS:"): + cipstatus = int(reply[7:8]) + print(f"STATUS: CWSTATE: {status_w}, CIPSTATUS: {cipstatus}, CIPSTATE: {status_s}") + + # Produce a cipstatus-compatible status code + # Codes are not the same between CWSTATE/CIPSTATUS so in some combinations + # we just pick what we hope is best. + if status_w in { + self.STATUS_WIFI_NOTCONNECTED, + self.STATUS_WIFI_DISCONNECTED, + }: + if self._debug: + print(f"STATUS returning {self.STATUS_NOTCONNECTED}") + return self.STATUS_NOTCONNECTED + + if status_s == self.STATUS_SOCKET_OPEN: + if self._debug: + print(f"STATUS returning {self.STATUS_SOCKETOPEN}") + return self.STATUS_SOCKETOPEN + + if status_w == self.STATUS_WIFI_APCONNECTED: + if self._debug: + print(f"STATUS returning {self.STATUS_APCONNECTED}") + return self.STATUS_APCONNECTED + + # handle extra codes from CWSTATE + if status_w == 0: # station has not started any Wi-Fi connection. + if self._debug: + print("STATUS returning 1") + return 1 # this cipstatus had no previous handler variable + + if ( + status_w == 1 + ): # station has connected to an AP, but does not get an IPv4 address yet. + if self._debug: + print("STATUS returning 1") + return 1 # this cipstatus had no previous handler variable + + if status_w == 3: # station is in Wi-Fi connecting or reconnecting state. + if self._debug: + print(f"STATUS returning {self.STATUS_NOTCONNECTED}") + return self.STATUS_NOTCONNECTED + + if status_s == self.STATUS_SOCKET_CLOSED: + if self._debug: + print(f"STATUS returning {self.STATUS_SOCKET_CLOSED}") + return self.STATUS_SOCKET_CLOSED + + return None + + @property + def status_wifi(self) -> Union[int, None]: + """The WIFI connection status number (see AT+CWSTATE datasheet for meaning)""" + replies = self.at_response("AT+CWSTATE?", timeout=5).split(b"\r\n") for reply in replies: - if reply.startswith(b"STATUS:"): - return int(reply[7:8]) + if reply.startswith(b"+CWSTATE:"): + state_info = reply.split(b",") + if self._debug: + print(f"State reply is {reply}, state_info[1] is {int(state_info[0][9:10])}") + return int(state_info[0][9:10]) return None + @property + def status_socket(self) -> Union[int, None]: + """The Socket connection status number (see AT+CIPSTATE for meaning)""" + replies = self.at_response("AT+CIPSTATE?", timeout=5).split(b"\r\n") + for reply in replies: + # If there are any +CIPSTATE lines that means it's an open socket + if reply.startswith(b"+CIPSTATE:"): + return self.STATUS_SOCKET_OPEN + return self.STATUS_SOCKET_CLOSED + @property def mode(self) -> Union[int, None]: """What mode we're in, can be MODE_STATION, MODE_SOFTAP or MODE_SOFTAPSTATION""" @@ -434,7 +587,7 @@ def mode(self, mode: int) -> None: """Station or AP mode selection, can be MODE_STATION, MODE_SOFTAP or MODE_SOFTAPSTATION""" if not self._initialized: self.begin() - if not mode in (1, 2, 3): + if mode not in {1, 2, 3}: raise RuntimeError("Invalid Mode") self.at_response("AT+CWMODE=%d" % mode, timeout=3) @@ -481,7 +634,7 @@ def nslookup(self, host: str) -> Union[str, None]: # *************************** AP SETUP **************************** @property - def remote_AP(self) -> List[Union[int, str, None]]: # pylint: disable=invalid-name + def remote_AP(self) -> List[Union[int, str, None]]: """The name of the access point we're connected to, as a string""" stat = self.status if stat != self.STATUS_APCONNECTED: @@ -500,12 +653,12 @@ def remote_AP(self) -> List[Union[int, str, None]]: # pylint: disable=invalid-n return reply return [None] * 4 - def join_AP( # pylint: disable=invalid-name - self, ssid: str, password: str, timeout: int = 15, retries: int = 3 - ) -> None: + def join_AP(self, ssid: str, password: str, timeout: int = 15, retries: int = 3) -> None: """Try to join an access point by name and password, will return immediately if we're already connected and won't try to reconnect""" # First make sure we're in 'station' mode so we can connect to AP's + if self._debug: + print("In join_AP()") if self.mode != self.MODE_STATION: self.mode = self.MODE_STATION @@ -525,9 +678,95 @@ def join_AP( # pylint: disable=invalid-name raise RuntimeError("Didn't get IP address") return - def scan_APs( # pylint: disable=invalid-name - self, retries: int = 3 - ) -> Union[List[List[bytes]], None]: + def join_AP_Enterprise( + self, + ssid: str, + username: str, + identity: str, + password: str, + method: int, + timeout: int = 30, + retries: int = 3, + ) -> None: + """Try to join an Enterprise access point by name and password, will return + immediately if we're already connected and won't try to reconnect""" + # Not sure how to verify certificates so we set that to not verify. + certificate_security = 0 # Bit0: Client certificate.Bit1: Server certificate. + + if self._debug: + print("In join_AP_Enterprise()") + if self.mode != self.MODE_STATION: + self.mode = self.MODE_STATION + + router = self.remote_AP + if router and router[0] == ssid: + return # we're already connected! + reply = self.at_response( + 'AT+CWJEAP="' + + ssid + + '",' + + str(method) + + ',"' + + identity + + '","' + + username + + '","' + + password + + '",' + + str(certificate_security), + timeout=timeout, + retries=retries, + ) + if b"WIFI CONNECTED" not in reply: + print("no CONNECTED") + raise RuntimeError("Couldn't connect to Enterprise WiFi") + if b"WIFI GOT IP" not in reply: + print("no IP") + raise RuntimeError("Didn't get IP address") + return + + def disconnect(self, timeout: int = 5, retries: int = 3): + """Disconnect from the AP. Tries whether connected or not.""" + # If we're not connected we likely don't get a "WIFI DISCONNECT" and just get the OK + # Note it still tries to disconnect even if it says we're not connected. + if not self._initialized: + self.begin() + stat = self.status + if stat in { + self.STATUS_APCONNECTED, + self.STATUS_SOCKETOPEN, + self.STATUS_SOCKETCLOSED, + }: + wait_for_disconnect = True + else: + wait_for_disconnect = False + if self._debug is True: + print("disconnect(): Not connected, not waiting for disconnect message") + reply = self.at_response("AT+CWQAP", timeout=timeout, retries=retries) + # Don't bother waiting for disconnect message if we weren't connected already + # sometimes the "WIFI DISCONNECT" shows up in the reply and sometimes it doesn't. + if wait_for_disconnect is True: + if b"WIFI DISCONNECT" in reply: + if self._debug is True: + print(f"disconnect(): Got WIFI DISCONNECT: {reply}") + else: + stamp = time.monotonic() + response = b"" + while (time.monotonic() - stamp) < timeout: + if self._uart.in_waiting: + response += self._uart.read(1) + self.hw_flow(False) + if response[-15:] == b"WIFI DISCONNECT": + break + else: + self.hw_flow(True) + if self._debug: + if response[-15:] == b"WIFI DISCONNECT": + print(f"disconnect(): Got WIFI DISCONNECT: {response}") + else: + print(f"disconnect(): Timed out wating for WIFI DISCONNECT: {response}") + + def scan_APs(self, retries: int = 3) -> Union[List[List[bytes]], None]: """Ask the module to scan for access points and return a list of lists with name, RSSI, MAC addresses, etc""" for _ in range(retries): @@ -580,7 +819,6 @@ def at_response(self, at_cmd: str, timeout: int = 5, retries: int = 3) -> bytes: and then cut out the reply lines to return. We can set a variable timeout (how long we'll wait for response) and how many times to retry before giving up""" - # pylint: disable=too-many-branches for _ in range(retries): self.hw_flow(True) # allow any remaning data to stream in time.sleep(0.1) # wait for uart data @@ -600,12 +838,11 @@ def at_response(self, at_cmd: str, timeout: int = 5, retries: int = 3) -> bytes: break if response[-7:] == b"ERROR\r\n": break - if "AT+CWJAP=" in at_cmd: + if "AT+CWJAP=" in at_cmd or "AT+CWJEAP=" in at_cmd: if b"WIFI GOT IP\r\n" in response: break - else: - if b"WIFI CONNECTED\r\n" in response: - break + elif b"WIFI CONNECTED\r\n" in response: + break if b"ERR CODE:" in response: break else: @@ -616,15 +853,16 @@ def at_response(self, at_cmd: str, timeout: int = 5, retries: int = 3) -> bytes: # special case, AT+CWJAP= does not return an ok :P if "AT+CWJAP=" in at_cmd and b"WIFI GOT IP\r\n" in response: return response + # special case, AT+CWJEAP= does not return an ok :P + if "AT+CWJEAP=" in at_cmd and b"WIFI GOT IP\r\n" in response: + return response + if "AT+CWQAP=" in at_cmd and b"WIFI DISCONNECT" in response: + return response # special case, ping also does not return an OK if "AT+PING" in at_cmd and b"ERROR\r\n" in response: return response # special case, does return OK but in fact it is busy - if ( - "AT+CIFSR" in at_cmd - and b"busy" in response - or response[-4:] != b"OK\r\n" - ): + if "AT+CIFSR" in at_cmd and b"busy" in response or response[-4:] != b"OK\r\n": time.sleep(1) continue return response[:-4] @@ -671,16 +909,32 @@ def echo(self, echo: bool) -> None: else: self.at_response("ATE0", timeout=1) - def soft_reset(self) -> bool: + def soft_reset(self, timeout: int = 5) -> bool: """Perform a software reset by AT command. Returns True if we successfully performed, false if failed to reset""" try: self._uart.reset_input_buffer() reply = self.at_response("AT+RST", timeout=1) - if reply.strip(b"\r\n") == b"AT+RST": - time.sleep(2) - self._uart.reset_input_buffer() - return True + if self._debug: + print(f"Resetting with AT+RST, reply was {reply}") + stamp = time.monotonic() + response = b"" + while (time.monotonic() - stamp) < timeout: + if self._uart.in_waiting: + response += self._uart.read(1) + self.hw_flow(False) + if response[-5:] == b"ready": + break + else: + self.hw_flow(True) + if self._debug: + if response[-5:] == b"ready": + print(f"soft_reset(): Got ready: {response}") + else: + print(f"soft_reset(): imed out waiting for ready: {response}") + self._uart.reset_input_buffer() + self.sync() + return True except OKError: pass # fail, see below return False diff --git a/adafruit_espatcontrol/adafruit_espatcontrol_socket.py b/adafruit_espatcontrol/adafruit_espatcontrol_socket.py index e8a1c8b..6dd4339 100644 --- a/adafruit_espatcontrol/adafruit_espatcontrol_socket.py +++ b/adafruit_espatcontrol/adafruit_espatcontrol_socket.py @@ -3,20 +3,22 @@ # SPDX-License-Identifier: MIT """A 'socket' compatible interface thru the ESP AT command set""" + from micropython import const try: - from typing import Optional, Tuple, List + from typing import List, Optional, Tuple + from .adafruit_espatcontrol import ESP_ATcontrol except ImportError: pass -_the_interface = None # pylint: disable=invalid-name +_the_interface = None def set_interface(iface: ESP_ATcontrol) -> None: """Helper to set the global internet interface""" - global _the_interface # pylint: disable=global-statement, invalid-name + global _the_interface # noqa: PLW0603 _the_interface = iface @@ -24,7 +26,6 @@ def set_interface(iface: ESP_ATcontrol) -> None: AF_INET = const(2) -# pylint: disable=too-many-arguments, unused-argument def getaddrinfo( host: str, port: int, @@ -41,10 +42,6 @@ def getaddrinfo( return [(AF_INET, socktype, proto, "", (ipaddr, port))] -# pylint: enable=too-many-arguments, unused-argument - - -# pylint: disable=unused-argument, redefined-builtin, invalid-name class socket: """A simplified implementation of the Python 'socket' class, for connecting through an interface to a remote device""" @@ -69,13 +66,11 @@ def connect(self, address: Tuple[str, int], conntype: Optional[str] = None) -> N """ host, port = address - if not _the_interface.socket_connect( - conntype, host, port, keepalive=10, retries=3 - ): + if not _the_interface.socket_connect(conntype, host, port, keepalive=10, retries=3): raise RuntimeError("Failed to connect to host", host) self._buffer = b"" - def send(self, data: bytes) -> None: # pylint: disable=no-self-use + def send(self, data: bytes) -> None: # noqa: PLR6301 """Send some data to the socket""" _the_interface.socket_send(data) @@ -97,9 +92,7 @@ def recv(self, num: int = 0) -> bytes: self._buffer = b"" else: if self._buffer == b"": - self._buffer = self._buffer + _the_interface.socket_receive( - timeout=self._timeout - ) + self._buffer = self._buffer + _the_interface.socket_receive(timeout=self._timeout) ret = self._buffer[:num] self._buffer = self._buffer[num:] return ret @@ -107,14 +100,9 @@ def recv(self, num: int = 0) -> bytes: def close(self) -> None: """Close the socket, after reading whatever remains""" # read whatever's left - self._buffer = self._buffer + _the_interface.socket_receive( - timeout=self._timeout - ) + self._buffer = self._buffer + _the_interface.socket_receive(timeout=self._timeout) _the_interface.socket_disconnect() def settimeout(self, value: int) -> None: """Set the read timeout for sockets, if value is 0 it will block""" self._timeout = value - - -# pylint: enable=unused-argument, redefined-builtin, invalid-name diff --git a/adafruit_espatcontrol/adafruit_espatcontrol_wifimanager.py b/adafruit_espatcontrol/adafruit_espatcontrol_wifimanager.py index 43e17cb..5136b37 100755 --- a/adafruit_espatcontrol/adafruit_espatcontrol_wifimanager.py +++ b/adafruit_espatcontrol/adafruit_espatcontrol_wifimanager.py @@ -11,14 +11,15 @@ * Author(s): Melissa LeBlanc-Williams, ladyada, Jerry Needell """ -# pylint: disable=no-name-in-module +import adafruit_connection_manager +import adafruit_requests -import adafruit_requests as requests -import adafruit_espatcontrol.adafruit_espatcontrol_socket as socket +import adafruit_espatcontrol.adafruit_espatcontrol_socket as pool from adafruit_espatcontrol.adafruit_espatcontrol import ESP_ATcontrol try: - from typing import Dict, Any, Optional, Union, Tuple + from typing import Any, Dict, Optional, Tuple, Union + from circuitpython_typing.led import FillBasedLED except ImportError: pass @@ -35,6 +36,8 @@ def __init__( secrets: Dict[str, Union[str, int]], status_pixel: Optional[FillBasedLED] = None, attempts: int = 2, + enterprise: bool = False, + debug: bool = False, ): """ :param ESP_SPIcontrol esp: The ESP object we are using @@ -42,23 +45,34 @@ def __init__( :param status_pixel: (Optional) The pixel device - A NeoPixel or DotStar (default=None) :type status_pixel: NeoPixel or DotStar :param int attempts: (Optional) Unused, only for compatibility for old code + :param bool enterprise: (Optional) If True, try to connect to Enterprise AP + :param bool debug: (Optional) Print debug messages during operation """ # Read the settings self._esp = esp - self.debug = False + self.debug = debug self.secrets = secrets self.attempts = attempts - requests.set_socket(socket, esp) self.statuspix = status_pixel self.pixel_status(0) + self.enterprise = enterprise + + # create requests session + ssl_context = adafruit_connection_manager.create_fake_ssl_context(pool, self._esp) + self._requests = adafruit_requests.Session(pool, ssl_context) - def reset(self) -> None: + def reset(self, hard_reset: bool = True, soft_reset: bool = False) -> None: """ Perform a hard reset on the ESP """ + self.pixel_status((100, 100, 100)) if self.debug: print("Resetting ESP") - self._esp.hard_reset() + if hard_reset is True: + self._esp.hard_reset() + if soft_reset is True: + self._esp.soft_reset() + self.pixel_status(0) def connect(self, timeout: int = 15, retries: int = 3) -> None: """ @@ -68,7 +82,10 @@ def connect(self, timeout: int = 15, retries: int = 3) -> None: if self.debug: print("Connecting to AP...") self.pixel_status((100, 0, 0)) - self._esp.connect(self.secrets, timeout=timeout, retries=retries) + if self.enterprise is False: + self._esp.connect(self.secrets, timeout=timeout, retries=retries) + else: + self._esp.connect_enterprise(self.secrets, timeout=timeout, retries=retries) self.pixel_status((0, 100, 0)) except (ValueError, RuntimeError) as error: print("Failed to connect\n", error) @@ -77,12 +94,16 @@ def connect(self, timeout: int = 15, retries: int = 3) -> None: def set_conntype(self, url: str) -> None: """set the connection-type according to protocol""" self._esp.conntype = ( - ESP_ATcontrol.TYPE_SSL - if url.startswith("https") - else ESP_ATcontrol.TYPE_TCP + ESP_ATcontrol.TYPE_SSL if url.startswith("https") else ESP_ATcontrol.TYPE_TCP ) - def get(self, url: str, **kw: Any) -> requests.Response: + def disconnect(self) -> None: + """ + Disconnect the Wifi from the AP if any + """ + self._esp.disconnect() + + def get(self, url: str, **kw: Any) -> adafruit_requests.Response: """ Pass the Get request to requests and update Status NeoPixel @@ -98,11 +119,11 @@ def get(self, url: str, **kw: Any) -> requests.Response: self.connect() self.pixel_status((0, 0, 100)) self.set_conntype(url) - return_val = requests.get(url, **kw) + return_val = self._requests.get(url, **kw) self.pixel_status(0) return return_val - def post(self, url: str, **kw: Any) -> requests.Response: + def post(self, url: str, **kw: Any) -> adafruit_requests.Response: """ Pass the Post request to requests and update Status NeoPixel @@ -114,14 +135,20 @@ def post(self, url: str, **kw: Any) -> requests.Response: :return: The response from the request :rtype: Response """ + if self.debug: + print("in post()") if not self._esp.is_connected: + if self.debug: + print("post(): not connected, trying to connect") self.connect() self.pixel_status((0, 0, 100)) self.set_conntype(url) - return_val = requests.post(url, **kw) + return_val = self._requests.post(url, **kw) + self.pixel_status(0) + return return_val - def put(self, url: str, **kw: Any) -> requests.Response: + def put(self, url: str, **kw: Any) -> adafruit_requests.Response: """ Pass the put request to requests and update Status NeoPixel @@ -137,11 +164,11 @@ def put(self, url: str, **kw: Any) -> requests.Response: self.connect() self.pixel_status((0, 0, 100)) self.set_conntype(url) - return_val = requests.put(url, **kw) + return_val = self._requests.put(url, **kw) self.pixel_status(0) return return_val - def patch(self, url: str, **kw: Any) -> requests.Response: + def patch(self, url: str, **kw: Any) -> adafruit_requests.Response: """ Pass the patch request to requests and update Status NeoPixel @@ -157,11 +184,11 @@ def patch(self, url: str, **kw: Any) -> requests.Response: self.connect() self.pixel_status((0, 0, 100)) self.set_conntype(url) - return_val = requests.patch(url, **kw) + return_val = self._requests.patch(url, **kw) self.pixel_status(0) return return_val - def delete(self, url: str, **kw: Any) -> requests.Response: + def delete(self, url: str, **kw: Any) -> adafruit_requests.Response: """ Pass the delete request to requests and update Status NeoPixel @@ -177,7 +204,7 @@ def delete(self, url: str, **kw: Any) -> requests.Response: self.connect() self.pixel_status((0, 0, 100)) self.set_conntype(url) - return_val = requests.delete(url, **kw) + return_val = self._requests.delete(url, **kw) self.pixel_status(0) return return_val diff --git a/docs/api.rst b/docs/api.rst index 0698504..e9c5c09 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -4,6 +4,9 @@ .. If your library file(s) are nested in a directory (e.g. /adafruit_foo/foo.py) .. use this format as the module name: "adafruit_foo.foo" +API Reference +############# + .. automodule:: adafruit_espatcontrol.adafruit_espatcontrol :members: diff --git a/docs/conf.py b/docs/conf.py index f7b483f..8eb0876 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -1,12 +1,10 @@ -# -*- coding: utf-8 -*- - # SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries # # SPDX-License-Identifier: MIT +import datetime import os import sys -import datetime sys.path.insert(0, os.path.abspath("..")) @@ -17,6 +15,7 @@ # ones. extensions = [ "sphinx.ext.autodoc", + "sphinxcontrib.jquery", "sphinx.ext.intersphinx", "sphinx.ext.napoleon", "sphinx.ext.todo", @@ -47,9 +46,7 @@ creation_year = "2018" current_year = str(datetime.datetime.now().year) year_duration = ( - current_year - if current_year == creation_year - else creation_year + " - " + current_year + current_year if current_year == creation_year else creation_year + " - " + current_year ) copyright = year_duration + " ladyada" author = "ladyada" @@ -100,19 +97,9 @@ # The theme to use for HTML and HTML Help pages. See the documentation for # a list of builtin themes. # -on_rtd = os.environ.get("READTHEDOCS", None) == "True" - -if not on_rtd: # only import and set the theme if we're building docs locally - try: - import sphinx_rtd_theme - - html_theme = "sphinx_rtd_theme" - html_theme_path = [sphinx_rtd_theme.get_html_theme_path(), "."] - except: - html_theme = "default" - html_theme_path = ["."] -else: - html_theme_path = ["."] +import sphinx_rtd_theme + +html_theme = "sphinx_rtd_theme" # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, diff --git a/docs/requirements.txt b/docs/requirements.txt index 88e6733..979f568 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -2,4 +2,6 @@ # # SPDX-License-Identifier: Unlicense -sphinx>=4.0.0 +sphinx +sphinxcontrib-jquery +sphinx-rtd-theme diff --git a/examples/esp_atcontrol_AIO_no_wifimanager-enterprise.py b/examples/esp_atcontrol_AIO_no_wifimanager-enterprise.py new file mode 100644 index 0000000..4d4ac9c --- /dev/null +++ b/examples/esp_atcontrol_AIO_no_wifimanager-enterprise.py @@ -0,0 +1,89 @@ +# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries +# SPDX-License-Identifier: MIT + +# Note, you must create a feed called "test" in your AdafruitIO account. +# Your secrets file must contain your aio_username and aio_key + +import time + +import adafruit_connection_manager +import adafruit_requests +import board +import busio +from digitalio import DigitalInOut, Direction + +import adafruit_espatcontrol.adafruit_espatcontrol_socket as pool + +# ESP32 AT +from adafruit_espatcontrol import adafruit_espatcontrol + +# Get wifi details and more from a secrets.py file +try: + from secrets import secrets +except ImportError: + print("WiFi secrets are kept in secrets.py, please add them there!") + raise + +# Debug Level +# Change the Debug Flag if you have issues with AT commands +debugflag = False + +if board.board_id == "challenger_rp2040_wifi": + RX = board.ESP_RX + TX = board.ESP_TX + resetpin = DigitalInOut(board.WIFI_RESET) + rtspin = False + uart = busio.UART(TX, RX, baudrate=11520, receiver_buffer_size=2048) + esp_boot = DigitalInOut(board.WIFI_MODE) + esp_boot.direction = Direction.OUTPUT + esp_boot.value = True + status_light = None +else: + RX = board.TX + TX = board.RX + resetpin = DigitalInOut(board.D4) + rtspin = DigitalInOut(board.D5) + uart = busio.UART(board.TX, board.RX, baudrate=11520, timeout=0.1, receiver_buffer_size=512) + esp_boot = DigitalInOut(board.D9) + esp_boot.direction = Direction.OUTPUT + esp_boot.value = True + status_light = None + +print("ESP AT commands") +esp = adafruit_espatcontrol.ESP_ATcontrol( + uart, 115200, reset_pin=resetpin, rts_pin=rtspin, debug=debugflag +) + +ssl_context = adafruit_connection_manager.create_fake_ssl_context(pool, esp) +requests = adafruit_requests.Session(pool, ssl_context) + +counter = 0 + +while True: + try: + while not esp.is_connected: + print("Connecting...") + esp.connect_enterprise(secrets) + print("Posting data...", end="") + data = counter + feed = "example" + payload = {"value": data} + response = requests.post( + "https://io.adafruit.com/api/v2/" + + secrets["aio_username"] + + "/feeds/" + + feed + + "/data", + json=payload, + headers={"X-AIO-KEY": secrets["aio_key"]}, + ) + print(response.json()) + response.close() + counter = counter + 1 + print("OK") + except (ValueError, RuntimeError, adafruit_espatcontrol.OKError) as e: + print("Failed to get data, retrying\n", e) + esp.soft_reset() + continue + response = None + time.sleep(15) diff --git a/examples/esp_atcontrol_AIO_wifimanager_enterprise.py b/examples/esp_atcontrol_AIO_wifimanager_enterprise.py new file mode 100644 index 0000000..08f48d1 --- /dev/null +++ b/examples/esp_atcontrol_AIO_wifimanager_enterprise.py @@ -0,0 +1,93 @@ +# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries +# SPDX-License-Identifier: MIT + +# Note, you must create a feed called "test" in your AdafruitIO account. +# Your secrets file must contain your aio_username and aio_key + +import time + +import board +import busio +import neopixel +from digitalio import DigitalInOut, Direction + +# ESP32 AT +from adafruit_espatcontrol import ( + adafruit_espatcontrol, + adafruit_espatcontrol_wifimanager, +) + +# Get wifi details and more from a secrets.py file +try: + from secrets import secrets +except ImportError: + print("WiFi secrets are kept in secrets.py, please add them there!") + raise + +pixel_status = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.3, auto_write=True) + +# Debug Level +# Change the Debug Flag if you have issues with AT commands +debugflag = False + +if board.board_id == "challenger_rp2040_wifi": + RX = board.ESP_RX + TX = board.ESP_TX + resetpin = DigitalInOut(board.WIFI_RESET) + rtspin = False + uart = busio.UART(TX, RX, baudrate=11520, receiver_buffer_size=2048) + esp_boot = DigitalInOut(board.WIFI_MODE) + esp_boot.direction = Direction.OUTPUT + esp_boot.value = True + status_light = None +else: + RX = board.TX + TX = board.RX + resetpin = DigitalInOut(board.D4) + rtspin = DigitalInOut(board.D5) + uart = busio.UART(board.TX, board.RX, timeout=0.1) + esp_boot = DigitalInOut(board.D9) + esp_boot.direction = Direction.OUTPUT + esp_boot.value = True + status_light = pixel_status + +print("ESP AT commands") +esp = adafruit_espatcontrol.ESP_ATcontrol( + uart, 115200, reset_pin=resetpin, rts_pin=rtspin, debug=debugflag +) +wifi = adafruit_espatcontrol_wifimanager.ESPAT_WiFiManager( + esp, secrets, status_light, enterprise=True, debug=debugflag +) +wifi.disconnect() +wifi.reset(soft_reset=True) + +counter = 0 + +while True: + try: + print("Posting data...", end="") + data = counter + feed = "example" + payload = {"value": data} + response = wifi.post( + "https://io.adafruit.com/api/v2/" + + secrets["aio_username"] + + "/feeds/" + + feed + + "/data", + json=payload, + headers={"X-AIO-KEY": secrets["aio_key"]}, + ) + print(response.json()) + response.close() + + counter = counter + 1 + print("OK") + wifi.disconnect() + + except (ValueError, RuntimeError, adafruit_espatcontrol.OKError) as e: + print("Failed to get data, retrying\n", e) + wifi.reset(soft_reset=True) + continue + response = None + time.sleep(15) diff --git a/examples/esp_atcontrol_aio_post.py b/examples/esp_atcontrol_aio_post.py index 27bbd9c..a349913 100644 --- a/examples/esp_atcontrol_aio_post.py +++ b/examples/esp_atcontrol_aio_post.py @@ -5,10 +5,10 @@ # Your secrets file must contain your aio_username and aio_key import time + import board import busio -from digitalio import DigitalInOut -from digitalio import Direction +from digitalio import DigitalInOut, Direction # ESP32 AT from adafruit_espatcontrol import ( @@ -16,7 +16,6 @@ adafruit_espatcontrol_wifimanager, ) - # Get wifi details and more from a secrets.py file try: from secrets import secrets diff --git a/examples/esp_atcontrol_cheerlights.py b/examples/esp_atcontrol_cheerlights.py index d123176..456bb07 100644 --- a/examples/esp_atcontrol_cheerlights.py +++ b/examples/esp_atcontrol_cheerlights.py @@ -2,13 +2,12 @@ # SPDX-License-Identifier: MIT import time + +import adafruit_fancyled.adafruit_fancyled as fancy import board import busio -from digitalio import DigitalInOut -from digitalio import Direction - import neopixel -import adafruit_fancyled.adafruit_fancyled as fancy +from digitalio import DigitalInOut, Direction # ESP32 SPI from adafruit_espatcontrol import ( @@ -16,7 +15,6 @@ adafruit_espatcontrol_wifimanager, ) - # Get wifi details and more from a secrets.py file try: from secrets import secrets @@ -94,11 +92,7 @@ green = color >> 8 & 0xFF blue = color & 0xFF gamma_corrected = fancy.gamma_adjust(fancy.CRGB(red, green, blue)).pack() - print( - "Setting LED To: G:{0},R:{1},B:{2},Gamma:{3}".format( - green, red, blue, gamma_corrected - ) - ) + print(f"Setting LED To: G:{green},R:{red},B:{blue},Gamma:{gamma_corrected}") pixels.fill(gamma_corrected) last_value = value response = None diff --git a/examples/esp_atcontrol_countviewer.py b/examples/esp_atcontrol_countviewer.py index 671fb4c..5b898c4 100644 --- a/examples/esp_atcontrol_countviewer.py +++ b/examples/esp_atcontrol_countviewer.py @@ -6,16 +6,19 @@ stars, price of bitcoin, twitter followers... if you can find something that spits out JSON data, we can display it! """ + import gc import time + +import adafruit_connection_manager +import adafruit_requests import board import busio -from digitalio import DigitalInOut -from digitalio import Direction import neopixel from adafruit_ht16k33 import segments -import adafruit_requests as requests -import adafruit_espatcontrol.adafruit_espatcontrol_socket as socket +from digitalio import DigitalInOut, Direction + +import adafruit_espatcontrol.adafruit_espatcontrol_socket as pool from adafruit_espatcontrol import adafruit_espatcontrol # Get wifi details and more from a secrets.py file @@ -101,7 +104,9 @@ ) esp.hard_reset() -requests.set_socket(socket, esp) +ssl_context = adafruit_connection_manager.create_fake_ssl_context(pool, esp) +requests = adafruit_requests.Session(pool, ssl_context) + # display if DISPLAY_ATTACHED: # Create the I2C interface. @@ -112,16 +117,14 @@ # neopixels if NEOPIXELS_ON_CHANGE: - pixels = neopixel.NeoPixel( - pixel_pin, num_pixels, brightness=0.4, pixel_order=(1, 0, 2, 3) - ) + pixels = neopixel.NeoPixel(pixel_pin, num_pixels, brightness=0.4, pixel_order=(1, 0, 2, 3)) pixels.fill(20) # music! if PLAY_SOUND_ON_CHANGE: import audioio - wave_file = open("coin.wav", "rb") # pylint: disable=consider-using-with + wave_file = open("coin.wav", "rb") wave = audioio.WaveFile(wave_file) # we'll save the value in question @@ -181,11 +184,11 @@ def chime_light(): value = value[x] if not value: continue - print("Times:{0}. The Time:{1}. Value: {2}".format(times, the_time, value)) + print(f"Times:{times}. The Time:{the_time}. Value: {value}") if DISPLAY_ATTACHED: display.print(int(value)) else: - print("INT Value:{0}".format(int(value))) + print(f"INT Value:{int(value)}") if last_value != value: chime_light() # animate the neopixels @@ -195,6 +198,6 @@ def chime_light(): # normally we wouldn't have to do this, but we get bad fragments r = value = None gc.collect() - print("GC MEM:{0}".format(gc.mem_free())) # pylint: disable=no-member - print("Sleeping for: {0} Seconds".format(TIME_BETWEEN_QUERY)) + print(f"GC MEM:{gc.mem_free()}") + print(f"Sleeping for: {TIME_BETWEEN_QUERY} Seconds") time.sleep(TIME_BETWEEN_QUERY) diff --git a/examples/esp_atcontrol_countviewer_enterprise.py b/examples/esp_atcontrol_countviewer_enterprise.py new file mode 100644 index 0000000..b59c704 --- /dev/null +++ b/examples/esp_atcontrol_countviewer_enterprise.py @@ -0,0 +1,210 @@ +# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries +# SPDX-License-Identifier: MIT + +""" +This example will access an API, grab a number like hackaday skulls, github +stars, price of bitcoin, twitter followers... if you can find something that +spits out JSON data, we can display it! +""" + +import gc +import time + +import adafruit_connection_manager +import adafruit_requests +import board +import busio +import neopixel +from digitalio import DigitalInOut, Direction + +import adafruit_espatcontrol.adafruit_espatcontrol_socket as pool +from adafruit_espatcontrol import adafruit_espatcontrol + +try: + from adafruit_ht16k33 import segments +except ImportError: + pass + + +# Get wifi details and more from a secrets.py file +try: + from secrets import secrets +except ImportError: + print("WiFi secrets are kept in secrets.py, please add them there!") + raise + +# CONFIGURATION +PLAY_SOUND_ON_CHANGE = False +NEOPIXELS_ON_CHANGE = True +DISPLAY_ATTACHED = False +TIME_BETWEEN_QUERY = 60 # in seconds + +# Some data sources and JSON locations to try out + +# Bitcoin value in USD +# DATA_SOURCE = "http://api.coindesk.com/v1/bpi/currentprice.json" +# DATA_LOCATION = ["bpi", "USD", "rate_float"] + +# Github stars! You can query 1ce a minute without an API key token +# DATA_SOURCE = "https://api.github.com/repos/adafruit/circuitpython" +# if 'github_token' in secrets: +# DATA_SOURCE += "?access_token="+secrets['github_token'] +# DATA_LOCATION = ["stargazers_count"] + +# Youtube stats +# CHANNEL_ID = "UCpOlOeQjj7EsVnDh3zuCgsA" # this isn't a secret but you have to look it up +# DATA_SOURCE = "https://www.googleapis.com/youtube/v3/channels/?part=statistics&id=" \ +# + CHANNEL_ID +"&key="+secrets['youtube_token'] +# #try also 'viewCount' or 'videoCount +# DATA_LOCATION = ["items", 0, "statistics", "subscriberCount"] + + +# # Subreddit subscribers +# DATA_SOURCE = "https://www.reddit.com/r/circuitpython/about.json" +# DATA_LOCATION = ["data", "subscribers"] + +# Hackaday Skulls (likes), requires an API key +# DATA_SOURCE = "https://api.hackaday.io/v1/projects/1340?api_key="+secrets['hackaday_token'] +# DATA_LOCATION = ["skulls"] + +# Twitter followers +DATA_SOURCE = ( + "http://cdn.syndication.twimg.com/widgets/followbutton/info.json?" + "screen_names=adafruit" +) +DATA_LOCATION = [0, "followers_count"] + +# Debug Level +# Change the Debug Flag if you have issues with AT commands +debugflag = True + +if board.board_id == "challenger_rp2040_wifi": + RX = board.ESP_RX + TX = board.ESP_TX + resetpin = DigitalInOut(board.WIFI_RESET) + rtspin = False + uart = busio.UART(TX, RX, baudrate=11520, receiver_buffer_size=2048) + esp_boot = DigitalInOut(board.WIFI_MODE) + esp_boot.direction = Direction.OUTPUT + esp_boot.value = True + status_light = None + pixel_pin = board.NEOPIXEL + num_pixels = 1 + pixel_type = "RGBW/GRBW" +else: + RX = board.TX + TX = board.RX + resetpin = DigitalInOut(board.D4) + rtspin = DigitalInOut(board.D5) + uart = busio.UART(board.TX, board.RX, baudrate=11520, timeout=0.1, receiver_buffer_size=512) + esp_boot = DigitalInOut(board.D9) + esp_boot.direction = Direction.OUTPUT + esp_boot.value = True + status_light = None + pixel_pin = board.A1 + num_pixels = 16 + pixel_type = "RGB/GRB" + +# Create the connection to the co-processor and reset +esp = adafruit_espatcontrol.ESP_ATcontrol( + uart, 115200, reset_pin=resetpin, rts_pin=rtspin, debug=debugflag +) +esp.soft_reset() +esp.disconnect() + +ssl_context = adafruit_connection_manager.create_fake_ssl_context(pool, esp) +requests = adafruit_requests.Session(pool, ssl_context) + +# display +if DISPLAY_ATTACHED: + # Create the I2C interface. + i2c = busio.I2C(board.SCL, board.SDA) + # Attach a 7 segment display and display -'s so we know its not live yet + display = segments.Seg7x4(i2c) + display.print("----") + +# neopixels +if NEOPIXELS_ON_CHANGE: + pixels = neopixel.NeoPixel(pixel_pin, num_pixels, brightness=0.4, pixel_order=(1, 0, 2, 3)) + pixels.fill(20) + +# music! +if PLAY_SOUND_ON_CHANGE: + import audioio + + wave_file = open("coin.wav", "rb") + wave = audioio.WaveFile(wave_file) + +# we'll save the value in question +last_value = value = None +the_time = None +times = 0 + + +def chime_light(): + """Light up LEDs and play a tune""" + if NEOPIXELS_ON_CHANGE: + for i in range(0, 100, 10): + if pixel_type == "RGB/GRB": + pixels.fill((i, i, i)) + elif pixel_type == "RGBW/GRBW": + pixels.fill((i, i, i, i)) + pixels.show() + time.sleep(1) + if PLAY_SOUND_ON_CHANGE: + with audioio.AudioOut(board.A0) as audio: + audio.play(wave) + while audio.playing: + pass + if NEOPIXELS_ON_CHANGE: + for i in range(100, 0, -10): + if pixel_type == "RGB/GRB": + pixels.fill((i, i, i)) + elif pixel_type == "RGBW/GRBW": + pixels.fill((i, i, i, i)) + pixels.show() + time.sleep(1) + pixels.fill(0) + + +while True: + try: + while not esp.is_connected: + # secrets dictionary must contain 'ssid' and 'password' at a minimum + esp.connect_enterprise(secrets) + + the_time = esp.sntp_time + + # great, lets get the data + print("Retrieving data source...", end="") + r = requests.get(DATA_SOURCE) + print("Reply is OK!") + except (ValueError, RuntimeError, adafruit_espatcontrol.OKError) as e: + print("Failed to get data, retrying\n", e) + continue + # print('-'*40,) + # print("Headers: ", r.headers) + # print("Text:", r.text) + # print('-'*40) + + value = r.json() + for x in DATA_LOCATION: + value = value[x] + if not value: + continue + print(f"Times:{times}. The Time:{the_time}. Value: {value}") + if DISPLAY_ATTACHED: + display.print(int(value)) + else: + print(f"INT Value:{int(value)}") + + if last_value != value: + chime_light() # animate the neopixels + last_value = value + times += 1 + + # normally we wouldn't have to do this, but we get bad fragments + r = value = None + gc.collect() + print(f"GC MEM:{gc.mem_free()}") + print(f"Sleeping for: {TIME_BETWEEN_QUERY} Seconds") + time.sleep(TIME_BETWEEN_QUERY) diff --git a/examples/esp_atcontrol_localtime.py b/examples/esp_atcontrol_localtime.py index 7a23a01..10e4ad7 100644 --- a/examples/esp_atcontrol_localtime.py +++ b/examples/esp_atcontrol_localtime.py @@ -2,18 +2,17 @@ # SPDX-License-Identifier: MIT import time + import board import busio -from digitalio import DigitalInOut -from digitalio import Direction import rtc +from digitalio import DigitalInOut, Direction from adafruit_espatcontrol import ( adafruit_espatcontrol, adafruit_espatcontrol_wifimanager, ) - # Get wifi details and more from a secrets.py file try: from secrets import secrets @@ -77,22 +76,20 @@ json = response.json() current_time = json["datetime"] the_date, the_time = current_time.split("T") -year, month, mday = [int(x) for x in the_date.split("-")] +year, month, mday = (int(x) for x in the_date.split("-")) the_time = the_time.split(".")[0] -hours, minutes, seconds = [int(x) for x in the_time.split(":")] +hours, minutes, seconds = (int(x) for x in the_time.split(":")) # We can also fill in these extra nice things year_day = json["day_of_year"] week_day = json["day_of_week"] is_dst = json["dst"] -now = time.struct_time( - (year, month, mday, hours, minutes, seconds, week_day, year_day, is_dst) -) +now = time.struct_time((year, month, mday, hours, minutes, seconds, week_day, year_day, is_dst)) print(now) the_rtc.datetime = now while True: print(time.localtime()) - print("Sleeping for: {0} Seconds".format(sleep_duration)) + print(f"Sleeping for: {sleep_duration} Seconds") time.sleep(sleep_duration) diff --git a/examples/esp_atcontrol_simple_enterprise.py b/examples/esp_atcontrol_simple_enterprise.py new file mode 100644 index 0000000..3db4b21 --- /dev/null +++ b/examples/esp_atcontrol_simple_enterprise.py @@ -0,0 +1,89 @@ +# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries +# SPDX-License-Identifier: MIT + +import time + +import adafruit_connection_manager +import adafruit_requests +import board +import busio +from digitalio import DigitalInOut, Direction + +import adafruit_espatcontrol.adafruit_espatcontrol_socket as pool +from adafruit_espatcontrol import adafruit_espatcontrol + +# Get wifi details and more from a secrets.py file +try: + from secrets import secrets +except ImportError: + print("WiFi secrets are kept in secrets.py, please add them there!") + raise + +# Debug Level +# Change the Debug Flag if you have issues with AT commands +debugflag = False + +# How long between queries +TIME_BETWEEN_QUERY = 60 # in seconds + +if board.board_id == "challenger_rp2040_wifi": + RX = board.ESP_RX + TX = board.ESP_TX + resetpin = DigitalInOut(board.WIFI_RESET) + rtspin = False + uart = busio.UART(TX, RX, baudrate=11520, receiver_buffer_size=2048) + esp_boot = DigitalInOut(board.WIFI_MODE) + esp_boot.direction = Direction.OUTPUT + esp_boot.value = True + status_light = None +else: + RX = board.TX + TX = board.RX + resetpin = DigitalInOut(board.D4) + rtspin = DigitalInOut(board.D5) + uart = busio.UART(board.TX, board.RX, baudrate=11520, timeout=0.1, receiver_buffer_size=512) + esp_boot = DigitalInOut(board.D9) + esp_boot.direction = Direction.OUTPUT + esp_boot.value = True + status_light = None + +print("ESP AT commands") +esp = adafruit_espatcontrol.ESP_ATcontrol( + uart, 115200, reset_pin=resetpin, rts_pin=rtspin, debug=debugflag +) + +URL = "http://wifitest.adafruit.com/testwifi/index.html" +print("ESP AT GET URL", URL) + +print("Resetting ESP module") +esp.hard_reset() +esp.soft_reset() +esp.disconnect() +# time.sleep(20) +esp.set_autoconnect(False) + +ssl_context = adafruit_connection_manager.create_fake_ssl_context(pool, esp) +requests = adafruit_requests.Session(pool, ssl_context) + +while True: + try: + print(f"Checking connection to {secrets['ssid']}...") + while not esp.is_connected: + print("Connecting...") + esp.connect_enterprise(secrets) + # great, lets get the data + print("Retrieving URL...", end="") + r = requests.get(URL) + print("Status:", r.status_code) + print("Content type:", r.headers["content-type"]) + print("Content size:", r.headers["content-length"]) + print("Encoding:", r.encoding) + print("Text:", r.text) + print("Disconnecting from WiFi") + esp.disconnect() + esp.disconnect() + print(f"Sleeping for: {TIME_BETWEEN_QUERY} Seconds") + time.sleep(TIME_BETWEEN_QUERY) + except (ValueError, RuntimeError, adafruit_espatcontrol.OKError) as e: + print("Failed to get data, retrying\n", e) + continue diff --git a/examples/esp_atcontrol_simpletest.py b/examples/esp_atcontrol_simpletest.py index ab66cbf..8b538f3 100644 --- a/examples/esp_atcontrol_simpletest.py +++ b/examples/esp_atcontrol_simpletest.py @@ -2,12 +2,12 @@ # SPDX-License-Identifier: MIT import time + import board import busio -from digitalio import DigitalInOut -from digitalio import Direction -from adafruit_espatcontrol import adafruit_espatcontrol +from digitalio import DigitalInOut, Direction +from adafruit_espatcontrol import adafruit_espatcontrol # Get wifi details and more from a secrets.py file try: diff --git a/examples/esp_atcontrol_webclient.py b/examples/esp_atcontrol_webclient.py index da50e7f..ba7ae73 100644 --- a/examples/esp_atcontrol_webclient.py +++ b/examples/esp_atcontrol_webclient.py @@ -2,14 +2,15 @@ # SPDX-License-Identifier: MIT import time + +import adafruit_connection_manager +import adafruit_requests import board import busio -from digitalio import DigitalInOut -from digitalio import Direction -import adafruit_requests as requests -import adafruit_espatcontrol.adafruit_espatcontrol_socket as socket -from adafruit_espatcontrol import adafruit_espatcontrol +from digitalio import DigitalInOut, Direction +import adafruit_espatcontrol.adafruit_espatcontrol_socket as pool +from adafruit_espatcontrol import adafruit_espatcontrol # Get wifi details and more from a secrets.py file try: @@ -57,7 +58,8 @@ print("Resetting ESP module") esp.hard_reset() -requests.set_socket(socket, esp) +ssl_context = adafruit_connection_manager.create_fake_ssl_context(pool, esp) +requests = adafruit_requests.Session(pool, ssl_context) while True: try: @@ -73,7 +75,7 @@ print("Content size:", r.headers["content-length"]) print("Encoding:", r.encoding) print("Text:", r.text) - print("Sleeping for: {0} Seconds".format(TIME_BETWEEN_QUERY)) + print(f"Sleeping for: {TIME_BETWEEN_QUERY} Seconds") time.sleep(TIME_BETWEEN_QUERY) except (ValueError, RuntimeError, adafruit_espatcontrol.OKError) as e: print("Failed to get data, retrying\n", e) diff --git a/examples/secrets_enterprise.py b/examples/secrets_enterprise.py new file mode 100644 index 0000000..ceae755 --- /dev/null +++ b/examples/secrets_enterprise.py @@ -0,0 +1,17 @@ +# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries +# SPDX-License-Identifier: MIT + +# This file is where you keep secret settings, passwords, and tokens! +# If you put them in the code you risk committing that info or sharing it + +secrets = { + "ssid": "your-ssid", + "password": "your-password", + "identity": "your-identity", + "username": "your-username", + "method": 1, # 0 = EAP-TLS, 1=EAP-PEAP, 2=EAP-TTLSs + "timezone": -2, # this is offset from UTC + "github_token": "abcdefghij0123456789", + "aio_username": "your-aio-username", + "aio_key": "your-aio-key", +} diff --git a/requirements.txt b/requirements.txt index 45266c4..13401af 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,6 @@ Adafruit-Blinka adafruit-circuitpython-typing>=1.4.0 +adafruit-circuitpython-connectionmanager +adafruit-circuitpython-requests pyserial diff --git a/ruff.toml b/ruff.toml new file mode 100644 index 0000000..8e34805 --- /dev/null +++ b/ruff.toml @@ -0,0 +1,110 @@ +# SPDX-FileCopyrightText: 2024 Tim Cocks for Adafruit Industries +# +# SPDX-License-Identifier: MIT + +target-version = "py38" +line-length = 100 + +[lint] +preview = true +select = ["I", "PL", "UP"] + +extend-select = [ + "D419", # empty-docstring + "E501", # line-too-long + "W291", # trailing-whitespace + "PLC0414", # useless-import-alias + "PLC2401", # non-ascii-name + "PLC2801", # unnecessary-dunder-call + "PLC3002", # unnecessary-direct-lambda-call + "E999", # syntax-error + "PLE0101", # return-in-init + "F706", # return-outside-function + "F704", # yield-outside-function + "PLE0116", # continue-in-finally + "PLE0117", # nonlocal-without-binding + "PLE0241", # duplicate-bases + "PLE0302", # unexpected-special-method-signature + "PLE0604", # invalid-all-object + "PLE0605", # invalid-all-format + "PLE0643", # potential-index-error + "PLE0704", # misplaced-bare-raise + "PLE1141", # dict-iter-missing-items + "PLE1142", # await-outside-async + "PLE1205", # logging-too-many-args + "PLE1206", # logging-too-few-args + "PLE1307", # bad-string-format-type + "PLE1310", # bad-str-strip-call + "PLE1507", # invalid-envvar-value + "PLE2502", # bidirectional-unicode + "PLE2510", # invalid-character-backspace + "PLE2512", # invalid-character-sub + "PLE2513", # invalid-character-esc + "PLE2514", # invalid-character-nul + "PLE2515", # invalid-character-zero-width-space + "PLR0124", # comparison-with-itself + "PLR0202", # no-classmethod-decorator + "PLR0203", # no-staticmethod-decorator + "UP004", # useless-object-inheritance + "PLR0206", # property-with-parameters + "PLR0904", # too-many-public-methods + "PLR0911", # too-many-return-statements + "PLR0912", # too-many-branches + "PLR0913", # too-many-arguments + "PLR0914", # too-many-locals + "PLR0915", # too-many-statements + "PLR0916", # too-many-boolean-expressions + "PLR1702", # too-many-nested-blocks + "PLR1704", # redefined-argument-from-local + "PLR1711", # useless-return + "C416", # unnecessary-comprehension + "PLR1733", # unnecessary-dict-index-lookup + "PLR1736", # unnecessary-list-index-lookup + + # ruff reports this rule is unstable + #"PLR6301", # no-self-use + + "PLW0108", # unnecessary-lambda + "PLW0120", # useless-else-on-loop + "PLW0127", # self-assigning-variable + "PLW0129", # assert-on-string-literal + "B033", # duplicate-value + "PLW0131", # named-expr-without-context + "PLW0245", # super-without-brackets + "PLW0406", # import-self + "PLW0602", # global-variable-not-assigned + "PLW0603", # global-statement + "PLW0604", # global-at-module-level + + # fails on the try: import typing used by libraries + #"F401", # unused-import + + "F841", # unused-variable + "E722", # bare-except + "PLW0711", # binary-op-exception + "PLW1501", # bad-open-mode + "PLW1508", # invalid-envvar-default + "PLW1509", # subprocess-popen-preexec-fn + "PLW2101", # useless-with-lock + "PLW3301", # nested-min-max +] + +ignore = [ + "PLR2004", # magic-value-comparison + "UP030", # format literals + "PLW1514", # unspecified-encoding + "PLR0913", # too-many-arguments + "PLR0915", # too-many-statements + "PLR0917", # too-many-positional-arguments + "PLR0904", # too-many-public-methods + "PLR0912", # too-many-branches + "PLR0916", # too-many-boolean-expressions + "PLR1702", # Too many nested blocks + "PLR0911", # Too many return statements + "PLW0603", # Using the global statement to update variable is discouraged + "PLR6301", # Method could be a function, class method, or static method + "PLW2901", # 'for' loop variable overwritten by assignment target +] + +[format] +line-ending = "lf"