Skip to content

Commit c7c2196

Browse files
committed
lib/cmwx1: Add CMWX1ZZABZ-093 driver.
Signed-off-by: iabdalkader <i.abdalkader@gmail.com>
1 parent aca36f7 commit c7c2196

File tree

3 files changed

+386
-0
lines changed

3 files changed

+386
-0
lines changed

lib/cmwx1/__init__.py

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# This file is part of the cmwx1 module.
2+
#
3+
# Copyright (c) 2013-2021 Ibrahim Abdelkader <iabdalkader@openmv.io>
4+
# Copyright (c) 2013-2021 Sebastian Romero <s.romero@arduino.cc>
5+
# Copyright (c) 2021 Arduino SA
6+
#
7+
# This work is licensed under the MIT license, see the file LICENSE for details.
8+
#
9+
# CMWX1ZZABZ-093 driver for Arduino boards.
10+
# Note this module runs a stock or a custom Arduino firmware.
11+
12+
13+
from .cmwx1 import * # noqa

lib/cmwx1/cmwx1.py

+367
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,367 @@
1+
# This file is part of the cmwx1 module.
2+
#
3+
# Copyright (c) 2013-2021 Ibrahim Abdelkader <iabdalkader@openmv.io>
4+
# Copyright (c) 2013-2021 Sebastian Romero <s.romero@arduino.cc>
5+
# Copyright (c) 2021 Arduino SA
6+
#
7+
# This work is licensed under the MIT license, see the file LICENSE for details.
8+
#
9+
# CMWX1ZZABZ-093 driver for Arduino boards.
10+
# Note this module runs a stock or a custom Arduino firmware.
11+
12+
from time import sleep_ms
13+
from time import ticks_ms
14+
from machine import UART
15+
from machine import Pin
16+
from micropython import const
17+
18+
19+
class LoraError(Exception):
20+
pass
21+
22+
23+
class LoraErrorTimeout(LoraError):
24+
pass
25+
26+
27+
class LoraErrorParam(LoraError):
28+
pass
29+
30+
31+
class LoraErrorBusy(LoraError):
32+
pass
33+
34+
35+
class LoraErrorOverflow(LoraError):
36+
pass
37+
38+
39+
class LoraErrorNoNetwork(LoraError):
40+
pass
41+
42+
43+
class LoraErrorRX(LoraError):
44+
pass
45+
46+
47+
class LoraErrorUnknown(LoraError):
48+
pass
49+
50+
51+
class Lora:
52+
MODE_ABP = const(0)
53+
MODE_OTAA = const(1)
54+
55+
CLASS_A = const("A")
56+
CLASS_B = const("B")
57+
CLASS_C = const("C")
58+
59+
BAND_AS923 = const(0)
60+
BAND_AU915 = const(1)
61+
BAND_EU868 = const(5)
62+
BAND_KR920 = const(6)
63+
BAND_IN865 = const(7)
64+
BAND_US915 = const(8)
65+
BAND_US915_HYBRID = const(9)
66+
67+
RF_MODE_RFO = const(0)
68+
RF_MODE_PABOOST = const(1)
69+
70+
LoraErrors = {
71+
"": LoraErrorTimeout, # empty buffer
72+
"+ERR": LoraError,
73+
"+ERR_PARAM": LoraErrorParam,
74+
"+ERR_BUSY": LoraErrorBusy,
75+
"+ERR_PARAM_OVERFLOW": LoraErrorOverflow,
76+
"+ERR_NO_NETWORK": LoraErrorNoNetwork,
77+
"+ERR_RX": LoraErrorRX,
78+
"+ERR_UNKNOWN": LoraErrorUnknown,
79+
}
80+
81+
def __init__(
82+
self,
83+
uart=None,
84+
rst_pin=None,
85+
boot_pin=None,
86+
band=BAND_EU868, # noqa
87+
poll_ms=300000,
88+
debug=False,
89+
):
90+
self.debug = debug
91+
self.uart = uart
92+
self.rst_pin = rst_pin
93+
self.boot_pin = boot_pin
94+
self.band = band
95+
self.poll_ms = poll_ms
96+
self.last_poll_ms = ticks_ms()
97+
98+
self.init_modem()
99+
100+
# Reset module
101+
self.boot_pin.value(0)
102+
self.rst_pin.value(1)
103+
sleep_ms(200)
104+
self.rst_pin.value(0)
105+
sleep_ms(200)
106+
self.rst_pin.value(1)
107+
108+
# Restart module
109+
self.restart()
110+
111+
def init_modem(self):
112+
# Arduino Portenta H7 Pin Configuration
113+
if not self.rst_pin:
114+
self.rst_pin = Pin("PC6", Pin.OUT_PP, Pin.PULL_UP, value=1)
115+
if not self.boot_pin:
116+
self.boot_pin = Pin("PG7", Pin.OUT_PP, Pin.PULL_DOWN, value=0)
117+
if not self.uart:
118+
self.uart = UART(8, 19200)
119+
# self.uart = UART(1, 19200) # Use external module
120+
self.uart.init(
121+
19200, bits=8, parity=None, stop=2, timeout=250, timeout_char=100
122+
)
123+
124+
def debug_print(self, data):
125+
if self.debug:
126+
print(data)
127+
128+
def is_arduino_firmware(self):
129+
return "ARD-078" in self.fw_version
130+
131+
def configure_class(self, _class):
132+
self.send_command("+CLASS=", _class)
133+
134+
def configure_band(self, band):
135+
self.send_command("+BAND=", band)
136+
if band == self.BAND_EU868 and self.is_arduino_firmware():
137+
self.send_command("+DUTYCYCLE=", 1)
138+
return True
139+
140+
def set_baudrate(self, baudrate):
141+
self.send_command("+UART=", baudrate)
142+
143+
def set_autobaud(self, timeout=10000):
144+
start = ticks_ms()
145+
while (ticks_ms() - start) < timeout:
146+
if self.send_command("", timeout=200, raise_error=False) == "+OK":
147+
sleep_ms(200)
148+
while self.uart.any():
149+
self.uart.readchar()
150+
return True
151+
return False
152+
153+
def get_fw_version(self):
154+
dev = self.send_command("+DEV?")
155+
fw_ver = self.send_command("+VER?")
156+
return dev + " " + fw_ver
157+
158+
def get_device_eui(self):
159+
return self.send_command("+DEVEUI?")
160+
161+
def factory_default(self):
162+
self.send_command("+FACNEW")
163+
164+
def restart(self):
165+
if self.set_autobaud() is False:
166+
raise (LoraError("Failed to set autobaud"))
167+
168+
# Different delimiter as REBOOT response EVENT doesn't end with '\r'.
169+
if self.send_command("+REBOOT", delimiter="+EVENT=0,0", timeout=10000) != "+EVENT=0,0":
170+
raise (LoraError("Failed to reboot module"))
171+
sleep_ms(1000)
172+
self.fw_version = self.get_fw_version()
173+
self.configure_band(self.band)
174+
175+
def set_rf_power(self, mode, power):
176+
self.send_command("+RFPOWER=", mode, ",", power)
177+
178+
def set_port(self, port):
179+
self.send_command("+PORT=", port)
180+
181+
def set_public_network(self, enable):
182+
self.send_command("+NWK=", int(enable))
183+
184+
def sleep(self, enable):
185+
self.send_command("+SLEEP=", int(enable))
186+
187+
def format(self, hexMode):
188+
self.send_command("+DFORMAT=", int(hexMode))
189+
190+
def set_datarate(self, dr):
191+
self.send_command("+DR=", dr)
192+
193+
def get_datarate(self):
194+
return int(self.send_command("+DR?"))
195+
196+
def set_adr(self, adr):
197+
self.send_command("+ADR=", int(adr))
198+
199+
def get_adr(self):
200+
return int(self.send_command("+ADR?"))
201+
202+
def get_devaddr(self):
203+
return self.send_command("+DEVADDR?")
204+
205+
def get_nwk_skey(self):
206+
return self.send_command("+NWKSKEY?")
207+
208+
def get_appskey(self):
209+
return self.send_command("+APPSKEY?")
210+
211+
def get_rx2dr(self):
212+
return int(self.send_command("+RX2DR?"))
213+
214+
def set_rx2dr(self, dr):
215+
self.send_command("+RX2DR=", dr)
216+
217+
def get_ex2freq(self):
218+
return int(self.send_command("+RX2FQ?"))
219+
220+
def set_rx2freq(self, freq):
221+
self.send_command("+RX2FQ=", freq)
222+
223+
def set_fcu(self, fcu):
224+
self.send_command("+FCU=", fcu)
225+
226+
def get_fcu(self):
227+
return int(self.send_command("+FCU?"))
228+
229+
def set_fcd(self, fcd):
230+
self.send_command("+FCD=", fcd)
231+
232+
def get_fcd(self):
233+
return int(self.send_command("+FCD?"))
234+
235+
def change_mode(self, mode):
236+
self.send_command("+MODE=", mode)
237+
238+
def join(self, timeout_ms):
239+
if self.send_command("+JOIN", timeout=timeout_ms) != "+ACK":
240+
return False
241+
response = self.receive("\r", timeout=timeout_ms)
242+
return response == "+EVENT=1,1"
243+
244+
def get_join_status(self):
245+
return int(self.send_command("+NJS?")) == 1
246+
247+
def get_max_size(self):
248+
if self.is_arduino_firmware():
249+
return 64
250+
return int(self.send_command("+MSIZE?", timeout=2000))
251+
252+
def poll(self):
253+
if (ticks_ms() - self.last_poll_ms) > self.poll_ms:
254+
self.last_poll_ms = ticks_ms()
255+
# Triggers a fake write
256+
self.send_data("\0", True)
257+
258+
def send_data(self, buff, confirmed=True):
259+
max_len = self.get_max_size()
260+
if len(buff) > max_len:
261+
raise (LoraError("Packet exceeds max length"))
262+
if self.send_command("+CTX " if confirmed else "+UTX ", len(buff), data=buff) != "+OK":
263+
return False
264+
if confirmed:
265+
response = self.receive("\r", timeout=10000)
266+
return response == "+ACK"
267+
return True
268+
269+
def receive_data(self, timeout=1000):
270+
response = self.receive("\r", timeout=timeout)
271+
if response.startswith("+RECV"):
272+
params = response.split("=")[1].split(",")
273+
port = params[0]
274+
length = int(params[1])
275+
dummy_data_length = 2 # Data starts with \n\n sequence
276+
data = self.receive(max_bytes=length + dummy_data_length, timeout=timeout)[
277+
dummy_data_length:
278+
]
279+
return {"port": port, "data": data}
280+
281+
def receive(self, delimiter=None, max_bytes=None, timeout=1000):
282+
buf = []
283+
start = ticks_ms()
284+
while (ticks_ms() - start) < timeout:
285+
while self.uart.any():
286+
buf += chr(self.uart.readchar())
287+
288+
if max_bytes and len(buf) == max_bytes:
289+
data = "".join(buf)
290+
self.debug_print(data)
291+
return data
292+
if len(buf) and delimiter is not None:
293+
data = "".join(buf)
294+
trimmed = data[0:-1] if data[-1] == "\r" else data
295+
296+
if isinstance(delimiter, str) and len(delimiter) == 1 and buf[-1] == delimiter:
297+
self.debug_print(trimmed)
298+
return trimmed
299+
if isinstance(delimiter, str) and trimmed == delimiter:
300+
self.debug_print(trimmed)
301+
return trimmed
302+
if isinstance(delimiter, list) and trimmed in delimiter:
303+
self.debug_print(trimmed)
304+
return trimmed
305+
306+
data = "".join(buf)
307+
self.debug_print(data)
308+
return data[0:-1] if len(data) != 0 and data[-1] == "\r" else data
309+
310+
def available(self):
311+
return self.uart.any()
312+
313+
def join_OTAA(self, appEui, appKey, devEui=None, timeout=60000):
314+
self.change_mode(self.MODE_OTAA)
315+
self.send_command("+APPEUI=", appEui)
316+
self.send_command("+APPKEY=", appKey)
317+
if devEui:
318+
self.send_command("+DEVEUI=", devEui)
319+
network_joined = self.join(timeout)
320+
# This delay was in MKRWAN.h
321+
# delay(1000);
322+
return network_joined
323+
324+
def join_ABP(self, nwkId, devAddr, nwkSKey, appSKey, timeout=60000):
325+
self.change_mode(self.MODE_ABP)
326+
# Commented in MKRWAN.h
327+
# self.send_command("+IDNWK=", nwkId)
328+
self.send_command("+DEVADDR=", devAddr)
329+
self.send_command("+NWKSKEY=", nwkSKey)
330+
self.send_command("+APPSKEY=", appSKey)
331+
self.join(timeout)
332+
return self.get_join_status()
333+
334+
def handle_error(self, command, data):
335+
if not data.startswith("+ERR") and data != "":
336+
return
337+
if data in self.LoraErrors:
338+
raise (self.LoraErrors[data]('Command "%s" has failed!' % command))
339+
raise (
340+
LoraError(
341+
'Command: "%s" failed with unknown status: "%s"' % (command, data)
342+
)
343+
)
344+
345+
def send_command(
346+
self, cmd, *args, delimiter="\r", data=None, timeout=1000, raise_error=True
347+
):
348+
# Write command and args
349+
uart_cmd = "AT" + cmd + "".join([str(x) for x in args]) + "\r"
350+
self.debug_print(uart_cmd)
351+
self.uart.write(uart_cmd)
352+
353+
# Attach raw data
354+
if data:
355+
self.debug_print(data)
356+
self.uart.write(data)
357+
358+
# Read status and value (if any)
359+
response = self.receive(delimiter, timeout=timeout)
360+
361+
# Error handling
362+
if raise_error:
363+
self.handle_error(cmd, response)
364+
365+
if cmd.endswith("?"):
366+
return response.split("=")[1]
367+
return response

lib/cmwx1/manifest.py

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
metadata(
2+
description="CMWX1ZZABZ-093 driver for Arduino boards.",
3+
version="0.0.1",
4+
)
5+
6+
module("cmwx1.py")

0 commit comments

Comments
 (0)