24
24
import argparse
25
25
import datetime
26
26
import os
27
+ import random
27
28
import time
28
29
29
30
import jwt
30
31
import paho .mqtt .client as mqtt
31
32
32
33
34
+ # The initial backoff time after a disconnection occurs, in seconds.
35
+ minimum_backoff_time = 1
36
+
37
+ # The maximum backoff time before giving up, in seconds.
38
+ MAXIMUM_BACKOFF_TIME = 32
39
+
40
+ # Whether to wait with exponential backoff before publishing.
41
+ should_backoff = False
42
+
43
+
33
44
# [START iot_mqtt_jwt]
34
45
def create_jwt (project_id , private_key_file , algorithm ):
35
46
"""Creates a JWT (https://jwt.io) to establish an MQTT connection.
@@ -76,11 +87,22 @@ def on_connect(unused_client, unused_userdata, unused_flags, rc):
76
87
"""Callback for when a device connects."""
77
88
print ('on_connect' , mqtt .connack_string (rc ))
78
89
90
+ # After a successful connect, reset backoff time and stop backing off.
91
+ global should_backoff
92
+ global minimum_backoff_time
93
+ should_backoff = False
94
+ minimum_backoff_time = 1
95
+
79
96
80
97
def on_disconnect (unused_client , unused_userdata , rc ):
81
98
"""Paho callback for when a device disconnects."""
82
99
print ('on_disconnect' , error_str (rc ))
83
100
101
+ # Since a disconnect occurred, the next loop iteration will wait with
102
+ # exponential backoff.
103
+ global should_backoff
104
+ should_backoff = True
105
+
84
106
85
107
def on_publish (unused_client , unused_userdata , unused_mid ):
86
108
"""Paho callback when a message is sent to the broker."""
@@ -134,9 +156,6 @@ def get_client(
134
156
# Subscribe to the config topic.
135
157
client .subscribe (mqtt_config_topic , qos = 1 )
136
158
137
- # Start the network loop.
138
- client .loop_start ()
139
-
140
159
return client
141
160
# [END iot_mqtt_config]
142
161
@@ -199,6 +218,8 @@ def parse_command_line_args():
199
218
200
219
# [START iot_mqtt_run]
201
220
def main ():
221
+ global minimum_backoff_time
222
+
202
223
args = parse_command_line_args ()
203
224
204
225
# Publish to the events or state topic based on the flag.
@@ -215,6 +236,23 @@ def main():
215
236
216
237
# Publish num_messages mesages to the MQTT bridge once per second.
217
238
for i in range (1 , args .num_messages + 1 ):
239
+ # Process network events.
240
+ client .loop ()
241
+
242
+ # Wait if backoff is required.
243
+ if should_backoff :
244
+ # If backoff time is too large, give up.
245
+ if minimum_backoff_time > MAXIMUM_BACKOFF_TIME :
246
+ print ('Exceeded maximum backoff time. Giving up.' )
247
+ break
248
+
249
+ # Otherwise, wait and connect again.
250
+ delay = minimum_backoff_time + random .randint (0 , 1000 ) / 1000.0
251
+ print ('Waiting for {} before reconnecting.' .format (delay ))
252
+ time .sleep (delay )
253
+ minimum_backoff_time *= 2
254
+ client .connect (args .mqtt_bridge_hostname , args .mqtt_bridge_port )
255
+
218
256
payload = '{}/{}-payload-{}' .format (
219
257
args .registry_id , args .device_id , i )
220
258
print ('Publishing message {}/{}: \' {}\' ' .format (
@@ -223,7 +261,6 @@ def main():
223
261
seconds_since_issue = (datetime .datetime .utcnow () - jwt_iat ).seconds
224
262
if seconds_since_issue > 60 * jwt_exp_mins :
225
263
print ('Refreshing token after {}s' ).format (seconds_since_issue )
226
- client .loop_stop ()
227
264
jwt_iat = datetime .datetime .utcnow ()
228
265
client = get_client (
229
266
args .project_id , args .cloud_region ,
@@ -239,8 +276,6 @@ def main():
239
276
# Send events every second. State should not be updated as often
240
277
time .sleep (1 if args .message_type == 'event' else 5 )
241
278
242
- # End the network loop and finish.
243
- client .loop_stop ()
244
279
print ('Finished.' )
245
280
# [END iot_mqtt_run]
246
281
0 commit comments