Skip to content

Commit c9a7df7

Browse files
authored
Pubsub getting started (GoogleCloudPlatform#684)
1 parent bf99206 commit c9a7df7

File tree

8 files changed

+400
-113
lines changed

8 files changed

+400
-113
lines changed

pubsub/cloud-client/README.md

+33-17
Original file line numberDiff line numberDiff line change
@@ -14,24 +14,40 @@ For more samples, see the samples in
1414
## Quickstart
1515

1616
#### Setup
17-
- Install [Maven](http://maven.apache.org/) <p>
18-
- Install the [Google Cloud SDK](https://cloud.google.com/sdk/) and run :
19-
20-
21-
gcloud config set project [YOUR PROJECT ID]
22-
17+
- Install [Maven](http://maven.apache.org/).
18+
- [Enable](https://console.cloud.google.com/apis/api/pubsub.googleapis.com/overview) Pub/Sub API.
19+
- Set up [authentication](https://cloud.google.com/docs/authentication/getting-started).
2320

21+
#### Build
2422
- Build your project with:
25-
26-
27-
mvn clean package -DskipTests
23+
```
24+
mvn clean package -DskipTests
25+
```
26+
27+
#### Create a new topic
28+
```
29+
mvn exec:java -Dexec.mainClass=com.example.pubsub.CreateTopicExample -Dexec.args=my-topic
30+
```
31+
32+
#### Create a subscription
33+
```
34+
mvn exec:java -Dexec.mainClass=com.example.pubsub.CreatePullSubscriptionExample -Dexec.args="my-topic-id my-sub"
35+
```
36+
37+
#### Publish messages
38+
```
39+
mvn exec:java -Dexec.mainClass=com.example.pubsub.PublisherExample -Dexec.args=my-topic
40+
```
41+
Publishes 5 messages to the topic `my-topic`.
42+
43+
#### Receive messages
44+
```
45+
mvn exec:java -Dexec.mainClass=com.example.pubsub.SubscriberExample -Dexec.args=my-sub
46+
```
47+
Subscriber will continue to listen on the topic for 5 minutes and print out message id and data as messages are received.
2848

2949
#### Testing
30-
31-
Run the tests with Maven.
32-
33-
mvn clean verify
34-
35-
#### Creating a new topic (using the quickstart sample)
36-
37-
mvn exec:java -Dexec.mainClass=com.example.pubsub.QuickstartSample
50+
Run the test with Maven.
51+
```
52+
mvn verify
53+
```

pubsub/cloud-client/pom.xml

+1-12
Original file line numberDiff line numberDiff line change
@@ -31,27 +31,16 @@
3131
<maven.compiler.target>1.8</maven.compiler.target>
3232
<maven.compiler.source>1.8</maven.compiler.source>
3333
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
34-
<pubsub.version>0.21.1-beta</pubsub.version>
3534
</properties>
3635

3736
<dependencies>
3837
<dependency>
3938
<groupId>com.google.cloud</groupId>
4039
<artifactId>google-cloud-pubsub</artifactId>
41-
<version>${pubsub.version}</version>
40+
<version>0.21.0-beta</version>
4241
</dependency>
4342

4443
<!-- Test dependencies -->
45-
<dependency>
46-
<groupId>com.google.guava</groupId>
47-
<artifactId>guava</artifactId>
48-
<version>20.0</version>
49-
</dependency>
50-
<dependency>
51-
<groupId>com.google.auth</groupId>
52-
<artifactId>google-auth-library-oauth2-http</artifactId>
53-
<version>0.7.1</version>
54-
</dependency>
5544
<dependency>
5645
<groupId>junit</groupId>
5746
<artifactId>junit</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
Copyright 2017, Google, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package com.example.pubsub;
18+
19+
// [START pubsub_quickstart_create_subscription]
20+
import com.google.cloud.ServiceOptions;
21+
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
22+
import com.google.pubsub.v1.PushConfig;
23+
import com.google.pubsub.v1.Subscription;
24+
import com.google.pubsub.v1.SubscriptionName;
25+
import com.google.pubsub.v1.TopicName;
26+
27+
public class CreatePullSubscriptionExample {
28+
29+
/**
30+
* Create a pull subscription.
31+
*
32+
* @param args topic subscriptionId
33+
* @throws Exception exception thrown if operation is unsuccessful
34+
*/
35+
public static void main(String... args) throws Exception {
36+
37+
// Your Google Cloud Platform project ID
38+
String projectId = ServiceOptions.getDefaultProjectId();
39+
40+
// Your topic ID, eg. "my-topic"
41+
String topicId = args[0];
42+
43+
// Your subscription ID eg. "my-sub"
44+
String subscriptionId = args[1];
45+
46+
TopicName topicName = TopicName.create(projectId, topicId);
47+
48+
// Create a new subscription
49+
SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
50+
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
51+
// create a pull subscription with default acknowledgement deadline (= 10 seconds)
52+
Subscription subscription =
53+
subscriptionAdminClient.createSubscription(
54+
subscriptionName, topicName, PushConfig.getDefaultInstance(), 0);
55+
}
56+
57+
System.out.printf(
58+
"Subscription %s:%s created.\n",
59+
subscriptionName.getProject(), subscriptionName.getSubscription());
60+
}
61+
}
62+
// [END pubsub_quickstart_create_subscription]

pubsub/cloud-client/src/main/java/com/example/pubsub/QuickstartSample.java renamed to pubsub/cloud-client/src/main/java/com/example/pubsub/CreateTopicExample.java

+11-6
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,27 @@
1616

1717
package com.example.pubsub;
1818

19-
// [START pubsub_quickstart]
19+
// [START pubsub_quickstart_create_topic]
2020
// Imports the Google Cloud client library
21-
2221
import com.google.cloud.ServiceOptions;
2322
import com.google.cloud.pubsub.v1.TopicAdminClient;
2423
import com.google.pubsub.v1.TopicName;
2524

26-
public class QuickstartSample {
25+
public class CreateTopicExample {
2726

27+
/**
28+
* Create a topic.
29+
*
30+
* @param args topicId
31+
* @throws Exception exception thrown if operation is unsuccessful
32+
*/
2833
public static void main(String... args) throws Exception {
2934

3035
// Your Google Cloud Platform project ID
3136
String projectId = ServiceOptions.getDefaultProjectId();
3237

33-
// Your topic ID
34-
String topicId = "my-new-topic";
38+
// Your topic ID, eg. "my-topic"
39+
String topicId = args[0];
3540

3641
// Create a new topic
3742
TopicName topic = TopicName.create(projectId, topicId);
@@ -42,4 +47,4 @@ public static void main(String... args) throws Exception {
4247
System.out.printf("Topic %s:%s created.\n", topic.getProject(), topic.getTopic());
4348
}
4449
}
45-
// [END pubsub_quickstart]
50+
// [END pubsub_quickstart_create_topic]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright 2017 Google Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example.pubsub;
18+
// [START pubsub_quickstart_publisher]
19+
20+
import com.google.api.core.ApiFuture;
21+
import com.google.api.core.ApiFutures;
22+
import com.google.cloud.ServiceOptions;
23+
import com.google.cloud.pubsub.v1.Publisher;
24+
import com.google.protobuf.ByteString;
25+
import com.google.pubsub.v1.PubsubMessage;
26+
import com.google.pubsub.v1.TopicName;
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
30+
public class PublisherExample {
31+
32+
static final int MESSAGE_COUNT = 5;
33+
34+
// use the default project id
35+
private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();
36+
37+
//schedule a message to be published, messages are automatically batched
38+
private static ApiFuture<String> publishMessage(Publisher publisher, String message)
39+
throws Exception {
40+
// convert message to bytes
41+
ByteString data = ByteString.copyFromUtf8(message);
42+
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
43+
return publisher.publish(pubsubMessage);
44+
}
45+
46+
/** Publish messages to a topic. */
47+
public static void main(String... args) throws Exception {
48+
// topic id, eg. "my-topic"
49+
String topicId = args[0];
50+
TopicName topicName = TopicName.create(PROJECT_ID, topicId);
51+
Publisher publisher = null;
52+
List<ApiFuture<String>> apiFutures = new ArrayList<>();
53+
try {
54+
// Create a publisher instance with default settings bound to the topic
55+
publisher = Publisher.defaultBuilder(topicName).build();
56+
for (int i = 0; i < MESSAGE_COUNT; i++) {
57+
String message = "message-" + i;
58+
ApiFuture<String> messageId = publishMessage(publisher, message);
59+
apiFutures.add(messageId);
60+
}
61+
} finally {
62+
// Once published, returns server-assigned message ids (unique within the topic)
63+
List<String> messageIds = ApiFutures.allAsList(apiFutures).get();
64+
for (String messageId : messageIds) {
65+
System.out.println(messageId);
66+
}
67+
if (publisher != null) {
68+
// When finished with the publisher, shutdown to free up resources.
69+
publisher.shutdown();
70+
}
71+
}
72+
}
73+
}
74+
// [END pubsub_quickstart_quickstart]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2017 Google Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example.pubsub;
18+
19+
// [START pubsub_quickstart_subscriber]
20+
21+
import com.google.cloud.ServiceOptions;
22+
import com.google.cloud.pubsub.v1.AckReplyConsumer;
23+
import com.google.cloud.pubsub.v1.MessageReceiver;
24+
import com.google.cloud.pubsub.v1.Subscriber;
25+
import com.google.pubsub.v1.PubsubMessage;
26+
import com.google.pubsub.v1.SubscriptionName;
27+
import java.util.concurrent.BlockingQueue;
28+
import java.util.concurrent.LinkedBlockingDeque;
29+
30+
public class SubscriberExample {
31+
32+
// use the default project id
33+
private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();
34+
35+
private static final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>();
36+
37+
static class MessageReceiverExample implements MessageReceiver {
38+
39+
@Override
40+
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
41+
messages.offer(message);
42+
consumer.ack();
43+
}
44+
}
45+
46+
/** Receive messages over a subscription. */
47+
public static void main(String... args) throws Exception {
48+
// set subscriber id, eg. my-sub
49+
String subscriptionId = args[0];
50+
SubscriptionName subscriptionName = SubscriptionName.create(PROJECT_ID, subscriptionId);
51+
Subscriber subscriber = null;
52+
try {
53+
// create a subscriber bound to the asynchronous message receiver
54+
subscriber =
55+
Subscriber.defaultBuilder(subscriptionName, new MessageReceiverExample()).build();
56+
subscriber.startAsync().awaitRunning();
57+
// Continue to listen to messages
58+
while (true) {
59+
PubsubMessage message = messages.take();
60+
System.out.println("Message Id: " + message.getMessageId());
61+
System.out.println("Data: " + message.getData().toStringUtf8());
62+
}
63+
} finally {
64+
if (subscriber != null) {
65+
subscriber.stopAsync();
66+
}
67+
}
68+
}
69+
}
70+
// [END pubsub_quickstart_subscriber]

0 commit comments

Comments
 (0)