Skip to content

avoid endless loop when waiting for data from broker #117

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 22, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions adafruit_minimqtt/adafruit_minimqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class MQTT:
:param str client_id: Optional client identifier, defaults to a unique, generated string.
:param bool is_ssl: Sets a secure or insecure connection with the broker.
:param int keep_alive: KeepAlive interval between the broker and the MiniMQTT client.
:param int recv_timeout: receive timeout, in seconds.
:param socket socket_pool: A pool of socket resources available for the given radio.
:param ssl_context: SSL context for long-lived SSL connections.
:param bool use_binary_mode: Messages are passed as bytearray instead of string to callbacks.
Expand All @@ -146,6 +147,7 @@ def __init__(
client_id=None,
is_ssl=True,
keep_alive=60,
recv_timeout=10,
socket_pool=None,
ssl_context=None,
use_binary_mode=False,
Expand All @@ -157,7 +159,13 @@ def __init__(
self._sock = None
self._backwards_compatible_sock = False
self._use_binary_mode = use_binary_mode

if recv_timeout <= socket_timeout:
raise MMQTTException(
"recv_timeout must be strictly greater than socket_timeout"
)
self._socket_timeout = socket_timeout
self._recv_timeout = recv_timeout

self.keep_alive = keep_alive
self._user_data = None
Expand Down Expand Up @@ -522,6 +530,7 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None):
self._send_str(self._password)
if self.logger is not None:
self.logger.debug("Receiving CONNACK packet from broker")
stamp = time.monotonic()
while True:
op = self._wait_for_msg()
if op == 32:
Expand All @@ -535,6 +544,12 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None):
self.on_connect(self, self._user_data, result, rc[2])
return result

if op is None:
if time.monotonic() - stamp > self._recv_timeout:
raise MMQTTException(
f"No data received from broker for {self._recv_timeout} seconds."
)

def disconnect(self):
"""Disconnects the MiniMQTT client from the MQTT broker."""
self.is_connected()
Expand Down Expand Up @@ -645,6 +660,7 @@ def publish(self, topic, msg, retain=False, qos=0):
if qos == 0 and self.on_publish is not None:
self.on_publish(self, self._user_data, topic, self._pid)
if qos == 1:
stamp = time.monotonic()
while True:
op = self._wait_for_msg()
if op == 0x40:
Expand All @@ -657,6 +673,12 @@ def publish(self, topic, msg, retain=False, qos=0):
self.on_publish(self, self._user_data, topic, rcv_pid)
return

if op is None:
if time.monotonic() - stamp > self._recv_timeout:
raise MMQTTException(
f"No data received from broker for {self._recv_timeout} seconds."
)

def subscribe(self, topic, qos=0):
"""Subscribes to a topic on the MQTT Broker.
This method can subscribe to one topics or multiple topics.
Expand Down Expand Up @@ -705,6 +727,7 @@ def subscribe(self, topic, qos=0):
for t, q in topics:
self.logger.debug("SUBSCRIBING to topic %s with QoS %d", t, q)
self._sock.send(packet)
stamp = time.monotonic()
while True:
op = self._wait_for_msg()
if op == 0x90:
Expand All @@ -718,6 +741,12 @@ def subscribe(self, topic, qos=0):
self._subscribed_topics.append(t)
return

if op is None:
if time.monotonic() - stamp > self._recv_timeout:
raise MMQTTException(
f"No data received from broker for {self._recv_timeout} seconds."
)

def unsubscribe(self, topic):
"""Unsubscribes from a MQTT topic.

Expand Down Expand Up @@ -755,6 +784,7 @@ def unsubscribe(self, topic):
if self.logger is not None:
self.logger.debug("Waiting for UNSUBACK...")
while True:
stamp = time.monotonic()
op = self._wait_for_msg()
if op == 176:
rc = self._sock_exact_recv(3)
Expand All @@ -767,6 +797,12 @@ def unsubscribe(self, topic):
self._subscribed_topics.remove(t)
return

if op is None:
if time.monotonic() - stamp > self._recv_timeout:
raise MMQTTException(
f"No data received from broker for {self._recv_timeout} seconds."
)

def reconnect(self, resub_topics=True):
"""Attempts to reconnect to the MQTT broker.

Expand Down