@@ -61,55 +61,56 @@ def create_subscription_safely(subscriber, subscription_path):
61
61
data = 'x' * 10000
62
62
data = data .encode ('utf-8' )
63
63
# Initialize an empty dictionary to track messages.
64
- tracker = dict ()
65
- delivery_times = []
64
+ pub_tracker = dict ()
65
+ sub_tracker = dict ()
66
+ delivery_ts = []
66
67
num_messages = 10
67
68
68
69
def resolve_future_callback (future ):
69
70
"""Resolve the publish future and update `tracker` asynchronously."""
70
71
pubtime = time .time ()
71
72
message_id = future .result ()
72
- tracker .update ({message_id : { ' pubtime' : pubtime , 'subtime' : None } })
73
+ pub_tracker .update ({message_id : pubtime })
73
74
74
- def publish_messages (callback ):
75
- for i in range (num_messages ):
76
- future = publisher .publish (topic_path , data = data , index = str (i ))
77
- callback (future )
78
75
79
- # Publish messages.
80
- publish_messages (resolve_future_callback )
76
+ for i in range (num_messages ):
77
+ future = publisher .publish (topic_path , data = data , index = str (i ))
78
+ future .add_done_callback (resolve_future_callback )
79
+
81
80
print ('\n Published all messages.' )
82
81
83
82
def process_message_callback (message ):
84
83
message .ack ()
85
84
subtime = time .time ()
86
- tracker [ message .message_id ][ 'subtime' ] = subtime
85
+ sub_tracker . update ({ message .message_id : subtime })
87
86
print (message .attributes ['index' ])
88
87
89
88
# Receive messages asynchronously.
90
89
subscriber .subscribe (subscription_path , callback = process_message_callback )
91
90
print ('\n Listening for messages...' )
92
91
93
92
while True :
94
- # Extract delivery times from `tracker` and deplete it over time.
95
- for message_id in list (tracker ):
96
- if tracker [message_id ]['subtime' ] is not None :
97
- delivery_times .append (tracker [message_id ]['subtime' ] -
98
- tracker [message_id ]['pubtime' ])
99
- del tracker [message_id ]
93
+ # Populate delivery times.
94
+ keys_pub = set (pub_tracker .keys ())
95
+ keys_sub = set (sub_tracker .keys ())
96
+
97
+ for key in keys_pub & keys_sub :
98
+ delivery_ts .append (sub_tracker [key ] - pub_tracker [key ])
99
+ del pub_tracker [key ]
100
+ del sub_tracker [key ]
100
101
101
102
# Exit if all the delivery times have been accounted for.
102
- if len (tracker ) == 0 :
103
+ if len (pub_tracker ) == len ( sub_tracker ) == 0 and len ( delivery_ts ) > 0 :
103
104
print ('\n Delivery Statistics' )
104
105
print ('Average time: {:.6f}s' .format (
105
- sum (delivery_times )/ len (delivery_times )))
106
- print ('Best time: {:.6f}s' .format (min (delivery_times )))
106
+ sum (delivery_ts )/ len (delivery_ts )))
107
+ print ('Best time: {:.6f}s' .format (min (delivery_ts )))
107
108
break
108
109
else :
109
110
# Sleep the thread at 5Hz to save on resources.
110
111
time .sleep (1. / 5 )
111
112
# [END pubsub_end_to_end]
112
- return delivery_times
113
+ return delivery_ts
113
114
114
115
115
116
if __name__ == '__main__' :
0 commit comments