Skip to content

Add type hints #45

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 55 additions & 37 deletions adafruit_espatcontrol/adafruit_espatcontrol.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@

import gc
import time
from digitalio import Direction
from digitalio import Direction, DigitalInOut

try:
import busio
from typing import Optional, Dict, Union, List
except ImportError:
pass

__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_espATcontrol.git"
Expand Down Expand Up @@ -67,13 +73,13 @@ class ESP_ATcontrol:

def __init__(
self,
uart,
default_baudrate,
uart: busio.UART,
default_baudrate: int,
*,
run_baudrate=None,
rts_pin=None,
reset_pin=None,
debug=False
run_baudrate: Optional[int] = None,
rts_pin: Optional[DigitalInOut] = None,
reset_pin: Optional[DigitalInOut] = None,
debug: bool = False
):
"""This function doesn't try to do any sync'ing, just sets up
# the hardware, that way nothing can unexpectedly fail!"""
Expand All @@ -100,7 +106,7 @@ def __init__(
self._ifconfig = []
self._initialized = False

def begin(self):
def begin(self) -> None:
"""Initialize the module by syncing, resetting if necessary, setting up
the desired baudrate, turning on single-socket mode, and configuring
SSL support. Required before using the module but we dont do in __init__
Expand Down Expand Up @@ -128,7 +134,7 @@ def begin(self):
except OKError:
pass # retry

def connect(self, secrets):
def connect(self, secrets: Dict[str, Union[str, int]]) -> None:
"""Repeatedly try to connect to an access point with the details in
the passed in 'secrets' dictionary. Be sure 'ssid' and 'password' are
defined in the secrets dict! If 'timezone' is set, we'll also configure
Expand Down Expand Up @@ -160,15 +166,23 @@ def connect(self, secrets):
# *************************** SOCKET SETUP ****************************

@property
def cipmux(self):
def cipmux(self) -> int:
"""The IP socket multiplexing setting. 0 for one socket, 1 for multi-socket"""
replies = self.at_response("AT+CIPMUX?", timeout=3).split(b"\r\n")
for reply in replies:
if reply.startswith(b"+CIPMUX:"):
return int(reply[8:])
raise RuntimeError("Bad response to CIPMUX?")

def socket_connect(self, conntype, remote, remote_port, *, keepalive=10, retries=1):
def socket_connect(
self,
conntype: str,
remote: str,
remote_port: int,
*,
keepalive: int = 10,
retries: int = 1
) -> bool:
"""Open a socket. conntype can be TYPE_TCP, TYPE_UDP, or TYPE_SSL. Remote
can be an IP address or DNS (we'll do the lookup for you. Remote port
is integer port on other side. We can't set the local port"""
Expand Down Expand Up @@ -199,7 +213,7 @@ def socket_connect(self, conntype, remote, remote_port, *, keepalive=10, retries
return True
return False

def socket_send(self, buffer, timeout=1):
def socket_send(self, buffer: bytes, timeout: int = 1) -> bool:
"""Send data over the already-opened socket, buffer must be bytes"""
cmd = "AT+CIPSEND=%d" % len(buffer)
self.at_response(cmd, timeout=5, retries=1)
Expand Down Expand Up @@ -232,7 +246,7 @@ def socket_send(self, buffer, timeout=1):
# Get newlines off front and back, then split into lines
return True

def socket_receive(self, timeout=5):
def socket_receive(self, timeout: int = 5) -> bytearray:
# pylint: disable=too-many-nested-blocks, too-many-branches
"""Check for incoming data over the open socket, returns bytes"""
incoming_bytes = None
Expand Down Expand Up @@ -298,7 +312,7 @@ def socket_receive(self, timeout=5):
gc.collect()
return ret

def socket_disconnect(self):
def socket_disconnect(self) -> None:
"""Close any open socket, if there is one"""
try:
self.at_response("AT+CIPCLOSE", retries=1)
Expand All @@ -307,7 +321,9 @@ def socket_disconnect(self):

# *************************** SNTP SETUP ****************************

def sntp_config(self, enable, timezone=None, server=None):
def sntp_config(
self, enable: bool, timezone: Optional[int] = None, server: Optional[str] = None
) -> None:
"""Configure the built in ESP SNTP client with a UTC-offset number (timezone)
and server as IP or hostname."""
cmd = "AT+CIPSNTPCFG="
Expand All @@ -322,7 +338,7 @@ def sntp_config(self, enable, timezone=None, server=None):
self.at_response(cmd, timeout=3)

@property
def sntp_time(self):
def sntp_time(self) -> Union[bytes, None]:
"""Return a string with time/date information using SNTP, may return
1970 'bad data' on the first few minutes, without warning!"""
replies = self.at_response("AT+CIPSNTPTIME?", timeout=5).split(b"\r\n")
Expand All @@ -334,7 +350,7 @@ def sntp_time(self):
# *************************** WIFI SETUP ****************************

@property
def is_connected(self):
def is_connected(self) -> bool:
"""Initialize module if not done yet, and check if we're connected to
an access point, returns True or False"""
if not self._initialized:
Expand All @@ -354,7 +370,7 @@ def is_connected(self):
return False

@property
def status(self):
def status(self) -> Union[int, None]:
"""The IP connection status number (see AT+CIPSTATUS datasheet for meaning)"""
replies = self.at_response("AT+CIPSTATUS", timeout=5).split(b"\r\n")
for reply in replies:
Expand All @@ -363,7 +379,7 @@ def status(self):
return None

@property
def mode(self):
def mode(self) -> Union[int, None]:
"""What mode we're in, can be MODE_STATION, MODE_SOFTAP or MODE_SOFTAPSTATION"""
if not self._initialized:
self.begin()
Expand All @@ -374,7 +390,7 @@ def mode(self):
raise RuntimeError("Bad response to CWMODE?")

@mode.setter
def mode(self, mode):
def mode(self, mode: int) -> None:
"""Station or AP mode selection, can be MODE_STATION, MODE_SOFTAP or MODE_SOFTAPSTATION"""
if not self._initialized:
self.begin()
Expand All @@ -383,15 +399,15 @@ def mode(self, mode):
self.at_response("AT+CWMODE=%d" % mode, timeout=3)

@property
def local_ip(self):
def local_ip(self) -> Union[str, None]:
"""Our local IP address as a dotted-quad string"""
reply = self.at_response("AT+CIFSR").strip(b"\r\n")
for line in reply.split(b"\r\n"):
if line and line.startswith(b'+CIFSR:STAIP,"'):
return str(line[14:-1], "utf-8")
raise RuntimeError("Couldn't find IP address")

def ping(self, host):
def ping(self, host: str) -> Union[int, None]:
"""Ping the IP or hostname given, returns ms time or None on failure"""
reply = self.at_response('AT+PING="%s"' % host.strip('"'), timeout=5)
for line in reply.split(b"\r\n"):
Expand All @@ -404,7 +420,7 @@ def ping(self, host):
return None
raise RuntimeError("Couldn't ping")

def nslookup(self, host):
def nslookup(self, host: str) -> Union[str, None]:
"""Return a dotted-quad IP address strings that matches the hostname"""
reply = self.at_response('AT+CIPDOMAIN="%s"' % host.strip('"'), timeout=3)
for line in reply.split(b"\r\n"):
Expand All @@ -415,7 +431,7 @@ def nslookup(self, host):
# *************************** AP SETUP ****************************

@property
def remote_AP(self): # pylint: disable=invalid-name
def remote_AP(self) -> List[Union[int, str, None]]: # pylint: disable=invalid-name
"""The name of the access point we're connected to, as a string"""
stat = self.status
if stat != self.STATUS_APCONNECTED:
Expand All @@ -434,7 +450,7 @@ def remote_AP(self): # pylint: disable=invalid-name
return reply
return [None] * 4

def join_AP(self, ssid, password): # pylint: disable=invalid-name
def join_AP(self, ssid: str, password: str) -> None: # pylint: disable=invalid-name
"""Try to join an access point by name and password, will return
immediately if we're already connected and won't try to reconnect"""
# First make sure we're in 'station' mode so we can connect to AP's
Expand All @@ -456,7 +472,9 @@ def join_AP(self, ssid, password): # pylint: disable=invalid-name
raise RuntimeError("Didn't get IP address")
return

def scan_APs(self, retries=3): # pylint: disable=invalid-name
def scan_APs( # pylint: disable=invalid-name
self, retries: int = 3
) -> Union[List[List[bytes]], None]:
"""Ask the module to scan for access points and return a list of lists
with name, RSSI, MAC addresses, etc"""
for _ in range(retries):
Expand All @@ -482,11 +500,11 @@ def scan_APs(self, retries=3): # pylint: disable=invalid-name
# ************************** AT LOW LEVEL ****************************

@property
def version(self):
def version(self) -> Union[str, None]:
"""The cached version string retrieved via the AT+GMR command"""
return self._version

def get_version(self):
def get_version(self) -> Union[str, None]:
"""Request the AT firmware version string and parse out the
version number"""
reply = self.at_response("AT+GMR", timeout=3).strip(b"\r\n")
Expand All @@ -499,12 +517,12 @@ def get_version(self):
self._version = str(line, "utf-8")
return self._version

def hw_flow(self, flag):
def hw_flow(self, flag: bool) -> None:
"""Turn on HW flow control (if available) on to allow data, or off to stop"""
if self._rts_pin:
self._rts_pin.value = not flag

def at_response(self, at_cmd, timeout=5, retries=3):
def at_response(self, at_cmd: str, timeout: int = 5, retries: int = 3) -> bytes:
"""Send an AT command, check that we got an OK response,
and then cut out the reply lines to return. We can set
a variable timeout (how long we'll wait for response) and
Expand Down Expand Up @@ -554,7 +572,7 @@ def at_response(self, at_cmd, timeout=5, retries=3):
return response[:-4]
raise OKError("No OK response to " + at_cmd)

def sync(self):
def sync(self) -> bool:
"""Check if we have AT commmand sync by sending plain ATs"""
try:
self.at_response("AT", timeout=1)
Expand All @@ -563,12 +581,12 @@ def sync(self):
return False

@property
def baudrate(self):
def baudrate(self) -> int:
"""The baudrate of our UART connection"""
return self._uart.baudrate

@baudrate.setter
def baudrate(self, baudrate):
def baudrate(self, baudrate: int) -> None:
"""Change the modules baudrate via AT commands and then check
that we're still sync'd."""
at_cmd = "AT+UART_CUR=" + str(baudrate) + ",8,1,0,"
Expand All @@ -588,14 +606,14 @@ def baudrate(self, baudrate):
if not self.sync():
raise RuntimeError("Failed to resync after Baudrate change")

def echo(self, echo):
def echo(self, echo: bool) -> None:
"""Set AT command echo on or off"""
if echo:
self.at_response("ATE1", timeout=1)
else:
self.at_response("ATE0", timeout=1)

def soft_reset(self):
def soft_reset(self) -> bool:
"""Perform a software reset by AT command. Returns True
if we successfully performed, false if failed to reset"""
try:
Expand All @@ -609,13 +627,13 @@ def soft_reset(self):
pass # fail, see below
return False

def factory_reset(self):
def factory_reset(self) -> None:
"""Perform a hard reset, then send factory restore settings request"""
self.hard_reset()
self.at_response("AT+RESTORE", timeout=1)
self._initialized = False

def hard_reset(self):
def hard_reset(self) -> None:
"""Perform a hardware reset by toggling the reset pin, if it was
defined in the initialization of this object"""
if self._reset_pin:
Expand Down
Loading