From 16496faf6dac95436a826a870716a94834c319d6 Mon Sep 17 00:00:00 2001 From: Damien Mascord Date: Thu, 30 Mar 2017 21:05:53 +1100 Subject: [PATCH 1/4] Fix issue when CONNECT packet is greater than 127 bytes using logic from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349213 --- umqtt.simple/umqtt/simple.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/umqtt.simple/umqtt/simple.py b/umqtt.simple/umqtt/simple.py index 76e630705..e3b5835f6 100644 --- a/umqtt.simple/umqtt/simple.py +++ b/umqtt.simple/umqtt/simple.py @@ -71,6 +71,17 @@ def connect(self, clean_session=True): msg[1] += 2 + len(self.lw_topic) + 2 + len(self.lw_msg) msg[9] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3 msg[9] |= self.lw_retain << 5 + # fix "remaining length field" for message size over 128 + rem_len = msg[1] + enc_bytearray = bytearray() + while (rem_len > 0): + enc_byte = rem_len % 128 + rem_len = int(rem_len / 128) + if (rem_len > 0): + enc_byte |= 128 + enc_bytearray.append(enc_byte) + msg[1:2] = enc_bytearray + self.sock.write(msg) #print(hex(len(msg)), hexlify(msg, ":")) self._send_str(self.client_id) From 1e166eed2e4fc1e32d90b021792c642ea9d84c3d Mon Sep 17 00:00:00 2001 From: Damien Mascord Date: Tue, 4 Apr 2017 15:22:30 +1000 Subject: [PATCH 2/4] Reduce reassignments --- umqtt.simple/umqtt/simple.py | 55 +++++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 22 deletions(-) diff --git a/umqtt.simple/umqtt/simple.py b/umqtt.simple/umqtt/simple.py index e3b5835f6..3fdb3bfd1 100644 --- a/umqtt.simple/umqtt/simple.py +++ b/umqtt.simple/umqtt/simple.py @@ -13,7 +13,8 @@ def __init__(self, client_id, server, port=0, user=None, password=None, keepaliv port = 8883 if ssl else 1883 self.client_id = client_id self.sock = None - self.addr = socket.getaddrinfo(server, port)[0][-1] + self.server = server + self.port = port self.ssl = ssl self.ssl_params = ssl_params self.pid = 0 @@ -53,35 +54,45 @@ def set_last_will(self, topic, msg, retain=False, qos=0): def connect(self, clean_session=True): self.sock = socket.socket() + self.addr = socket.getaddrinfo(self.server, self.port)[0][-1] self.sock.connect(self.addr) if self.ssl: import ussl self.sock = ussl.wrap_socket(self.sock, **self.ssl_params) - msg = bytearray(b"\x10\0\0\x04MQTT\x04\x02\0\0") - msg[1] = 10 + 2 + len(self.client_id) - msg[9] = clean_session << 1 + sz = 10 + 2 + len(self.client_id) if self.user is not None: - msg[1] += 2 + len(self.user) + 2 + len(self.pswd) - msg[9] |= 0xC0 + sz += 2 + len(self.user) + 2 + len(self.pswd) + if self.lw_topic: + sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg) + + if sz < 128: + msg = bytearray(b"\x10\0\0\x04MQTT\x04\x02\0\0") + elif sz < 16384: + msg = bytearray(b"\x10\0\0\0\x04MQTT\x04\x02\0\0") + elif sz < 2097152: + msg = bytearray(b"\x10\0\0\0\0\x04MQTT\x04\x02\0\0") + else: + msg = bytearray(b"\x10\0\0\0\0\0\x04MQTT\x04\x02\0\0") + + indexOfSize = 0 + while (sz > 0): + enc_byte = sz % 128 + sz = int(sz / 128) + if (sz > 0): + enc_byte |= 128 + indexOfSize += 1 + msg[indexOfSize] = enc_byte + + msg[8 + indexOfSize] = clean_session << 1 + if self.user is not None: + msg[8 + indexOfSize] |= 0xC0 if self.keepalive: assert self.keepalive < 65536 - msg[10] |= self.keepalive >> 8 - msg[11] |= self.keepalive & 0x00FF + msg[9 + indexOfSize] |= self.keepalive >> 8 + msg[10 + indexOfSize] |= self.keepalive & 0x00FF if self.lw_topic: - msg[1] += 2 + len(self.lw_topic) + 2 + len(self.lw_msg) - msg[9] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3 - msg[9] |= self.lw_retain << 5 - # fix "remaining length field" for message size over 128 - rem_len = msg[1] - enc_bytearray = bytearray() - while (rem_len > 0): - enc_byte = rem_len % 128 - rem_len = int(rem_len / 128) - if (rem_len > 0): - enc_byte |= 128 - enc_bytearray.append(enc_byte) - msg[1:2] = enc_bytearray - + msg[8 + indexOfSize] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3 + msg[8 + indexOfSize] |= self.lw_retain << 5 self.sock.write(msg) #print(hex(len(msg)), hexlify(msg, ":")) self._send_str(self.client_id) From 2bc4d209e69e6a183780ba5eab86ce5015e48401 Mon Sep 17 00:00:00 2001 From: Damien Mascord Date: Sun, 9 Apr 2017 18:52:32 +1000 Subject: [PATCH 3/4] Copy technique from publish(...) --- umqtt.simple/umqtt/simple.py | 55 +++++++++++++++--------------------- 1 file changed, 22 insertions(+), 33 deletions(-) diff --git a/umqtt.simple/umqtt/simple.py b/umqtt.simple/umqtt/simple.py index 3fdb3bfd1..c50d1bd6d 100644 --- a/umqtt.simple/umqtt/simple.py +++ b/umqtt.simple/umqtt/simple.py @@ -13,8 +13,7 @@ def __init__(self, client_id, server, port=0, user=None, password=None, keepaliv port = 8883 if ssl else 1883 self.client_id = client_id self.sock = None - self.server = server - self.port = port + self.addr = socket.getaddrinfo(server, port)[0][-1] self.ssl = ssl self.ssl_params = ssl_params self.pid = 0 @@ -54,45 +53,35 @@ def set_last_will(self, topic, msg, retain=False, qos=0): def connect(self, clean_session=True): self.sock = socket.socket() - self.addr = socket.getaddrinfo(self.server, self.port)[0][-1] self.sock.connect(self.addr) if self.ssl: import ussl self.sock = ussl.wrap_socket(self.sock, **self.ssl_params) + premsg = bytearray(b"\x10\0\0\0\0\0") + msg = bytearray(b"\x04MQTT\x04\x02\0\0") + sz = 10 + 2 + len(self.client_id) - if self.user is not None: + msg[6] = clean_session << 1 + if user is not None: sz += 2 + len(self.user) + 2 + len(self.pswd) - if self.lw_topic: - sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg) - - if sz < 128: - msg = bytearray(b"\x10\0\0\x04MQTT\x04\x02\0\0") - elif sz < 16384: - msg = bytearray(b"\x10\0\0\0\x04MQTT\x04\x02\0\0") - elif sz < 2097152: - msg = bytearray(b"\x10\0\0\0\0\x04MQTT\x04\x02\0\0") - else: - msg = bytearray(b"\x10\0\0\0\0\0\x04MQTT\x04\x02\0\0") - - indexOfSize = 0 - while (sz > 0): - enc_byte = sz % 128 - sz = int(sz / 128) - if (sz > 0): - enc_byte |= 128 - indexOfSize += 1 - msg[indexOfSize] = enc_byte - - msg[8 + indexOfSize] = clean_session << 1 - if self.user is not None: - msg[8 + indexOfSize] |= 0xC0 + msg[6] |= 0xC0 if self.keepalive: assert self.keepalive < 65536 - msg[9 + indexOfSize] |= self.keepalive >> 8 - msg[10 + indexOfSize] |= self.keepalive & 0x00FF - if self.lw_topic: - msg[8 + indexOfSize] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3 - msg[8 + indexOfSize] |= self.lw_retain << 5 + msg[7] |= self.keepalive >> 8 + msg[8] |= self.keepalive & 0x00FF + if lw_topic: + sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg) + msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3 + msg[6] |= self.lw_retain << 5 + + i = 1 + while sz > 0x7f: + premsg[i] = (sz & 0x7f) | 0x80 + sz >>= 7 + i += 1 + premsg[i] = sz + + self.sock.write(premsg, i+1) self.sock.write(msg) #print(hex(len(msg)), hexlify(msg, ":")) self._send_str(self.client_id) From 09d9cb9d64a04f355c0ce7b508070401429aa862 Mon Sep 17 00:00:00 2001 From: Damien Mascord Date: Sun, 9 Apr 2017 19:16:44 +1000 Subject: [PATCH 4/4] Fix bugs, connects fine to mosquitto (again) --- umqtt.simple/umqtt/simple.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/umqtt.simple/umqtt/simple.py b/umqtt.simple/umqtt/simple.py index c50d1bd6d..c5c70d710 100644 --- a/umqtt.simple/umqtt/simple.py +++ b/umqtt.simple/umqtt/simple.py @@ -62,14 +62,14 @@ def connect(self, clean_session=True): sz = 10 + 2 + len(self.client_id) msg[6] = clean_session << 1 - if user is not None: + if self.user is not None: sz += 2 + len(self.user) + 2 + len(self.pswd) msg[6] |= 0xC0 if self.keepalive: assert self.keepalive < 65536 msg[7] |= self.keepalive >> 8 msg[8] |= self.keepalive & 0x00FF - if lw_topic: + if self.lw_topic: sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg) msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3 msg[6] |= self.lw_retain << 5 @@ -81,7 +81,7 @@ def connect(self, clean_session=True): i += 1 premsg[i] = sz - self.sock.write(premsg, i+1) + self.sock.write(premsg, i + 2) self.sock.write(msg) #print(hex(len(msg)), hexlify(msg, ":")) self._send_str(self.client_id)