Skip to content

Commit b8f73b6

Browse files
gguussandrewsg
authored andcommitted
MQTT Exponential backoff and manager updates (GoogleCloudPlatform#1345)
1 parent 9240951 commit b8f73b6

File tree

4 files changed

+62
-10
lines changed

4 files changed

+62
-10
lines changed

iot/api-client/manager/README.rst

+13-3
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ To run this sample:
7777
[--project_id PROJECT_ID] [--registry_id REGISTRY_ID]
7878
[--rsa_certificate_file RSA_CERTIFICATE_FILE]
7979
[--service_account_json SERVICE_ACCOUNT_JSON]
80-
[--version VERSION]
81-
{create-es256,create-registry,create-rsa256,create-topic,create-unauth,delete-device,delete-registry,get,get-registry,get-state,list,list-registries,patch-es256,patch-rs256,set-config}
80+
[--version VERSION] [--member MEMBER] [--role ROLE]
81+
{create-es256,create-registry,create-rsa256,create-topic,create-unauth,delete-device,delete-registry,get,get-config-versions,get-iam-permissions,get-registry,get-state,list,list-registries,patch-es256,patch-rs256,set-config,set-iam-permissions}
8282
...
8383
8484
Example of using the Google Cloud IoT Core device manager to administer
@@ -95,7 +95,7 @@ To run this sample:
9595
list
9696
9797
positional arguments:
98-
{create-es256,create-registry,create-rsa256,create-topic,create-unauth,delete-device,delete-registry,get,get-registry,get-state,list,list-registries,patch-es256,patch-rs256,set-config}
98+
{create-es256,create-registry,create-rsa256,create-topic,create-unauth,delete-device,delete-registry,get,get-config-versions,get-iam-permissions,get-registry,get-state,list,list-registries,patch-es256,patch-rs256,set-config,set-iam-permissions}
9999
create-es256 Create a new device with the given id, using ES256 for
100100
authentication.
101101
create-registry Gets or creates a device registry.
@@ -107,6 +107,11 @@ To run this sample:
107107
delete-device Delete the device with the given id.
108108
delete-registry Deletes the specified registry.
109109
get Retrieve the device with the given id.
110+
get-config-versions
111+
Lists versions of a device config in descending order
112+
(newest first).
113+
get-iam-permissions
114+
Retrieves IAM permissions for the given registry.
110115
get-registry Retrieves a device registry.
111116
get-state Retrieve a device's state blobs.
112117
list List all devices in the registry.
@@ -117,6 +122,9 @@ To run this sample:
117122
device.
118123
set-config Patch the device to add an RSA256 public key to the
119124
device.
125+
set-iam-permissions
126+
Sets IAM permissions for the given registry to a
127+
single role/member.
120128
121129
optional arguments:
122130
-h, --help show this help message and exit
@@ -139,6 +147,8 @@ To run this sample:
139147
--service_account_json SERVICE_ACCOUNT_JSON
140148
Path to service account json file.
141149
--version VERSION Version number for setting device configuration.
150+
--member MEMBER Member used for IAM commands.
151+
--role ROLE Role used for IAM commands.
142152
143153
144154

iot/api-client/manager/manager.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ def get_iam_permissions(
438438
def set_iam_permissions(
439439
service_account_json, project_id, cloud_region, registry_id, role,
440440
member):
441-
"""Retrieves IAM permissions for the given registry."""
441+
"""Sets IAM permissions for the given registry to a single role/member."""
442442
client = get_client(service_account_json)
443443

444444
registry_path = 'projects/{}/locations/{}/registries/{}'.format(

iot/api-client/mqtt_example/cloudiot_mqtt_example.py

+41-6
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,23 @@
2424
import argparse
2525
import datetime
2626
import os
27+
import random
2728
import time
2829

2930
import jwt
3031
import paho.mqtt.client as mqtt
3132

3233

34+
# The initial backoff time after a disconnection occurs, in seconds.
35+
minimum_backoff_time = 1
36+
37+
# The maximum backoff time before giving up, in seconds.
38+
MAXIMUM_BACKOFF_TIME = 32
39+
40+
# Whether to wait with exponential backoff before publishing.
41+
should_backoff = False
42+
43+
3344
# [START iot_mqtt_jwt]
3445
def create_jwt(project_id, private_key_file, algorithm):
3546
"""Creates a JWT (https://jwt.io) to establish an MQTT connection.
@@ -76,11 +87,22 @@ def on_connect(unused_client, unused_userdata, unused_flags, rc):
7687
"""Callback for when a device connects."""
7788
print('on_connect', mqtt.connack_string(rc))
7889

90+
# After a successful connect, reset backoff time and stop backing off.
91+
global should_backoff
92+
global minimum_backoff_time
93+
should_backoff = False
94+
minimum_backoff_time = 1
95+
7996

8097
def on_disconnect(unused_client, unused_userdata, rc):
8198
"""Paho callback for when a device disconnects."""
8299
print('on_disconnect', error_str(rc))
83100

101+
# Since a disconnect occurred, the next loop iteration will wait with
102+
# exponential backoff.
103+
global should_backoff
104+
should_backoff = True
105+
84106

85107
def on_publish(unused_client, unused_userdata, unused_mid):
86108
"""Paho callback when a message is sent to the broker."""
@@ -134,9 +156,6 @@ def get_client(
134156
# Subscribe to the config topic.
135157
client.subscribe(mqtt_config_topic, qos=1)
136158

137-
# Start the network loop.
138-
client.loop_start()
139-
140159
return client
141160
# [END iot_mqtt_config]
142161

@@ -199,6 +218,8 @@ def parse_command_line_args():
199218

200219
# [START iot_mqtt_run]
201220
def main():
221+
global minimum_backoff_time
222+
202223
args = parse_command_line_args()
203224

204225
# Publish to the events or state topic based on the flag.
@@ -215,6 +236,23 @@ def main():
215236

216237
# Publish num_messages mesages to the MQTT bridge once per second.
217238
for i in range(1, args.num_messages + 1):
239+
# Process network events.
240+
client.loop()
241+
242+
# Wait if backoff is required.
243+
if should_backoff:
244+
# If backoff time is too large, give up.
245+
if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
246+
print('Exceeded maximum backoff time. Giving up.')
247+
break
248+
249+
# Otherwise, wait and connect again.
250+
delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
251+
print('Waiting for {} before reconnecting.'.format(delay))
252+
time.sleep(delay)
253+
minimum_backoff_time *= 2
254+
client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port)
255+
218256
payload = '{}/{}-payload-{}'.format(
219257
args.registry_id, args.device_id, i)
220258
print('Publishing message {}/{}: \'{}\''.format(
@@ -223,7 +261,6 @@ def main():
223261
seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
224262
if seconds_since_issue > 60 * jwt_exp_mins:
225263
print('Refreshing token after {}s').format(seconds_since_issue)
226-
client.loop_stop()
227264
jwt_iat = datetime.datetime.utcnow()
228265
client = get_client(
229266
args.project_id, args.cloud_region,
@@ -239,8 +276,6 @@ def main():
239276
# Send events every second. State should not be updated as often
240277
time.sleep(1 if args.message_type == 'event' else 5)
241278

242-
# End the network loop and finish.
243-
client.loop_stop()
244279
print('Finished.')
245280
# [END iot_mqtt_run]
246281

iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py

+7
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ def test_event(test_topic, capsys):
7474
rsa_private_path, 'RS256', ca_cert_path,
7575
'mqtt.googleapis.com', 443)
7676

77+
client.loop_start()
7778
client.publish(mqtt_topic, 'just test', qos=1)
7879
time.sleep(2)
7980
client.loop_stop()
@@ -115,7 +116,10 @@ def test_state(test_topic, capsys):
115116
rsa_private_path, 'RS256', ca_cert_path,
116117
'mqtt.googleapis.com', 443)
117118
client.publish(mqtt_topic, 'state test', qos=1)
119+
client.loop_start()
120+
118121
time.sleep(3)
122+
119123
client.loop_stop()
120124

121125
manager.get_state(
@@ -152,7 +156,10 @@ def test_config(test_topic, capsys):
152156
project_id, cloud_region, registry_id, device_id,
153157
rsa_private_path, 'RS256', ca_cert_path,
154158
'mqtt.googleapis.com', 443)
159+
client.loop_start()
160+
155161
time.sleep(5)
162+
156163
client.loop_stop()
157164

158165
manager.get_state(

0 commit comments

Comments
 (0)