diff --git a/README.rst b/README.rst index 8ee45df5..ec635da8 100644 --- a/README.rst +++ b/README.rst @@ -20,7 +20,6 @@ Dependencies This driver depends on: * `Adafruit CircuitPython `_ -* `Adafruit Logging `_ Please ensure all dependencies are available on the CircuitPython filesystem. This is easily achieved by downloading diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 6727e05b..aecde993 100755 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: 2019 Brent Rubell for Adafruit Industries +# SPDX-FileCopyrightText: 2019-2021 Brent Rubell for Adafruit Industries # # SPDX-License-Identifier: MIT @@ -10,24 +10,26 @@ `adafruit_minimqtt` ================================================================================ -MQTT Library for CircuitPython. +A minimal MQTT Library for CircuitPython. * Author(s): Brent Rubell Implementation Notes -------------------- +Adapted from https://github.com/micropython/micropython-lib/tree/master/umqtt.simple/umqtt + **Software and Dependencies:** * Adafruit CircuitPython firmware for the supported boards: https://github.com/adafruit/circuitpython/releases """ +import errno import struct import time from random import randint from micropython import const -import adafruit_logging as logging from .matcher import MQTTMatcher __version__ = "0.0.0-auto.0" @@ -59,7 +61,6 @@ const(0x05): "Connection Refused - Unauthorized", } - _the_interface = None # pylint: disable=invalid-name _the_sock = None # pylint: disable=invalid-name @@ -71,8 +72,9 @@ class MMQTTException(Exception): # pass +# Legacy ESP32SPI Socket API def set_socket(sock, iface=None): - """Helper to set the global socket and optionally set the global network interface. + """Legacy API for setting the socket and network interface, use a Session instead. :param sock: socket object. :param iface: internet interface object @@ -85,9 +87,35 @@ def set_socket(sock, iface=None): _the_sock.set_interface(iface) -class MQTT: - """MQTT Client for CircuitPython +class _FakeSSLSocket: + def __init__(self, socket, tls_mode): + self._socket = socket + self._mode = tls_mode + self.settimeout = socket.settimeout + self.send = socket.send + self.recv = socket.recv + self.close = socket.close + + def connect(self, address): + """connect wrapper to add non-standard mode parameter""" + try: + return self._socket.connect(address, self._mode) + except RuntimeError as error: + raise OSError(errno.ENOMEM) from error + +class _FakeSSLContext: + def __init__(self, iface): + self._iface = iface + + def wrap_socket(self, socket, server_hostname=None): + """Return the same socket""" + # pylint: disable=unused-argument + return _FakeSSLSocket(socket, self._iface.TLS_MODE) + + +class MQTT: + """MQTT Client for CircuitPython. :param str broker: MQTT Broker URL or IP Address. :param int port: Optional port definition, defaults to 8883. :param str username: Username for broker authentication. @@ -95,8 +123,9 @@ class MQTT: :param network_manager: NetworkManager object, such as WiFiManager from ESPSPI_WiFiManager. :param str client_id: Optional client identifier, defaults to a unique, generated string. :param bool is_ssl: Sets a secure or insecure connection with the broker. - :param bool log: Attaches a logger to the MQTT client, defaults to logging level INFO. :param int keep_alive: KeepAlive interval between the broker and the MiniMQTT client. + :param socket socket_pool: A pool of socket resources available for the given radio. + :param ssl_context: SSL context for long-lived SSL connections. """ @@ -109,27 +138,51 @@ def __init__( password=None, client_id=None, is_ssl=True, - log=False, keep_alive=60, + socket_pool=None, + ssl_context=None, ): + + self._socket_pool = socket_pool + # Legacy API - if we do not have a socket pool, use default socket + if self._socket_pool is None: + self._socket_pool = _the_sock + + self._ssl_context = ssl_context + # Legacy API - if we do not have SSL context, fake it + if self._ssl_context is None: + self._ssl_context = _FakeSSLContext(_the_interface) + + # Hang onto open sockets so that we can reuse them + self._socket_free = {} + self._open_sockets = {} self._sock = None + self._backwards_compatible_sock = False + + self.keep_alive = keep_alive + self._user_data = None + self._is_connected = False + self._msg_size_lim = MQTT_MSG_SZ_LIM + self._pid = 0 + self._timestamp = 0 + self.logger = None + self.broker = broker - # port/ssl + self._username = username + self._password = password + if ( + self._password and len(password.encode("utf-8")) > MQTT_TOPIC_LENGTH_LIMIT + ): # [MQTT-3.1.3.5] + raise MMQTTException("Password length is too large.") + self.port = MQTT_TCP_PORT if is_ssl: self.port = MQTT_TLS_PORT - if port is not None: + if port: self.port = port - # session identifiers - self.user = username - # [MQTT-3.1.3.5] - self.password = password - if ( - self.password is not None - and len(password.encode("utf-8")) > MQTT_TOPIC_LENGTH_LIMIT - ): - raise MMQTTException("Password length is too large.") - if client_id is not None: + + # define client identifer + if client_id: # user-defined client_id MAY allow client_id's > 23 bytes or # non-alpha-numeric characters self.client_id = client_id @@ -141,27 +194,19 @@ def __init__( # generated client_id's enforce spec.'s length rules if len(self.client_id) > 23 or not self.client_id: raise ValueError("MQTT Client ID must be between 1 and 23 bytes") - self.keep_alive = keep_alive - self.user_data = None - self.logger = None - if log is True: - self.logger = logging.getLogger("log") - self.logger.setLevel(logging.INFO) - self._sock = None - self._is_connected = False - self._msg_size_lim = MQTT_MSG_SZ_LIM - self._pid = 0 - self._timestamp = 0 + # LWT self._lw_topic = None self._lw_qos = 0 self._lw_topic = None self._lw_msg = None self._lw_retain = False + # List of subscribed topics, used for tracking self._subscribed_topics = [] self._on_message_filtered = MQTTMatcher() - # Server callbacks + + # Default topic callback methods self._on_message = None self.on_connect = None self.on_disconnect = None @@ -169,6 +214,95 @@ def __init__( self.on_subscribe = None self.on_unsubscribe = None + # Socket helpers + def _free_socket(self, socket): + """Frees a socket for re-use.""" + if socket not in self._open_sockets.values(): + raise RuntimeError("Socket not from MQTT client.") + self._socket_free[socket] = True + + def _close_socket(self, socket): + """Closes a slocket.""" + socket.close() + del self._socket_free[socket] + key = None + for k in self._open_sockets: + if self._open_sockets[k] == socket: + key = k + break + if key: + del self._open_sockets[key] + + def _free_sockets(self): + """Closes all free sockets.""" + free_sockets = [] + for sock in self._socket_free: + if self._socket_free[sock]: + free_sockets.append(sock) + for sock in free_sockets: + self._close_socket(sock) + + # pylint: disable=too-many-branches + def _get_socket(self, host, port, *, timeout=1): + key = (host, port) + if key in self._open_sockets: + sock = self._open_sockets[key] + if self._socket_free[sock]: + self._socket_free[sock] = False + return sock + if port == 8883 and not self._ssl_context: + raise RuntimeError( + "ssl_context must be set before using adafruit_mqtt for secure MQTT." + ) + + # Legacy API - use a default socket instead of socket pool + if self._socket_pool is None: + self._socket_pool = _the_sock + + addr_info = self._socket_pool.getaddrinfo( + host, port, 0, self._socket_pool.SOCK_STREAM + )[0] + retry_count = 0 + sock = None + while retry_count < 5 and sock is None: + if retry_count > 0: + if any(self._socket_free.items()): + self._free_sockets() + else: + raise RuntimeError("Sending request failed") + retry_count += 1 + + try: + sock = self._socket_pool.socket( + addr_info[0], addr_info[1], addr_info[2] + ) + except OSError: + continue + + connect_host = addr_info[-1][0] + if port == 8883: + sock = self._ssl_context.wrap_socket(sock, server_hostname=host) + connect_host = host + sock.settimeout(timeout) + + try: + sock.connect((connect_host, port)) + except MemoryError: + sock.close() + sock = None + except OSError: + sock.close() + sock = None + + if sock is None: + raise RuntimeError("Repeated socket failures") + + self._backwards_compatible_sock = not hasattr(sock, "recv_into") + + self._open_sockets[key] = sock + self._socket_free[sock] = False + return sock + def __enter__(self): return self @@ -208,6 +342,20 @@ def deinit(self): """De-initializes the MQTT client and disconnects from the mqtt broker.""" self.disconnect() + @property + def mqtt_msg(self): + """Returns maximum MQTT payload and topic size.""" + return self._msg_size_lim, MQTT_TOPIC_LENGTH_LIMIT + + @mqtt_msg.setter + def mqtt_msg(self, msg_size): + """Sets the maximum MQTT message payload size. + + :param int msg_size: Maximum MQTT payload size. + """ + if msg_size < MQTT_MSG_MAX_SZ: + self._msg_size_lim = msg_size + def will_set(self, topic=None, payload=None, qos=0, retain=False): """Sets the last will and testament properties. MUST be called before `connect()`. @@ -222,9 +370,9 @@ def will_set(self, topic=None, payload=None, qos=0, retain=False): :param bool retain: Specifies if the payload is to be retained when it is published. """ - if self.logger is not None: + if self.logger: self.logger.debug("Setting last will properties") - self._check_qos(qos) + self._valid_qos(qos) if self._is_connected: raise MMQTTException("Last Will should only be called before connect().") if payload is None: @@ -284,36 +432,39 @@ def _handle_on_message(self, client, topic, message): if not matched and self.on_message: # regular on_message self.on_message(client, topic, message) + def username_pw_set(self, username, password=None): + """Set client's username and an optional password. + :param str username: Username to use with your MQTT broker. + :param str password: Password to use with your MQTT broker. + + """ + if self._is_connected: + raise MMQTTException("This method must be called before connect().") + self._username = username + if password is not None: + self._password = password + # pylint: disable=too-many-branches, too-many-statements, too-many-locals - def connect(self, clean_session=True): + def connect(self, clean_session=True, host=None, port=None, keep_alive=None): """Initiates connection with the MQTT Broker. - :param bool clean_session: Establishes a persistent session. + :param str host: Hostname or IP address of the remote broker. + :param int port: Network port of the remote broker. + :param int keep_alive: Maximum period allowed for communication, in seconds. + """ - self._sock = _the_sock.socket() - self._sock.settimeout(15) - if self.port == 8883: - try: - if self.logger is not None: - self.logger.debug( - "Attempting to establish secure MQTT connection..." - ) - conntype = _the_interface.TLS_MODE - self._sock.connect((self.broker, self.port), conntype) - except RuntimeError as e: - raise MMQTTException("Invalid broker address defined.", e) from None - else: - try: - if self.logger is not None: - self.logger.debug( - "Attempting to establish insecure MQTT connection..." - ) - addr = _the_sock.getaddrinfo( - self.broker, self.port, 0, _the_sock.SOCK_STREAM - )[0] - self._sock.connect(addr[-1], _the_interface.TCP_MODE) - except RuntimeError as e: - raise MMQTTException("Invalid broker address defined.", e) from None + if host: + self.broker = host + if port: + self.port = port + if keep_alive: + self.keep_alive = keep_alive + + if self.logger: + self.logger.debug("Attempting to establish MQTT connection...") + + # Attempt to get a new socket + self._sock = self._get_socket(self.broker, self.port) # Fixed Header fixed_header = bytearray([0x10]) @@ -321,14 +472,13 @@ def connect(self, clean_session=True): # NOTE: Variable header is # MQTT_HDR_CONNECT = bytearray(b"\x04MQTT\x04\x02\0\0") # because final 4 bytes are 4, 2, 0, 0 - # Variable Header var_header = MQTT_HDR_CONNECT var_header[6] = clean_session << 1 # Set up variable header and remaining_length remaining_length = 12 + len(self.client_id) - if self.user is not None: - remaining_length += 2 + len(self.user) + 2 + len(self.password) + if self._username: + remaining_length += 2 + len(self._username) + 2 + len(self._password) var_header[6] |= 0xC0 if self.keep_alive: assert self.keep_alive < MQTT_TOPIC_LENGTH_LIMIT @@ -357,10 +507,10 @@ def connect(self, clean_session=True): fixed_header.append(remaining_length) fixed_header.append(0x00) - if self.logger is not None: - self.logger.debug("Sending CONNECT to broker") + if self.logger: + self.logger.debug("Sending CONNECT to broker...") self.logger.debug( - "Fixed Header: {}\nVariable Header: {}".format(fixed_header, var_header) + "Fixed Header: %x\nVariable Header: %x", fixed_header, var_header ) self._sock.send(fixed_header) self._sock.send(var_header) @@ -370,12 +520,12 @@ def connect(self, clean_session=True): # [MQTT-3.1.3-11] self._send_str(self._lw_topic) self._send_str(self._lw_msg) - if self.user is None: - self.user = None + if self._username is None: + self._username = None else: - self._send_str(self.user) - self._send_str(self.password) - if self.logger is not None: + self._send_str(self._username) + self._send_str(self._password) + if self.logger: self.logger.debug("Receiving CONNACK packet from broker") while True: op = self._wait_for_msg() @@ -387,7 +537,7 @@ def connect(self, clean_session=True): self._is_connected = True result = rc[0] & 1 if self.on_connect is not None: - self.on_connect(self, self.user_data, result, rc[2]) + self.on_connect(self, self._user_data, result, rc[2]) return result def disconnect(self): @@ -406,7 +556,7 @@ def disconnect(self): self._is_connected = False self._subscribed_topics = [] if self.on_disconnect is not None: - self.on_disconnect(self, self.user_data, 0) + self.on_disconnect(self, self._user_data, 0) def ping(self): """Pings the MQTT Broker to confirm if the broker is alive or if @@ -431,36 +581,14 @@ def ping(self): # pylint: disable=too-many-branches, too-many-statements def publish(self, topic, msg, retain=False, qos=0): """Publishes a message to a topic provided. - :param str topic: Unique topic identifier. :param str,int,float msg: Data to send to the broker. :param bool retain: Whether the message is saved by the broker. - :param int qos: Quality of Service level for the message, defaults to - zero. Conventional options are ``0`` (send at most once), ``1`` - (send at least once), or ``2`` (send exactly once). - - .. note:: Only options ``1`` or ``0`` are QoS levels supported by this library. - - Example of sending an integer, 3, to the broker on topic 'piVal'. - - .. code-block:: python - - mqtt_client.publish('topics/piVal', 3) - - Example of sending a float, 3.14, to the broker on topic 'piVal'. - - .. code-block:: python - - mqtt_client.publish('topics/piVal', 3.14) + :param int qos: Quality of Service level for the message, defaults to zero. - Example of sending a string, 'threepointonefour', to the broker on topic piVal. - - .. code-block:: python - - mqtt_client.publish('topics/piVal', 'threepointonefour') """ self.is_connected() - self._check_topic(topic) + self._valid_topic(topic) if "+" in topic or "#" in topic: raise MMQTTException("Publish topic can not contain wildcards.") # check msg/qos kwargs @@ -488,7 +616,6 @@ def publish(self, topic, msg, retain=False, qos=0): remaining_length = 2 + len(msg) + len(topic) if qos > 0: # packet identifier where QoS level is 1 or 2. [3.3.2.2] - pid = self._pid remaining_length += 2 pub_hdr_var.append(0x00) pub_hdr_var.append(self._pid) @@ -505,18 +632,20 @@ def publish(self, topic, msg, retain=False, qos=0): else: pub_hdr_fixed.append(remaining_length) - if self.logger is not None: + if self.logger: self.logger.debug( - "Sending PUBLISH\nTopic: {0}\nMsg: {1}\ - \nQoS: {2}\nRetain? {3}".format( - topic, msg, qos, retain - ) + "Sending PUBLISH\nTopic: %s\nMsg: %x\ + \nQoS: %d\nRetain? %r", + topic, + msg, + qos, + retain, ) self._sock.send(pub_hdr_fixed) self._sock.send(pub_hdr_var) self._sock.send(msg) if qos == 0 and self.on_publish is not None: - self.on_publish(self, self.user_data, topic, self._pid) + self.on_publish(self, self._user_data, topic, self._pid) if qos == 1: while True: op = self._wait_for_msg() @@ -525,9 +654,9 @@ def publish(self, topic, msg, retain=False, qos=0): assert sz == b"\x02" rcv_pid = self._sock_exact_recv(2) rcv_pid = rcv_pid[0] << 0x08 | rcv_pid[1] - if pid == rcv_pid: + if self._pid == rcv_pid: if self.on_publish is not None: - self.on_publish(self, self.user_data, topic, rcv_pid) + self.on_publish(self, self._user_data, topic, rcv_pid) return def subscribe(self, topic, qos=0): @@ -535,56 +664,31 @@ def subscribe(self, topic, qos=0): This method can subscribe to one topics or multiple topics. :param str,tuple,list topic: Unique MQTT topic identifier string. If - this is a `tuple`, then the tuple should contain topic identifier - string and qos level integer. If this is a `list`, then each list - element should be a tuple containing a topic identifier string and - qos level integer. + this is a `tuple`, then the tuple should + contain topic identifier string and qos + level integer. If this is a `list`, then + each list element should be a tuple containing + a topic identifier string and qos level integer. :param int qos: Quality of Service level for the topic, defaults to - zero. Conventional options are ``0`` (send at most once), ``1`` - (send at least once), or ``2`` (send exactly once). - - .. note:: Only options ``1`` or ``0`` are QoS levels supported by this library. - - Example of subscribing a topic string. - - .. code-block:: python - - mqtt_client.subscribe('topics/ledState') - - Example of subscribing to a topic and setting the qos level to 1. - - .. code-block:: python - - mqtt_client.subscribe('topics/ledState', 1) - - Example of subscribing to topic string and setting qos level to 1, as a tuple. - - .. code-block:: python - - mqtt_client.subscribe(('topics/ledState', 1)) - - Example of subscribing to multiple topics with different qos levels. - - .. code-block:: python - - mqtt_client.subscribe([('topics/ledState', 1), ('topics/servoAngle', 0)]) + zero. Conventional options are ``0`` (send at most once), ``1`` + (send at least once), or ``2`` (send exactly once). """ self.is_connected() topics = None if isinstance(topic, tuple): topic, qos = topic - self._check_topic(topic) - self._check_qos(qos) + self._valid_topic(topic) + self._valid_qos(qos) if isinstance(topic, str): - self._check_topic(topic) - self._check_qos(qos) + self._valid_topic(topic) + self._valid_qos(qos) topics = [(topic, qos)] if isinstance(topic, list): topics = [] for t, q in topic: - self._check_qos(q) - self._check_topic(t) + self._valid_qos(q) + self._valid_topic(t) topics.append((t, q)) # Assemble packet packet_length = 2 + (2 * len(topics)) + (1 * len(topics)) @@ -598,10 +702,10 @@ def subscribe(self, topic, qos=0): for t, q in topics: topic_size = len(t).to_bytes(2, "big") qos_byte = q.to_bytes(1, "big") - packet += topic_size + t + qos_byte - if self.logger is not None: + packet += topic_size + t.encode() + qos_byte + if self.logger: for t, q in topics: - self.logger.debug("SUBSCRIBING to topic {0} with QoS {1}".format(t, q)) + self.logger.debug("SUBSCRIBING to topic %s with QoS %d", t, q) self._sock.send(packet) while True: op = self._wait_for_msg() @@ -612,38 +716,23 @@ def subscribe(self, topic, qos=0): raise MMQTTException("SUBACK Failure!") for t, q in topics: if self.on_subscribe is not None: - self.on_subscribe(self, self.user_data, t, q) + self.on_subscribe(self, self._user_data, t, q) self._subscribed_topics.append(t) return def unsubscribe(self, topic): """Unsubscribes from a MQTT topic. - - :param str,list topic: Unique MQTT topic identifier string or a list - of tuples, where each tuple contains an MQTT topic identier - string. - - Example of unsubscribing from a topic string. - - .. code-block:: python - - mqtt_client.unsubscribe('topics/ledState') - - Example of unsubscribing from multiple topics. - - .. code-block:: python - - mqtt_client.unsubscribe([('topics/ledState'), ('topics/servoAngle')]) + :param str,list topic: Unique MQTT topic identifier string or list. """ topics = None if isinstance(topic, str): - self._check_topic(topic) + self._valid_topic(topic) topics = [(topic)] if isinstance(topic, list): topics = [] for t in topic: - self._check_topic(t) + self._valid_topic(t) topics.append((t)) for t in topics: if t not in self._subscribed_topics: @@ -659,41 +748,38 @@ def unsubscribe(self, topic): packet = MQTT_UNSUB + packet_length_byte + packet_id_bytes for t in topics: topic_size = len(t).to_bytes(2, "big") - packet += topic_size + t - if self.logger is not None: + packet += topic_size + t.encode() + if self.logger: for t in topics: - self.logger.debug("UNSUBSCRIBING from topic {0}.".format(t)) + self.logger.debug("UNSUBSCRIBING from topic %s", t) self._sock.send(packet) - if self.logger is not None: + if self.logger: self.logger.debug("Waiting for UNSUBACK...") while True: op = self._wait_for_msg() if op == 176: - return_code = self._sock_exact_recv(3) - assert return_code[0] == 0x02 + rc = self._sock_exact_recv(3) + assert rc[0] == 0x02 # [MQTT-3.32] - assert ( - return_code[1] == packet_id_bytes[0] - and return_code[2] == packet_id_bytes[1] - ) + assert rc[1] == packet_id_bytes[0] and rc[2] == packet_id_bytes[1] for t in topics: if self.on_unsubscribe is not None: - self.on_unsubscribe(self, self.user_data, t, self._pid) + self.on_unsubscribe(self, self._user_data, t, self._pid) self._subscribed_topics.remove(t) return def reconnect(self, resub_topics=True): """Attempts to reconnect to the MQTT broker. - :param bool resub_topics: Resubscribe to previously subscribed topics. + """ - if self.logger is not None: + if self.logger: self.logger.debug("Attempting to reconnect with MQTT broker") self.connect() - if self.logger is not None: + if self.logger: self.logger.debug("Reconnected with broker") if resub_topics: - if self.logger is not None: + if self.logger: self.logger.debug( "Attempting to resubscribe to previously subscribed topics." ) @@ -703,10 +789,12 @@ def reconnect(self, resub_topics=True): feed = subscribed_topics.pop() self.subscribe(feed) - def loop(self): + def loop(self, timeout=1): """Non-blocking message loop. Use this method to check incoming subscription messages. Returns response codes of any messages received. + :param int timeout: Socket timeout, in seconds. + """ if self._timestamp == 0: self._timestamp = time.monotonic() @@ -721,15 +809,28 @@ def loop(self): rcs = self.ping() self._timestamp = 0 return rcs - self._sock.settimeout(0.1) + self._sock.settimeout(timeout) rc = self._wait_for_msg() return [rc] if rc else None - def _wait_for_msg(self, timeout=30): - """Reads and processes network events. - Returns response code if successful. - """ - res = self._sock.recv(1) + def _wait_for_msg(self, timeout=0.1): + """Reads and processes network events.""" + # CPython socket module contains a timeout attribute + if hasattr(self._socket_pool, "timeout"): + try: + res = self._sock_exact_recv(1) + except self._socket_pool.timeout as error: + return None + else: # socketpool, esp32spi + try: + res = self._sock_exact_recv(1) + except OSError as error: + if error.errno == errno.ETIMEDOUT: + # raised by a socket timeout in socketpool + return None + raise MMQTTException from error + + # Block while we parse the rest of the response self._sock.settimeout(timeout) if res in [None, b""]: # If we get here, it means that there is nothing to be received @@ -746,15 +847,18 @@ def _wait_for_msg(self, timeout=30): if res[0] & 0xF0 != 0x30: return res[0] sz = self._recv_len() + # topic length MSB & LSB topic_len = self._sock_exact_recv(2) topic_len = (topic_len[0] << 8) | topic_len[1] topic = self._sock_exact_recv(topic_len) topic = str(topic, "utf-8") sz -= topic_len + 2 + pid = 0 if res[0] & 0x06: pid = self._sock_exact_recv(2) pid = pid[0] << 0x08 | pid[1] sz -= 0x02 + # read message contents msg = self._sock_exact_recv(sz) self._handle_on_message(self, topic, str(msg, "utf-8")) if res[0] & 0x06 == 0x02: @@ -766,8 +870,10 @@ def _wait_for_msg(self, timeout=30): return res[0] def _recv_len(self): + """Unpack MQTT message length.""" n = 0 sh = 0 + b = bytearray(1) while True: b = self._sock_exact_recv(1)[0] n |= (b & 0x7F) << sh @@ -775,10 +881,55 @@ def _recv_len(self): return n sh += 7 - def _send_str(self, string): - """Packs and encodes a string to a socket. + def _recv_into(self, buf, size=0): + """Backwards-compatible _recv_into implementation.""" + if self._backwards_compatible_sock: + size = len(buf) if size == 0 else size + b = self._sock.recv(size) + read_size = len(b) + buf[:read_size] = b + return read_size + return self._sock.recv_into(buf, size) + + def _sock_exact_recv(self, bufsize): + """Reads _exact_ number of bytes from the connected socket. Will only return + string with the exact number of bytes requested. + + The semantics of native socket receive is that it returns no more than the + specified number of bytes (i.e. max size). However, it makes no guarantees in + terms of the minimum size of the buffer, which could be 1 byte. This is a + wrapper for socket recv() to ensure that no less than the expected number of + bytes is returned or trigger a timeout exception. + :param int bufsize: number of bytes to receive + + """ + if not self._backwards_compatible_sock: + # CPython/Socketpool Impl. + rc = bytearray(bufsize) + self._sock.recv_into(rc, bufsize) + else: # ESP32SPI Impl. + stamp = time.monotonic() + read_timeout = self.keep_alive + rc = self._sock.recv(bufsize) + to_read = bufsize - len(rc) + assert to_read >= 0 + read_timeout = self.keep_alive + while to_read > 0: + recv = self._sock.recv(to_read) + to_read -= len(recv) + rc += recv + if time.monotonic() - stamp > read_timeout: + raise MMQTTException( + "Unable to receive {} bytes within {} seconds.".format( + to_read, read_timeout + ) + ) + return rc + def _send_str(self, string): + """Encodes a string and sends it to a socket. :param str string: String to write to the socket. + """ self._sock.send(struct.pack("!H", len(string))) if isinstance(string, str): @@ -787,10 +938,10 @@ def _send_str(self, string): self._sock.send(string) @staticmethod - def _check_topic(topic): - """Checks if topic provided is a valid mqtt topic. - + def _valid_topic(topic): + """Validates if topic provided is proper MQTT topic format. :param str topic: Topic identifier + """ if topic is None: raise MMQTTException("Topic may not be NoneType") @@ -802,10 +953,10 @@ def _check_topic(topic): raise MMQTTException("Topic length is too large.") @staticmethod - def _check_qos(qos_level): - """Validates the quality of service level. - + def _valid_qos(qos_level): + """Validates if the QoS level is supported by this library :param int qos_level: Desired QoS level. + """ if isinstance(qos_level, int): if qos_level < 0 or qos_level > 2: @@ -813,16 +964,6 @@ def _check_qos(qos_level): else: raise MMQTTException("QoS must be an integer.") - def _set_interface(self): - """Sets a desired network hardware interface. - The network hardware must be set in init - prior to calling this method. - """ - if self._wifi: - self._socket.set_interface(self._wifi.esp) - else: - raise TypeError("Network Manager Required.") - def is_connected(self): """Returns MQTT client session status as True if connected, raises a `MMQTTException` if `False`. @@ -831,47 +972,18 @@ def is_connected(self): raise MMQTTException("MiniMQTT is not connected.") return self._is_connected - @property - def mqtt_msg(self): - """Returns maximum MQTT payload and topic size.""" - return self._msg_size_lim, MQTT_TOPIC_LENGTH_LIMIT - - @mqtt_msg.setter - def mqtt_msg(self, msg_size): - """Sets the maximum MQTT message payload size. + # Logging + def enable_logger(self, logger, log_level=20): + """Enables library logging provided a logger object. + :param logger: A python logger pacakge. + :param log_level: Numeric value of a logging level, defaults to INFO. - :param int msg_size: Maximum MQTT payload size. - """ - if msg_size < MQTT_MSG_MAX_SZ: - self._msg_size_lim = msg_size - - ### Logging ### - def attach_logger(self, logger_name="log"): - """Initializes and attaches a logger to the MQTTClient. - - :param str logger_name: Name of the logger instance """ - self.logger = logging.getLogger(logger_name) - self.logger.setLevel(logging.INFO) + self.logger = logger.getLogger("log") + self.logger.setLevel(log_level) - def set_logger_level(self, log_level): - """Sets the level of the logger, if defined during init. - - :param str log_level: Level of logging to output to the REPL. - Acceptable options are ``DEBUG``, ``INFO``, ``WARNING``, or - ``ERROR``. - """ - if self.logger is None: - raise MMQTTException( - "No logger attached - did you create it during initialization?" - ) - if log_level == "DEBUG": - self.logger.setLevel(logging.DEBUG) - elif log_level == "INFO": - self.logger.setLevel(logging.INFO) - elif log_level == "WARNING": - self.logger.setLevel(logging.WARNING) - elif log_level == "ERROR": - self.logger.setLevel(logging.CRITICIAL) - else: - raise MMQTTException("Incorrect logging level provided!") + def disable_logger(self): + """Disables logging.""" + if not self.logger: + raise MMQTTException("Can not disable logger, no logger found.") + self.logger = None diff --git a/docs/conf.py b/docs/conf.py index 6d6f6f99..7dfe9601 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -24,7 +24,7 @@ # Uncomment the below if you use native CircuitPython modules such as # digitalio, micropython and busio. List the modules you use. Without it, the # autodoc module docs will fail to generate with a warning. -autodoc_mock_imports = ["micropython", "microcontroller", "random", "adafruit_logging"] +autodoc_mock_imports = ["micropython", "microcontroller", "random"] intersphinx_mapping = { diff --git a/docs/examples.rst b/docs/examples.rst index c7fbaf6c..847e6bb1 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -6,13 +6,3 @@ Ensure your device works with this simple test. .. literalinclude:: ../examples/minimqtt_simpletest.py :caption: examples/minimqtt_simpletest.py :linenos: - -Basic forever loop ------------------- - -This example shows how to write a loop that runs forever -& can handle disconnect/re-connect events. - -.. literalinclude:: ../examples/minimqtt_pub_sub_blocking.py - :caption: examples/minimqtt_pub_sub_blocking.py - :linenos: diff --git a/examples/minimqtt_adafruitio_cellular.py b/examples/cellular/minimqtt_adafruitio_cellular.py similarity index 100% rename from examples/minimqtt_adafruitio_cellular.py rename to examples/cellular/minimqtt_adafruitio_cellular.py diff --git a/examples/minimqtt_simpletest_cellular.py b/examples/cellular/minimqtt_simpletest_cellular.py similarity index 100% rename from examples/minimqtt_simpletest_cellular.py rename to examples/cellular/minimqtt_simpletest_cellular.py diff --git a/examples/cpython/minimqtt_adafruitio_cpython.py b/examples/cpython/minimqtt_adafruitio_cpython.py new file mode 100644 index 00000000..7eb4f5fb --- /dev/null +++ b/examples/cpython/minimqtt_adafruitio_cpython.py @@ -0,0 +1,75 @@ +# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries +# SPDX-License-Identifier: MIT + +import time +import socket +import adafruit_minimqtt.adafruit_minimqtt as MQTT + +### Secrets File Setup ### + +try: + from secrets import secrets +except ImportError: + print("Connection secrets are kept in secrets.py, please add them there!") + raise + +### Feeds ### + +# Setup a feed named 'photocell' for publishing to a feed +photocell_feed = secrets["aio_username"] + "/feeds/photocell" + +# Setup a feed named 'onoff' for subscribing to changes +onoff_feed = secrets["aio_username"] + "/feeds/onoff" + +### Code ### + +# Define callback methods which are called when events occur +# pylint: disable=unused-argument, redefined-outer-name +def connected(client, userdata, flags, rc): + # This function will be called when the client is connected + # successfully to the broker. + print("Connected to Adafruit IO! Listening for topic changes on %s" % onoff_feed) + # Subscribe to all changes on the onoff_feed. + client.subscribe(onoff_feed) + + +def disconnected(client, userdata, rc): + # This method is called when the client is disconnected + print("Disconnected from Adafruit IO!") + + +def message(client, topic, message): + # This method is called when a topic the client is subscribed to + # has a new message. + print("New message on topic {0}: {1}".format(topic, message)) + + +# Set up a MiniMQTT Client +mqtt_client = MQTT.MQTT( + broker=secrets["broker"], + port=1883, + username=secrets["aio_username"], + password=secrets["aio_key"], + socket_pool=socket, +) + +# Setup the callback methods above +mqtt_client.on_connect = connected +mqtt_client.on_disconnect = disconnected +mqtt_client.on_message = message + +# Connect the client to the MQTT broker. +print("Connecting to Adafruit IO...") +mqtt_client.connect() + +photocell_val = 0 +while True: + # Poll the message queue + mqtt_client.loop() + + # Send a new message + print("Sending photocell value: %d..." % photocell_val) + mqtt_client.publish(photocell_feed, photocell_val) + print("Sent!") + photocell_val += 1 + time.sleep(1) diff --git a/examples/cpython/minimqtt_simpletest_cpython.py b/examples/cpython/minimqtt_simpletest_cpython.py new file mode 100644 index 00000000..35fe18e9 --- /dev/null +++ b/examples/cpython/minimqtt_simpletest_cpython.py @@ -0,0 +1,95 @@ +# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries +# SPDX-License-Identifier: MIT + +import socket +import adafruit_minimqtt.adafruit_minimqtt as MQTT + +### Secrets File Setup ### + +try: + from secrets import secrets +except ImportError: + print("Connection secrets are kept in secrets.py, please add them there!") + raise + +### Topic Setup ### + +# MQTT Topic +# Use this topic if you'd like to connect to a standard MQTT broker +# mqtt_topic = "test/topic" + +# Adafruit IO-style Topic +# Use this topic if you'd like to connect to io.adafruit.com +mqtt_topic = secrets["aio_username"] + "/feeds/temperature" + +### Code ### + +# Keep track of client connection state +disconnect_client = False + +# Define callback methods which are called when events occur +# pylint: disable=unused-argument, redefined-outer-name +def connect(mqtt_client, userdata, flags, rc): + # This function will be called when the mqtt_client is connected + # successfully to the broker. + print("Connected to MQTT Broker!") + print("Flags: {0}\n RC: {1}".format(flags, rc)) + + +def disconnect(mqtt_client, userdata, rc): + # This method is called when the mqtt_client disconnects + # from the broker. + print("Disconnected from MQTT Broker!") + + +def subscribe(mqtt_client, userdata, topic, granted_qos): + # This method is called when the mqtt_client subscribes to a new feed. + print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos)) + + +def unsubscribe(mqtt_client, userdata, topic, pid): + # This method is called when the mqtt_client unsubscribes from a feed. + print("Unsubscribed from {0} with PID {1}".format(topic, pid)) + + +def publish(mqtt_client, userdata, topic, pid): + # This method is called when the mqtt_client publishes data to a feed. + print("Published to {0} with PID {1}".format(topic, pid)) + + +def message(client, topic, message): + # Method callled when a client's subscribed feed has a new value. + print("New message on topic {0}: {1}".format(topic, message)) + + +# Set up a MiniMQTT Client +mqtt_client = MQTT.MQTT( + broker=secrets["broker"], + port=1883, + username=secrets["aio_username"], + password=secrets["aio_key"], + socket_pool=socket, +) + +# Connect callback handlers to mqtt_client +mqtt_client.on_connect = connect +mqtt_client.on_disconnect = disconnect +mqtt_client.on_subscribe = subscribe +mqtt_client.on_unsubscribe = unsubscribe +mqtt_client.on_publish = publish +mqtt_client.on_message = message + +print("Attempting to connect to %s" % mqtt_client.broker) +mqtt_client.connect() + +print("Subscribing to %s" % mqtt_topic) +mqtt_client.subscribe(mqtt_topic) + +print("Publishing to %s" % mqtt_topic) +mqtt_client.publish(mqtt_topic, "Hello Broker!") + +print("Unsubscribing from %s" % mqtt_topic) +mqtt_client.unsubscribe(mqtt_topic) + +print("Disconnecting from %s" % mqtt_client.broker) +mqtt_client.disconnect() diff --git a/examples/minimqtt_adafruitio_wifi.py b/examples/esp32spi/minimqtt_adafruitio_esp32spi.py similarity index 96% rename from examples/minimqtt_adafruitio_wifi.py rename to examples/esp32spi/minimqtt_adafruitio_esp32spi.py index 38817455..7141e8bc 100644 --- a/examples/minimqtt_adafruitio_wifi.py +++ b/examples/esp32spi/minimqtt_adafruitio_esp32spi.py @@ -1,9 +1,6 @@ # SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries # SPDX-License-Identifier: MIT -# Adafruit MiniMQTT Pub/Sub Example -# Written by Tony DiCola for Adafruit Industries -# Modified by Brent Rubell for Adafruit Industries import time import board import busio diff --git a/examples/minimqtt_certificate.py b/examples/esp32spi/minimqtt_certificate_esp32spi.py similarity index 100% rename from examples/minimqtt_certificate.py rename to examples/esp32spi/minimqtt_certificate_esp32spi.py diff --git a/examples/minimqtt_pub_sub_blocking.py b/examples/esp32spi/minimqtt_pub_sub_blocking_esp32spi.py similarity index 98% rename from examples/minimqtt_pub_sub_blocking.py rename to examples/esp32spi/minimqtt_pub_sub_blocking_esp32spi.py index 3c41545f..734816ea 100644 --- a/examples/minimqtt_pub_sub_blocking.py +++ b/examples/esp32spi/minimqtt_pub_sub_blocking_esp32spi.py @@ -1,8 +1,6 @@ # SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries # SPDX-License-Identifier: MIT -# CircuitPython MiniMQTT Library -# Adafruit IO SSL/TLS Example for WiFi import time import board import busio diff --git a/examples/minimqtt_pub_sub_blocking_topic_callbacks.py b/examples/esp32spi/minimqtt_pub_sub_blocking_topic_callbacks_esp32spi.py similarity index 92% rename from examples/minimqtt_pub_sub_blocking_topic_callbacks.py rename to examples/esp32spi/minimqtt_pub_sub_blocking_topic_callbacks_esp32spi.py index 3ff0349d..60b4504f 100644 --- a/examples/minimqtt_pub_sub_blocking_topic_callbacks.py +++ b/examples/esp32spi/minimqtt_pub_sub_blocking_topic_callbacks_esp32spi.py @@ -77,7 +77,7 @@ def on_battery_msg(client, topic, message): # Method called when device/batteryLife has a new value print("Battery level: {}v".format(message)) - # client.remove_topic_callback("device/batteryLevel") + # client.remove_topic_callback(secrets["aio_username"] + "/feeds/device.batterylevel") def on_message(client, topic, message): @@ -101,14 +101,16 @@ def on_message(client, topic, message): client.on_subscribe = subscribe client.on_unsubscribe = unsubscribe client.on_message = on_message -client.add_topic_callback("device/batteryLevel", on_battery_msg) +client.add_topic_callback( + secrets["aio_username"] + "/feeds/device.batterylevel", on_battery_msg +) # Connect the client to the MQTT broker. print("Connecting to MQTT broker...") client.connect() -# Subscribe to all notifications on the device/ topic -client.subscribe("device/#", 1) +# Subscribe to all notifications on the device group +client.subscribe(secrets["aio_username"] + "/groups/device", 1) # Start a blocking message loop... # NOTE: NO code below this loop will execute diff --git a/examples/minimqtt_pub_sub_nonblocking.py b/examples/esp32spi/minimqtt_pub_sub_nonblocking_esp32spi.py similarity index 100% rename from examples/minimqtt_pub_sub_nonblocking.py rename to examples/esp32spi/minimqtt_pub_sub_nonblocking_esp32spi.py diff --git a/examples/minimqtt_pub_sub_pyportal.py b/examples/esp32spi/minimqtt_pub_sub_pyportal_esp32spi.py similarity index 100% rename from examples/minimqtt_pub_sub_pyportal.py rename to examples/esp32spi/minimqtt_pub_sub_pyportal_esp32spi.py diff --git a/examples/esp32spi/minimqtt_simpletest_esp32spi.py b/examples/esp32spi/minimqtt_simpletest_esp32spi.py new file mode 100644 index 00000000..254253dc --- /dev/null +++ b/examples/esp32spi/minimqtt_simpletest_esp32spi.py @@ -0,0 +1,126 @@ +# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries +# SPDX-License-Identifier: MIT +import board +import busio +from digitalio import DigitalInOut +from adafruit_esp32spi import adafruit_esp32spi +import adafruit_esp32spi.adafruit_esp32spi_socket as socket +import adafruit_minimqtt.adafruit_minimqtt as MQTT + +# Add a secrets.py to your filesystem that has a dictionary called secrets with "ssid" and +# "password" keys with your WiFi credentials. DO NOT share that file or commit it into Git or other +# source control. +# pylint: disable=no-name-in-module,wrong-import-order +try: + from secrets import secrets +except ImportError: + print("WiFi secrets are kept in secrets.py, please add them there!") + raise + +# Set your Adafruit IO Username and Key in secrets.py +# (visit io.adafruit.com if you need to create an account, +# or if you need your Adafruit IO key.) +aio_username = secrets["aio_username"] +aio_key = secrets["aio_key"] + +# If you are using a board with pre-defined ESP32 Pins: +esp32_cs = DigitalInOut(board.ESP_CS) +esp32_ready = DigitalInOut(board.ESP_BUSY) +esp32_reset = DigitalInOut(board.ESP_RESET) + +# If you have an externally connected ESP32: +# esp32_cs = DigitalInOut(board.D9) +# esp32_ready = DigitalInOut(board.D10) +# esp32_reset = DigitalInOut(board.D5) + +spi = busio.SPI(board.SCK, board.MOSI, board.MISO) +esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset) + +print("Connecting to AP...") +while not esp.is_connected: + try: + esp.connect_AP(secrets["ssid"], secrets["password"]) + except RuntimeError as e: + print("could not connect to AP, retrying: ", e) + continue +print("Connected to", str(esp.ssid, "utf-8"), "\tRSSI:", esp.rssi) + +### Topic Setup ### + +# MQTT Topic +# Use this topic if you'd like to connect to a standard MQTT broker +mqtt_topic = "test/topic" + +# Adafruit IO-style Topic +# Use this topic if you'd like to connect to io.adafruit.com +# mqtt_topic = secrets["aio_username"] + '/feeds/temperature' + +### Code ### + +# Define callback methods which are called when events occur +# pylint: disable=unused-argument, redefined-outer-name +def connect(mqtt_client, userdata, flags, rc): + # This function will be called when the mqtt_client is connected + # successfully to the broker. + print("Connected to MQTT Broker!") + print("Flags: {0}\n RC: {1}".format(flags, rc)) + + +def disconnect(mqtt_client, userdata, rc): + # This method is called when the mqtt_client disconnects + # from the broker. + print("Disconnected from MQTT Broker!") + + +def subscribe(mqtt_client, userdata, topic, granted_qos): + # This method is called when the mqtt_client subscribes to a new feed. + print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos)) + + +def unsubscribe(mqtt_client, userdata, topic, pid): + # This method is called when the mqtt_client unsubscribes from a feed. + print("Unsubscribed from {0} with PID {1}".format(topic, pid)) + + +def publish(mqtt_client, userdata, topic, pid): + # This method is called when the mqtt_client publishes data to a feed. + print("Published to {0} with PID {1}".format(topic, pid)) + + +def message(client, topic, message): + print("New message on topic {0}: {1}".format(topic, message)) + + +socket.set_interface(esp) +MQTT.set_socket(socket, esp) + +# Set up a MiniMQTT Client +mqtt_client = MQTT.MQTT( + broker=secrets["broker"], + port=secrets["port"], + username=secrets["username"], + password=secrets["password"], +) + +# Connect callback handlers to mqtt_client +mqtt_client.on_connect = connect +mqtt_client.on_disconnect = disconnect +mqtt_client.on_subscribe = subscribe +mqtt_client.on_unsubscribe = unsubscribe +mqtt_client.on_publish = publish +mqtt_client.on_message = message + +print("Attempting to connect to %s" % mqtt_client.broker) +mqtt_client.connect() + +print("Subscribing to %s" % mqtt_topic) +mqtt_client.subscribe(mqtt_topic) + +print("Publishing to %s" % mqtt_topic) +mqtt_client.publish(mqtt_topic, "Hello Broker!") + +print("Unsubscribing from %s" % mqtt_topic) +mqtt_client.unsubscribe(mqtt_topic) + +print("Disconnecting from %s" % mqtt_client.broker) +mqtt_client.disconnect() diff --git a/examples/minimqtt_adafruitio_eth.py b/examples/ethernet/minimqtt_adafruitio_eth.py similarity index 95% rename from examples/minimqtt_adafruitio_eth.py rename to examples/ethernet/minimqtt_adafruitio_eth.py index 1d8cf9cc..753bf473 100755 --- a/examples/minimqtt_adafruitio_eth.py +++ b/examples/ethernet/minimqtt_adafruitio_eth.py @@ -1,9 +1,6 @@ # SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries # SPDX-License-Identifier: MIT -# Adafruit MiniMQTT Pub/Sub Example -# Written by Tony DiCola for Adafruit Industries -# Modified by Brent Rubell for Adafruit Industries import time import board import busio diff --git a/examples/minimqtt_simpletest_eth.py b/examples/ethernet/minimqtt_simpletest_eth.py similarity index 100% rename from examples/minimqtt_simpletest_eth.py rename to examples/ethernet/minimqtt_simpletest_eth.py diff --git a/examples/minimqtt_simpletest.py b/examples/minimqtt_simpletest.py index edeed52d..0a4ca3fa 100644 --- a/examples/minimqtt_simpletest.py +++ b/examples/minimqtt_simpletest.py @@ -1,51 +1,30 @@ # SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries # SPDX-License-Identifier: MIT -import board -import busio -from digitalio import DigitalInOut -import neopixel -from adafruit_esp32spi import adafruit_esp32spi -from adafruit_esp32spi import adafruit_esp32spi_wifimanager -import adafruit_esp32spi.adafruit_esp32spi_socket as socket - +import ssl +import socketpool +import wifi import adafruit_minimqtt.adafruit_minimqtt as MQTT -### WiFi ### - -# Get wifi details and more from a secrets.py file +# Add a secrets.py to your filesystem that has a dictionary called secrets with "ssid" and +# "password" keys with your WiFi credentials. DO NOT share that file or commit it into Git or other +# source control. +# pylint: disable=no-name-in-module,wrong-import-order try: from secrets import secrets except ImportError: print("WiFi secrets are kept in secrets.py, please add them there!") raise -# If you are using a board with pre-defined ESP32 Pins: -esp32_cs = DigitalInOut(board.ESP_CS) -esp32_ready = DigitalInOut(board.ESP_BUSY) -esp32_reset = DigitalInOut(board.ESP_RESET) - -# If you have an externally connected ESP32: -# esp32_cs = DigitalInOut(board.D9) -# esp32_ready = DigitalInOut(board.D10) -# esp32_reset = DigitalInOut(board.D5) - -spi = busio.SPI(board.SCK, board.MOSI, board.MISO) -esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset) -"""Use below for Most Boards""" -status_light = neopixel.NeoPixel( - board.NEOPIXEL, 1, brightness=0.2 -) # Uncomment for Most Boards -"""Uncomment below for ItsyBitsy M4""" -# status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2) -# Uncomment below for an externally defined RGB LED -# import adafruit_rgbled -# from adafruit_esp32spi import PWMOut -# RED_LED = PWMOut.PWMOut(esp, 26) -# GREEN_LED = PWMOut.PWMOut(esp, 27) -# BLUE_LED = PWMOut.PWMOut(esp, 25) -# status_light = adafruit_rgbled.RGBLED(RED_LED, BLUE_LED, GREEN_LED) -wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light) +# Set your Adafruit IO Username and Key in secrets.py +# (visit io.adafruit.com if you need to create an account, +# or if you need your Adafruit IO key.) +aio_username = secrets["aio_username"] +aio_key = secrets["aio_key"] + +print("Connecting to %s" % secrets["ssid"]) +wifi.radio.connect(secrets["ssid"], secrets["password"]) +print("Connected to %s!" % secrets["ssid"]) ### Topic Setup ### @@ -55,71 +34,76 @@ # Adafruit IO-style Topic # Use this topic if you'd like to connect to io.adafruit.com -# mqtt_topic = 'aio_user/feeds/temperature' +# mqtt_topic = secrets["aio_username"] + '/feeds/temperature' ### Code ### - # Define callback methods which are called when events occur # pylint: disable=unused-argument, redefined-outer-name -def connect(client, userdata, flags, rc): - # This function will be called when the client is connected +def connect(mqtt_client, userdata, flags, rc): + # This function will be called when the mqtt_client is connected # successfully to the broker. print("Connected to MQTT Broker!") print("Flags: {0}\n RC: {1}".format(flags, rc)) -def disconnect(client, userdata, rc): - # This method is called when the client disconnects +def disconnect(mqtt_client, userdata, rc): + # This method is called when the mqtt_client disconnects # from the broker. print("Disconnected from MQTT Broker!") -def subscribe(client, userdata, topic, granted_qos): - # This method is called when the client subscribes to a new feed. +def subscribe(mqtt_client, userdata, topic, granted_qos): + # This method is called when the mqtt_client subscribes to a new feed. print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos)) -def unsubscribe(client, userdata, topic, pid): - # This method is called when the client unsubscribes from a feed. +def unsubscribe(mqtt_client, userdata, topic, pid): + # This method is called when the mqtt_client unsubscribes from a feed. print("Unsubscribed from {0} with PID {1}".format(topic, pid)) -def publish(client, userdata, topic, pid): - # This method is called when the client publishes data to a feed. +def publish(mqtt_client, userdata, topic, pid): + # This method is called when the mqtt_client publishes data to a feed. print("Published to {0} with PID {1}".format(topic, pid)) -# Connect to WiFi -print("Connecting to WiFi...") -wifi.connect() -print("Connected!") +def message(client, topic, message): + # Method callled when a client's subscribed feed has a new value. + print("New message on topic {0}: {1}".format(topic, message)) + -# Initialize MQTT interface with the esp interface -MQTT.set_socket(socket, esp) +# Create a socket pool +pool = socketpool.SocketPool(wifi.radio) # Set up a MiniMQTT Client -client = MQTT.MQTT( - broker=secrets["broker"], username=secrets["user"], password=secrets["pass"] +mqtt_client = MQTT.MQTT( + broker=secrets["broker"], + port=secrets["port"], + username=secrets["aio_username"], + password=secrets["aio_key"], + socket_pool=pool, + ssl_context=ssl.create_default_context(), ) -# Connect callback handlers to client -client.on_connect = connect -client.on_disconnect = disconnect -client.on_subscribe = subscribe -client.on_unsubscribe = unsubscribe -client.on_publish = publish +# Connect callback handlers to mqtt_client +mqtt_client.on_connect = connect +mqtt_client.on_disconnect = disconnect +mqtt_client.on_subscribe = subscribe +mqtt_client.on_unsubscribe = unsubscribe +mqtt_client.on_publish = publish +mqtt_client.on_message = message -print("Attempting to connect to %s" % client.broker) -client.connect() +print("Attempting to connect to %s" % mqtt_client.broker) +mqtt_client.connect() print("Subscribing to %s" % mqtt_topic) -client.subscribe(mqtt_topic) +mqtt_client.subscribe(mqtt_topic) print("Publishing to %s" % mqtt_topic) -client.publish(mqtt_topic, "Hello Broker!") +mqtt_client.publish(mqtt_topic, "Hello Broker!") print("Unsubscribing from %s" % mqtt_topic) -client.unsubscribe(mqtt_topic) +mqtt_client.unsubscribe(mqtt_topic) -print("Disconnecting from %s" % client.broker) -client.disconnect() +print("Disconnecting from %s" % mqtt_client.broker) +mqtt_client.disconnect() diff --git a/examples/native_networking/minimqtt_adafruitio_native_networking.py b/examples/native_networking/minimqtt_adafruitio_native_networking.py new file mode 100644 index 00000000..a4f5ecaa --- /dev/null +++ b/examples/native_networking/minimqtt_adafruitio_native_networking.py @@ -0,0 +1,92 @@ +# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries +# SPDX-License-Identifier: MIT + +import time +import ssl +import socketpool +import wifi +import adafruit_minimqtt.adafruit_minimqtt as MQTT + +# Add a secrets.py to your filesystem that has a dictionary called secrets with "ssid" and +# "password" keys with your WiFi credentials. DO NOT share that file or commit it into Git or other +# source control. +# pylint: disable=no-name-in-module,wrong-import-order +try: + from secrets import secrets +except ImportError: + print("WiFi secrets are kept in secrets.py, please add them there!") + raise + +# Set your Adafruit IO Username and Key in secrets.py +# (visit io.adafruit.com if you need to create an account, +# or if you need your Adafruit IO key.) +aio_username = secrets["aio_username"] +aio_key = secrets["aio_key"] + +print("Connecting to %s" % secrets["ssid"]) +wifi.radio.connect(secrets["ssid"], secrets["password"]) +print("Connected to %s!" % secrets["ssid"]) +### Feeds ### + +# Setup a feed named 'photocell' for publishing to a feed +photocell_feed = secrets["aio_username"] + "/feeds/photocell" + +# Setup a feed named 'onoff' for subscribing to changes +onoff_feed = secrets["aio_username"] + "/feeds/onoff" + +### Code ### + +# Define callback methods which are called when events occur +# pylint: disable=unused-argument, redefined-outer-name +def connected(client, userdata, flags, rc): + # This function will be called when the client is connected + # successfully to the broker. + print("Connected to Adafruit IO! Listening for topic changes on %s" % onoff_feed) + # Subscribe to all changes on the onoff_feed. + client.subscribe(onoff_feed) + + +def disconnected(client, userdata, rc): + # This method is called when the client is disconnected + print("Disconnected from Adafruit IO!") + + +def message(client, topic, message): + # This method is called when a topic the client is subscribed to + # has a new message. + print("New message on topic {0}: {1}".format(topic, message)) + + +# Create a socket pool +pool = socketpool.SocketPool(wifi.radio) + +# Set up a MiniMQTT Client +mqtt_client = MQTT.MQTT( + broker=secrets["broker"], + port=secrets["port"], + username=secrets["aio_username"], + password=secrets["aio_key"], + socket_pool=pool, + ssl_context=ssl.create_default_context(), +) + +# Setup the callback methods above +mqtt_client.on_connect = connected +mqtt_client.on_disconnect = disconnected +mqtt_client.on_message = message + +# Connect the client to the MQTT broker. +print("Connecting to Adafruit IO...") +mqtt_client.connect() + +photocell_val = 0 +while True: + # Poll the message queue + mqtt_client.loop() + + # Send a new message + print("Sending photocell value: %d..." % photocell_val) + mqtt_client.publish(photocell_feed, photocell_val) + print("Sent!") + photocell_val += 1 + time.sleep(5) diff --git a/examples/native_networking/minimqtt_pub_sub_blocking_native_networking.py b/examples/native_networking/minimqtt_pub_sub_blocking_native_networking.py new file mode 100644 index 00000000..58dbc7f7 --- /dev/null +++ b/examples/native_networking/minimqtt_pub_sub_blocking_native_networking.py @@ -0,0 +1,93 @@ +# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries +# SPDX-License-Identifier: MIT + +import time +import ssl +import socketpool +import wifi +import adafruit_minimqtt.adafruit_minimqtt as MQTT + +# Add a secrets.py to your filesystem that has a dictionary called secrets with "ssid" and +# "password" keys with your WiFi credentials. DO NOT share that file or commit it into Git or other +# source control. +# pylint: disable=no-name-in-module,wrong-import-order +try: + from secrets import secrets +except ImportError: + print("WiFi secrets are kept in secrets.py, please add them there!") + raise + +# Set your Adafruit IO Username and Key in secrets.py +# (visit io.adafruit.com if you need to create an account, +# or if you need your Adafruit IO key.) +aio_username = secrets["aio_username"] +aio_key = secrets["aio_key"] + +print("Connecting to %s" % secrets["ssid"]) +wifi.radio.connect(secrets["ssid"], secrets["password"]) +print("Connected to %s!" % secrets["ssid"]) + +### Adafruit IO Setup ### + +# Setup a feed named `testfeed` for publishing. +default_topic = secrets["aio_username"] + "/feeds/testfeed" + +### Code ### +# Define callback methods which are called when events occur +# pylint: disable=unused-argument, redefined-outer-name +def connected(client, userdata, flags, rc): + # This function will be called when the client is connected + # successfully to the broker. + print("Connected to MQTT broker! Listening for topic changes on %s" % default_topic) + # Subscribe to all changes on the default_topic feed. + client.subscribe(default_topic) + + +def disconnected(client, userdata, rc): + # This method is called when the client is disconnected + print("Disconnected from MQTT Broker!") + + +def message(client, topic, message): + """Method callled when a client's subscribed feed has a new + value. + :param str topic: The topic of the feed with a new value. + :param str message: The new value + """ + print("New message on topic {0}: {1}".format(topic, message)) + + +# Create a socket pool +pool = socketpool.SocketPool(wifi.radio) + +# Set up a MiniMQTT Client +mqtt_client = MQTT.MQTT( + broker=secrets["broker"], + port=secrets["port"], + username=secrets["aio_username"], + password=secrets["aio_key"], + socket_pool=pool, + ssl_context=ssl.create_default_context(), +) + +# Setup the callback methods above +mqtt_client.on_connect = connected +mqtt_client.on_disconnect = disconnected +mqtt_client.on_message = message + +# Connect the client to the MQTT broker. +print("Connecting to MQTT broker...") +mqtt_client.connect() + +# Start a blocking message loop... +# NOTE: NO code below this loop will execute +# NOTE: Network reconnection is handled within this loop +while True: + try: + mqtt_client.loop() + except (ValueError, RuntimeError) as e: + print("Failed to get data, retrying\n", e) + wifi.reset() + mqtt_client.reconnect() + continue + time.sleep(1) diff --git a/examples/native_networking/minimqtt_pub_sub_blocking_topic_callbacks_native_networking.py b/examples/native_networking/minimqtt_pub_sub_blocking_topic_callbacks_native_networking.py new file mode 100644 index 00000000..2a2eddf3 --- /dev/null +++ b/examples/native_networking/minimqtt_pub_sub_blocking_topic_callbacks_native_networking.py @@ -0,0 +1,107 @@ +# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries +# SPDX-License-Identifier: MIT + +import time +import ssl +import socketpool +import wifi +import adafruit_minimqtt.adafruit_minimqtt as MQTT + +# Add a secrets.py to your filesystem that has a dictionary called secrets with "ssid" and +# "password" keys with your WiFi credentials. DO NOT share that file or commit it into Git or other +# source control. +# pylint: disable=no-name-in-module,wrong-import-order +try: + from secrets import secrets +except ImportError: + print("WiFi secrets are kept in secrets.py, please add them there!") + raise + +# Set your Adafruit IO Username and Key in secrets.py +# (visit io.adafruit.com if you need to create an account, +# or if you need your Adafruit IO key.) +aio_username = secrets["aio_username"] +aio_key = secrets["aio_key"] + +print("Connecting to %s" % secrets["ssid"]) +wifi.radio.connect(secrets["ssid"], secrets["password"]) +print("Connected to %s!" % secrets["ssid"]) + +### Code ### + +# Define callback methods which are called when events occur +# pylint: disable=unused-argument, redefined-outer-name +def connected(client, userdata, flags, rc): + # This function will be called when the client is connected + # successfully to the broker. + print("Connected to MQTT Broker!") + + +def disconnected(client, userdata, rc): + # This method is called when the client is disconnected + print("Disconnected from MQTT Broker!") + + +def subscribe(client, userdata, topic, granted_qos): + # This method is called when the client subscribes to a new feed. + print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos)) + + +def unsubscribe(client, userdata, topic, pid): + # This method is called when the client unsubscribes from a feed. + print("Unsubscribed from {0} with PID {1}".format(topic, pid)) + + +def on_battery_msg(client, topic, message): + # Method called when device/batteryLife has a new value + print("Battery level: {}v".format(message)) + + # client.remove_topic_callback(secrets["aio_username"] + "/feeds/device.batterylevel") + + +def on_message(client, topic, message): + # Method callled when a client's subscribed feed has a new value. + print("New message on topic {0}: {1}".format(topic, message)) + + +# Create a socket pool +pool = socketpool.SocketPool(wifi.radio) + +# Set up a MiniMQTT Client +client = MQTT.MQTT( + broker=secrets["broker"], + port=secrets["port"], + username=secrets["aio_username"], + password=secrets["aio_key"], + socket_pool=pool, + ssl_context=ssl.create_default_context(), +) + +# Setup the callback methods above +client.on_connect = connected +client.on_disconnect = disconnected +client.on_subscribe = subscribe +client.on_unsubscribe = unsubscribe +client.on_message = on_message +client.add_topic_callback( + secrets["aio_username"] + "/feeds/device.batterylevel", on_battery_msg +) + +# Connect the client to the MQTT broker. +print("Connecting to MQTT broker...") +client.connect() + +# Subscribe to all notifications on the device group +client.subscribe(secrets["aio_username"] + "/groups/device", 1) + +# Start a blocking message loop... +# NOTE: NO code below this loop will execute +while True: + try: + client.loop() + except (ValueError, RuntimeError) as e: + print("Failed to get data, retrying\n", e) + wifi.reset() + client.reconnect() + continue + time.sleep(1)