18
18
import argparse
19
19
20
20
21
- def end_to_end (project_id , topic_name , subscription_name , num_messages ):
22
- # [START pubsub_quickstart_end2end_basic ]
21
+ def end_to_end (project_id , topic_name , subscription_name ):
22
+ # [START pubsub_end_to_end ]
23
23
import time
24
24
25
25
from google .api_core .exceptions import NotFound
26
26
from google .cloud import pubsub_v1
27
27
28
- # TODO project_id = "Your Google Cloud Project ID"
28
+
29
+ def create_topic_safely (publisher , topic_path ):
30
+ try :
31
+ publisher .delete_topic (topic_path )
32
+ except NotFound :
33
+ pass
34
+
35
+ topic = publisher .create_topic (topic_path )
36
+ print ('Topic created: \" {}\" ' .format (topic .name ))
37
+
38
+ def create_subscription_safely (subscriber , subscription_path ):
39
+ try :
40
+ subscriber .delete_subscription (subscription_path )
41
+ except NotFound :
42
+ pass
43
+
44
+ subscription = subscriber .create_subscription (
45
+ subscription_path , topic_path )
46
+ print ('Subscription created: \" {}\" ' .format (subscription .name ))
47
+
48
+ # TODO project_id = "Your Pub/Sub project id"
29
49
# TODO topic_name = "Your Pub/Sub topic name"
30
50
# TODO subscription_name = "Your Pub/Sub subscription name"
31
- # TODO num_messages = number of messages to test end-to-end
32
51
33
- # Instantiate a publisher and a subscriber client
34
52
publisher = pubsub_v1 .PublisherClient ()
35
53
subscriber = pubsub_v1 .SubscriberClient ()
36
-
37
- # The `topic_path` method creates a fully qualified identifier
38
- # in the form `projects/{project_id}/topics/{topic_name}`
39
54
topic_path = publisher .topic_path (project_id , topic_name )
40
-
41
- # The `subscription_path` method creates a fully qualified identifier
42
- # in the form `projects/{project_id}/subscriptions/{subscription_name}`
43
55
subscription_path = subscriber .subscription_path (
44
56
project_id , subscription_name )
45
57
46
- # Create a topic.
47
- try :
48
- publisher .delete_topic (topic_path )
49
- except NotFound :
50
- pass
51
- finally :
52
- topic = publisher .create_topic (topic_path )
53
- print ('Topic created: \" {}\" ' .format (topic .name ))
54
-
55
- # Create a subscription.
56
- try :
57
- subscriber .delete_subscription (subscription_path )
58
- except NotFound :
59
- pass
60
- finally :
61
- subscription = subscriber .create_subscription (
62
- subscription_path , topic_path )
63
- print ('Subscription created: \" {}\" ' .format (subscription .name ))
58
+ create_topic_safely (publisher , topic_path )
59
+ create_subscription_safely (subscriber , subscription_path )
64
60
65
- # `data` is roughly 10 Kb.
66
- data = 'x' * 9600
67
61
# `data` must be a bytestring.
62
+ data = 'x' * 10000
68
63
data = data .encode ('utf-8' )
69
64
# Initialize an empty dictionary to track messages.
70
65
tracker = dict ()
71
- pubsub_time = 0.0
66
+ delivery_times = []
67
+ num_messages = 10
72
68
73
69
# Publish messages.
74
70
for i in range (num_messages ):
75
- # When we publish a message, the client returns a future.
76
71
future = publisher .publish (topic_path , data = data )
77
-
78
- tracker .update ({i : {'index' : i ,
79
- 'pubtime' : time .time (),
80
- 'subtime' : None }})
81
-
82
- # `future.result()` blocks and returns a unique message ID per message.
83
- tracker [future .result ()] = tracker .pop (i )
72
+ tracker .update ({future : {'pubtime' : time .time (),
73
+ 'subtime' : None }})
74
+ # Update the old key, which is the publish future, with the unique
75
+ # identifier per message returned by the `result()` method.
76
+ tracker [future .result ()] = tracker .pop (future )
84
77
85
78
print ('\n Published all messages.' )
86
79
87
- def callback (message ):
88
- # Unacknowledged messages will be sent again.
80
+ def process_message_callback (message ):
89
81
message .ack ()
90
- # Update message `subtime`.
91
82
tracker [message .message_id ]['subtime' ] = time .time ()
92
83
93
- # Receive messages. The subscriber is nonblocking.
94
- subscriber .subscribe (subscription_path , callback = callback )
84
+ # Receive messages. ` subscriber` is nonblocking.
85
+ subscriber .subscribe (subscription_path , callback = process_message_callback )
95
86
96
87
print ('\n Listening for messages...' )
97
88
98
89
while True :
99
- # Deplete messages in `tracker` every time we have complete info
100
- # of a message.
101
- for msg_id in list (tracker ):
102
- pubtime = tracker [msg_id ]['pubtime' ]
103
- subtime = tracker [msg_id ]['subtime' ]
104
- if subtime is not None :
105
- pubsub_time += subtime - pubtime
106
- del tracker [msg_id ]
107
-
108
- # Exit if `tracker` is empty i.e. all the messages' publish-subscribe
109
- # time have been accounted for.
90
+ # Extract delivery times from `tracker` and deplete it over time.
91
+ for message_id in list (tracker ):
92
+ if tracker [message_id ]['subtime' ] is not None :
93
+ delivery_times .append (tracker [message_id ]['subtime' ] -
94
+ tracker [message_id ]['pubtime' ])
95
+ del tracker [message_id ]
96
+
97
+ # Exit if all the delivery times have been accounted for.
110
98
if len (tracker ) == 0 :
111
- print ('\n Total publish to subscribe time for {} messages: \
112
- {:.6f}s.' .format (num_messages , pubsub_time ))
99
+ print ('\n Delivery Statistics' )
100
+ print ('Average time: {:.6f}s' .format (
101
+ sum (delivery_times )/ num_messages ))
102
+ print ('Best time: {:.6f}s' .format (min (delivery_times )))
113
103
break
114
104
else :
115
105
print ('Messages countdown: {}' .format (len (tracker )))
116
106
# Sleep the thread at 5Hz to save on resources.
117
107
time .sleep (1. / 5 )
118
- # [END pubsub_quickstart_end2end_basic]
108
+ # [END pubsub_end_to_end]
109
+ return delivery_times
119
110
120
111
121
- def end_to_end_standard (project_id , topic_name , subscription_name ,
122
- num_messages ):
123
- # [START pubsub_quickstart_end2end_standard]
112
+ def end_to_end_standard (project_id , topic_name , subscription_name ):
113
+ # [START pubsub_end_to_end_standard]
124
114
pass
125
- # [END pubsub_quickstart_end2end_standard ]
115
+ # [END pubsub_end_to_end_standard ]
126
116
127
117
128
118
if __name__ == '__main__' :
@@ -139,22 +129,17 @@ def end_to_end_standard(project_id, topic_name, subscription_name,
139
129
basic_parser .add_argument ('topic_name' , help = 'Your topic name' )
140
130
basic_parser .add_argument ('subscription_name' ,
141
131
help = 'Your subscription name' )
142
- basic_parser .add_argument ('num_messages' , type = int ,
143
- help = 'Number of test messages' )
144
132
145
133
standard_parser = subparsers .add_parser ('standard' ,
146
134
help = end_to_end_standard .__doc__ )
147
135
standard_parser .add_argument ('topic_name' , help = 'Your topic name' )
148
136
standard_parser .add_argument ('subscription_name' ,
149
137
help = 'Your subscription name' )
150
- standard_parser .add_argument ('num_messages' , type = int ,
151
- help = 'Number of test messages' )
152
138
153
139
args = parser .parse_args ()
154
140
155
141
if args .command == 'basic' :
156
- end_to_end (args .project_id , args .topic_name , args .subscription_name ,
157
- args .num_messages )
142
+ end_to_end (args .project_id , args .topic_name , args .subscription_name )
158
143
if args .command == 'standard' :
159
144
end_to_end_standard (args .project_id , args .topic_name ,
160
- args .subscription_name , args . num_messages )
145
+ args .subscription_name )
0 commit comments