Skip to content

Commit c0c8c31

Browse files
committed
update callback func
1 parent 85b57c6 commit c0c8c31

File tree

1 file changed

+21
-20
lines changed

1 file changed

+21
-20
lines changed

pubsub/cloud-client/quickstart.py

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -61,55 +61,56 @@ def create_subscription_safely(subscriber, subscription_path):
6161
data = 'x' * 10000
6262
data = data.encode('utf-8')
6363
# Initialize an empty dictionary to track messages.
64-
tracker = dict()
65-
delivery_times = []
64+
pub_tracker = dict()
65+
sub_tracker = dict()
66+
delivery_ts = []
6667
num_messages = 10
6768

6869
def resolve_future_callback(future):
6970
"""Resolve the publish future and update `tracker` asynchronously."""
7071
pubtime = time.time()
7172
message_id = future.result()
72-
tracker.update({message_id: {'pubtime': pubtime, 'subtime': None}})
73+
pub_tracker.update({message_id: pubtime})
7374

74-
def publish_messages(callback):
75-
for i in range(num_messages):
76-
future = publisher.publish(topic_path, data=data, index=str(i))
77-
callback(future)
7875

79-
# Publish messages.
80-
publish_messages(resolve_future_callback)
76+
for i in range(num_messages):
77+
future = publisher.publish(topic_path, data=data, index=str(i))
78+
future.add_done_callback(resolve_future_callback)
79+
8180
print('\nPublished all messages.')
8281

8382
def process_message_callback(message):
8483
message.ack()
8584
subtime = time.time()
86-
tracker[message.message_id]['subtime'] = subtime
85+
sub_tracker.update({message.message_id: subtime})
8786
print(message.attributes['index'])
8887

8988
# Receive messages asynchronously.
9089
subscriber.subscribe(subscription_path, callback=process_message_callback)
9190
print('\nListening for messages...')
9291

9392
while True:
94-
# Extract delivery times from `tracker` and deplete it over time.
95-
for message_id in list(tracker):
96-
if tracker[message_id]['subtime'] is not None:
97-
delivery_times.append(tracker[message_id]['subtime'] -
98-
tracker[message_id]['pubtime'])
99-
del tracker[message_id]
93+
# Populate delivery times.
94+
keys_pub = set(pub_tracker.keys())
95+
keys_sub = set(sub_tracker.keys())
96+
97+
for key in keys_pub & keys_sub:
98+
delivery_ts.append(sub_tracker[key] - pub_tracker[key])
99+
del pub_tracker[key]
100+
del sub_tracker[key]
100101

101102
# Exit if all the delivery times have been accounted for.
102-
if len(tracker) == 0:
103+
if len(pub_tracker) == len(sub_tracker) == 0 and len(delivery_ts) > 0:
103104
print('\nDelivery Statistics')
104105
print('Average time: {:.6f}s'.format(
105-
sum(delivery_times)/len(delivery_times)))
106-
print('Best time: {:.6f}s'.format(min(delivery_times)))
106+
sum(delivery_ts)/len(delivery_ts)))
107+
print('Best time: {:.6f}s'.format(min(delivery_ts)))
107108
break
108109
else:
109110
# Sleep the thread at 5Hz to save on resources.
110111
time.sleep(1./5)
111112
# [END pubsub_end_to_end]
112-
return delivery_times
113+
return delivery_ts
113114

114115

115116
if __name__ == '__main__':

0 commit comments

Comments
 (0)