Skip to content

Commit cd09d24

Browse files
committed
Some fixes
1 parent b6b3197 commit cd09d24

File tree

2 files changed

+58
-72
lines changed

2 files changed

+58
-72
lines changed

pubsub/cloud-client/quickstart.py

Lines changed: 55 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -18,111 +18,101 @@
1818
import argparse
1919

2020

21-
def end_to_end(project_id, topic_name, subscription_name, num_messages):
22-
# [START pubsub_quickstart_end2end_basic]
21+
def end_to_end(project_id, topic_name, subscription_name):
22+
# [START pubsub_end_to_end]
2323
import time
2424

2525
from google.api_core.exceptions import NotFound
2626
from google.cloud import pubsub_v1
2727

28-
# TODO project_id = "Your Google Cloud Project ID"
28+
29+
def create_topic_safely(publisher, topic_path):
30+
try:
31+
publisher.delete_topic(topic_path)
32+
except NotFound:
33+
pass
34+
35+
topic = publisher.create_topic(topic_path)
36+
print('Topic created: \"{}\"'.format(topic.name))
37+
38+
def create_subscription_safely(subscriber, subscription_path):
39+
try:
40+
subscriber.delete_subscription(subscription_path)
41+
except NotFound:
42+
pass
43+
44+
subscription = subscriber.create_subscription(
45+
subscription_path, topic_path)
46+
print('Subscription created: \"{}\"'.format(subscription.name))
47+
48+
# TODO project_id = "Your Pub/Sub project id"
2949
# TODO topic_name = "Your Pub/Sub topic name"
3050
# TODO subscription_name = "Your Pub/Sub subscription name"
31-
# TODO num_messages = number of messages to test end-to-end
3251

33-
# Instantiate a publisher and a subscriber client
3452
publisher = pubsub_v1.PublisherClient()
3553
subscriber = pubsub_v1.SubscriberClient()
36-
37-
# The `topic_path` method creates a fully qualified identifier
38-
# in the form `projects/{project_id}/topics/{topic_name}`
3954
topic_path = publisher.topic_path(project_id, topic_name)
40-
41-
# The `subscription_path` method creates a fully qualified identifier
42-
# in the form `projects/{project_id}/subscriptions/{subscription_name}`
4355
subscription_path = subscriber.subscription_path(
4456
project_id, subscription_name)
4557

46-
# Create a topic.
47-
try:
48-
publisher.delete_topic(topic_path)
49-
except NotFound:
50-
pass
51-
finally:
52-
topic = publisher.create_topic(topic_path)
53-
print('Topic created: \"{}\"'.format(topic.name))
54-
55-
# Create a subscription.
56-
try:
57-
subscriber.delete_subscription(subscription_path)
58-
except NotFound:
59-
pass
60-
finally:
61-
subscription = subscriber.create_subscription(
62-
subscription_path, topic_path)
63-
print('Subscription created: \"{}\"'.format(subscription.name))
58+
create_topic_safely(publisher, topic_path)
59+
create_subscription_safely(subscriber, subscription_path)
6460

65-
# `data` is roughly 10 Kb.
66-
data = 'x' * 9600
6761
# `data` must be a bytestring.
62+
data = 'x' * 10000
6863
data = data.encode('utf-8')
6964
# Initialize an empty dictionary to track messages.
7065
tracker = dict()
71-
pubsub_time = 0.0
66+
delivery_times = []
67+
num_messages = 10
7268

7369
# Publish messages.
7470
for i in range(num_messages):
75-
# When we publish a message, the client returns a future.
7671
future = publisher.publish(topic_path, data=data)
77-
78-
tracker.update({i: {'index': i,
79-
'pubtime': time.time(),
80-
'subtime': None}})
81-
82-
# `future.result()` blocks and returns a unique message ID per message.
83-
tracker[future.result()] = tracker.pop(i)
72+
tracker.update({future: {'pubtime': time.time(),
73+
'subtime': None}})
74+
# Update the old key, which is the publish future, with the unique
75+
# identifier per message returned by the `result()` method.
76+
tracker[future.result()] = tracker.pop(future)
8477

8578
print('\nPublished all messages.')
8679

87-
def callback(message):
88-
# Unacknowledged messages will be sent again.
80+
def process_message_callback(message):
8981
message.ack()
90-
# Update message `subtime`.
9182
tracker[message.message_id]['subtime'] = time.time()
9283

93-
# Receive messages. The subscriber is nonblocking.
94-
subscriber.subscribe(subscription_path, callback=callback)
84+
# Receive messages. `subscriber` is nonblocking.
85+
subscriber.subscribe(subscription_path, callback=process_message_callback)
9586

9687
print('\nListening for messages...')
9788

9889
while True:
99-
# Deplete messages in `tracker` every time we have complete info
100-
# of a message.
101-
for msg_id in list(tracker):
102-
pubtime = tracker[msg_id]['pubtime']
103-
subtime = tracker[msg_id]['subtime']
104-
if subtime is not None:
105-
pubsub_time += subtime - pubtime
106-
del tracker[msg_id]
107-
108-
# Exit if `tracker` is empty i.e. all the messages' publish-subscribe
109-
# time have been accounted for.
90+
# Extract delivery times from `tracker` and deplete it over time.
91+
for message_id in list(tracker):
92+
if tracker[message_id]['subtime'] is not None:
93+
delivery_times.append(tracker[message_id]['subtime'] -
94+
tracker[message_id]['pubtime'])
95+
del tracker[message_id]
96+
97+
# Exit if all the delivery times have been accounted for.
11098
if len(tracker) == 0:
111-
print('\nTotal publish to subscribe time for {} messages: \
112-
{:.6f}s.'.format(num_messages, pubsub_time))
99+
print('\nDelivery Statistics')
100+
print('Average time: {:.6f}s'.format(
101+
sum(delivery_times)/num_messages))
102+
print('Best time: {:.6f}s'.format(min(delivery_times)))
113103
break
114104
else:
115105
print('Messages countdown: {}'.format(len(tracker)))
116106
# Sleep the thread at 5Hz to save on resources.
117107
time.sleep(1./5)
118-
# [END pubsub_quickstart_end2end_basic]
108+
# [END pubsub_end_to_end]
109+
return delivery_times
119110

120111

121-
def end_to_end_standard(project_id, topic_name, subscription_name,
122-
num_messages):
123-
# [START pubsub_quickstart_end2end_standard]
112+
def end_to_end_standard(project_id, topic_name, subscription_name):
113+
# [START pubsub_end_to_end_standard]
124114
pass
125-
# [END pubsub_quickstart_end2end_standard]
115+
# [END pubsub_end_to_end_standard]
126116

127117

128118
if __name__ == '__main__':
@@ -139,22 +129,17 @@ def end_to_end_standard(project_id, topic_name, subscription_name,
139129
basic_parser.add_argument('topic_name', help='Your topic name')
140130
basic_parser.add_argument('subscription_name',
141131
help='Your subscription name')
142-
basic_parser.add_argument('num_messages', type=int,
143-
help='Number of test messages')
144132

145133
standard_parser = subparsers.add_parser('standard',
146134
help=end_to_end_standard.__doc__)
147135
standard_parser.add_argument('topic_name', help='Your topic name')
148136
standard_parser.add_argument('subscription_name',
149137
help='Your subscription name')
150-
standard_parser.add_argument('num_messages', type=int,
151-
help='Number of test messages')
152138

153139
args = parser.parse_args()
154140

155141
if args.command == 'basic':
156-
end_to_end(args.project_id, args.topic_name, args.subscription_name,
157-
args.num_messages)
142+
end_to_end(args.project_id, args.topic_name, args.subscription_name)
158143
if args.command == 'standard':
159144
end_to_end_standard(args.project_id, args.topic_name,
160-
args.subscription_name, args.num_messages)
145+
args.subscription_name)

pubsub/cloud-client/quickstart_test.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ def subscription(subscriber_client, topic):
6363

6464
def test_end_to_end(topic, subscription, capsys):
6565

66-
quickstart.end_to_end(PROJECT, topic, subscription, N)
66+
delivery_times = quickstart.end_to_end(PROJECT, topic, subscription)
6767
out, _ = capsys.readouterr()
6868

69-
assert "Total publish to subscribe time for {} messages".format(N) in out
69+
assert "Delivery Statistics" in out
70+
assert sum(delivery_times)/len(delivery_times) < 3.0

0 commit comments

Comments
 (0)