From 8bed87fb2d877397c839929ef651f405030dda18 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Wed, 29 Aug 2018 16:03:51 -0700 Subject: [PATCH 1/3] Added sample for synchronous pull --- pubsub/cloud-client/subscriber.py | 31 ++++++++++++++++++++++++++ pubsub/cloud-client/subscriber_test.py | 15 +++++++++++++ 2 files changed, 46 insertions(+) diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index 51fa96b8675..13cd8f7261f 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -207,6 +207,29 @@ def callback(message): # [END pubsub_subscriber_flow_settings] +def receive_messages_synchronously(project, subscription_name): + """Pulling messages synchronously.""" + # [START pubsub_subscriber_sync_pull] + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path( + project, subscription_name) + + # Builds a pull request with a specified number of messages to return. + response = subscriber.pull( + subscription_path, + max_messages=3, + return_immediately=False) # Waits until three messages are available. + + ack_ids = [] + for received_message in response.received_messages: + print("Received: {}".format(received_message.message.data)) + ack_ids.append(received_message.ack_id) + + # Acknowledges the received messages so they will not be sent again. + subscriber.acknowledge(subscription_path, ack_ids) + # [END pubsub_subscriber_sync_pull] + + def listen_for_errors(project, subscription_name): """Receives messages and catches errors from a pull subscription.""" # [START pubsub_subscriber_error_listener] @@ -281,6 +304,11 @@ def callback(message): help=receive_messages_with_flow_control.__doc__) receive_with_flow_control_parser.add_argument('subscription_name') + receive_messages_synchronously_parser = subparsers.add_parser( + 'receive-synchronously', + help=receive_messages_synchronously.__doc__) + receive_messages_synchronously_parser.add_argument('subscription_name') + listen_for_errors_parser = subparsers.add_parser( 'listen_for_errors', help=listen_for_errors.__doc__) listen_for_errors_parser.add_argument('subscription_name') @@ -314,5 +342,8 @@ def callback(message): elif args.command == 'receive-flow-control': receive_messages_with_flow_control( args.project, args.subscription_name) + elif args.command == 'receive-synchronously': + receive_messages_synchronously( + args.project, args.subscription_name) elif args.command == 'listen_for_errors': listen_for_errors(args.project, args.subscription_name) diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py index adbc44e8425..728f971f2e7 100644 --- a/pubsub/cloud-client/subscriber_test.py +++ b/pubsub/cloud-client/subscriber_test.py @@ -188,3 +188,18 @@ def test_receive_with_flow_control( assert 'Listening' in out assert subscription in out assert 'Message 1' in out + + +def test_receive_synchronously( + publisher_client, topic, subscription, capsys): + _publish_messages(publisher_client, topic) + + with _make_sleep_patch(): + with pytest.raises(RuntimeError, match='sigil'): + subscriber.receive_messages_with_flow_control( + PROJECT, SUBSCRIPTION) + + out, _ = capsys.readouterr() + assert 'Message 1' in out + assert 'Message 2' in out + assert 'Message 3' in out From b497784f09075a152684254a72f22e85d2dd0a5b Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Wed, 29 Aug 2018 16:13:56 -0700 Subject: [PATCH 2/3] Updated comment --- pubsub/cloud-client/subscriber.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index 13cd8f7261f..6adf6f877e4 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -214,11 +214,13 @@ def receive_messages_synchronously(project, subscription_name): subscription_path = subscriber.subscription_path( project, subscription_name) - # Builds a pull request with a specified number of messages to return. + # Builds a pull request with a specific number of messages to return. + # `return_immediately` is set to False so that the system waits (for a + # bounded amount of time) until at lease one message is available. response = subscriber.pull( subscription_path, max_messages=3, - return_immediately=False) # Waits until three messages are available. + return_immediately=False) ack_ids = [] for received_message in response.received_messages: From 620a53f0242d8af44fea52587973676dceee026e Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Thu, 30 Aug 2018 11:40:41 -0700 Subject: [PATCH 3/3] Added comments --- pubsub/cloud-client/subscriber.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index 6adf6f877e4..83e5700dd95 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -90,6 +90,8 @@ def create_push_subscription(project, def delete_subscription(project, subscription_name): """Deletes an existing Pub/Sub topic.""" # [START pubsub_delete_subscription] + # project = "Your Google Cloud Project ID" + # subscription_name = "Your Pubsub subscription name" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( project, subscription_name) @@ -138,6 +140,8 @@ def receive_messages(project, subscription_name): """Receives messages from a pull subscription.""" # [START pubsub_subscriber_async_pull] # [START pubsub_quickstart_subscriber] + # project = "Your Google Cloud Project ID" + # subscription_name = "Your Pubsub subscription name" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( project, subscription_name) @@ -160,6 +164,8 @@ def callback(message): def receive_messages_with_custom_attributes(project, subscription_name): """Receives messages from a pull subscription.""" # [START pubsub_subscriber_sync_pull_custom_attributes] + # project = "Your Google Cloud Project ID" + # subscription_name = "Your Pubsub subscription name" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( project, subscription_name) @@ -186,6 +192,8 @@ def callback(message): def receive_messages_with_flow_control(project, subscription_name): """Receives messages from a pull subscription with flow control.""" # [START pubsub_subscriber_flow_settings] + # project = "Your Google Cloud Project ID" + # subscription_name = "Your Pubsub subscription name" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( project, subscription_name) @@ -210,6 +218,8 @@ def callback(message): def receive_messages_synchronously(project, subscription_name): """Pulling messages synchronously.""" # [START pubsub_subscriber_sync_pull] + # project = "Your Google Cloud Project ID" + # subscription_name = "Your Pubsub subscription name" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( project, subscription_name) @@ -235,6 +245,8 @@ def receive_messages_synchronously(project, subscription_name): def listen_for_errors(project, subscription_name): """Receives messages and catches errors from a pull subscription.""" # [START pubsub_subscriber_error_listener] + # project = "Your Google Cloud Project ID" + # subscription_name = "Your Pubsub subscription name" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( project, subscription_name)