
Commit 3883be0

anguillanneuf and davidcavazos authored and committed
Pub/Sub: new Pub/Sub DataFlow doc sample (GoogleCloudPlatform#1541)
Add a new Dataflow code sample that reads from Pub/Sub and writes to GCS
1 parent 329f576 commit 3883be0

File tree

3 files changed: +426 −0 lines changed


pubsub/streaming-analytics/README.md

Lines changed: 166 additions & 0 deletions
@@ -0,0 +1,166 @@
# Stream Cloud Pub/Sub with Cloud Dataflow

Sample(s) showing how to use [Google Cloud Pub/Sub] with [Google Cloud Dataflow].

## Before you begin

1. Install the [Cloud SDK].

1. [Create a new project].

1. [Enable billing].

1. [Enable the APIs](https://console.cloud.google.com/flows/enableapi?apiid=dataflow,compute_component,logging,storage_component,storage_api,pubsub,cloudresourcemanager.googleapis.com,cloudscheduler.googleapis.com,appengine.googleapis.com): Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, Pub/Sub, Cloud Scheduler, Cloud Resource Manager, and App Engine.

1. Set up the Cloud SDK for your GCP project.

    ```bash
    gcloud init
    ```

1. [Create a service account key] as a JSON file.
   For more information, see [Creating and managing service accounts].

    * From the **Service account** list, select **New service account**.
    * In the **Service account name** field, enter a name.
    * From the **Role** list, select **Project > Owner**.

      > **Note**: The **Role** field authorizes your service account to access resources.
      > You can view and change this field later by using the [GCP Console IAM page].
      > If you are developing a production app, specify more granular permissions than **Project > Owner**.
      > For more information, see [Granting roles to service accounts].

    * Click **Create**. A JSON file that contains your key downloads to your computer.

1. Set your `GOOGLE_APPLICATION_CREDENTIALS` environment variable to point to your service account key file.

    ```bash
    export GOOGLE_APPLICATION_CREDENTIALS=path/to/your/credentials.json
    ```

1. Create a Cloud Storage bucket.

    ```bash
    BUCKET_NAME=your-gcs-bucket
    PROJECT_NAME=$(gcloud config get-value project)

    gsutil mb gs://$BUCKET_NAME
    ```

1. Start a [Google Cloud Scheduler] job that publishes one message to a [Google Cloud Pub/Sub] topic every minute. This will create an [App Engine] app if one has never been created in the project.

    ```bash
    # Create a Pub/Sub topic.
    gcloud pubsub topics create cron-topic

    # Create a Cloud Scheduler job.
    gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
      --topic=cron-topic --message-body="Hello!"

    # Run the job.
    gcloud scheduler jobs run publisher-job
    ```

## Setup

The following instructions will help you prepare your development environment.

1. Download and install the [Java Development Kit (JDK)].
   Verify that the [JAVA_HOME] environment variable is set and points to your JDK installation.

1. Download and install [Apache Maven] by following the [Maven installation guide] for your specific operating system.

1. Clone the `java-docs-samples` repository.

    ```bash
    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    ```

1. Navigate to the sample code directory.

    ```bash
    cd java-docs-samples/pubsub/streaming-analytics
    ```

## Streaming Analytics

### Google Cloud Pub/Sub to Google Cloud Storage

* [PubSubToGCS.java](src/main/java/com/examples/pubsub/streaming/PubSubToGCS.java)

The following example runs a streaming pipeline. It reads messages from a Pub/Sub topic, windows them into fixed-size intervals, and writes one file per window to a GCS location. A rough sketch of such a pipeline follows the option list below.

+ `--project`: sets the Google Cloud project ID to run the pipeline on
+ `--inputTopic`: sets the input Pub/Sub topic to read messages from
+ `--output`: sets the output GCS path prefix to write files to
+ `--runner [optional]`: specifies the runner used to run the pipeline; defaults to `DirectRunner`
+ `--windowSize [optional]`: specifies the window size in minutes; defaults to 1

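In outline, such a pipeline boils down to three steps: read from the topic, apply fixed windows, and write windowed files under the output prefix. The sketch below is illustrative only; the class name, options interface, and transform choices are assumptions for this example and are not necessarily what `PubSubToGCS.java` in this commit uses. (`--project` is already handled by Beam's built-in GCP options, so it does not appear in the custom options.)

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcsSketch {

  // Hypothetical options interface mirroring the flags documented above.
  public interface Options extends StreamingOptions {
    @Description("Pub/Sub topic to read from, e.g. projects/<PROJECT>/topics/<TOPIC>.")
    @Validation.Required
    String getInputTopic();
    void setInputTopic(String value);

    @Description("GCS path prefix for output files, e.g. gs://<BUCKET>/samples/output.")
    @Validation.Required
    String getOutput();
    void setOutput(String value);

    @Description("Window size in minutes.")
    @Default.Integer(1)
    Integer getWindowSize();
    void setWindowSize(Integer value);
  }

  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read messages from the Pub/Sub topic as strings.
        .apply("Read from Pub/Sub",
            PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        // Group the unbounded stream into fixed-size windows.
        .apply("Window",
            Window.<String>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // Write one sharded text file per window under the output prefix.
        .apply("Write to GCS",
            TextIO.write().to(options.getOutput()).withWindowedWrites().withNumShards(1));

    pipeline.run().waitUntilFinish();
  }
}
```

Passing `--runner=DataflowRunner`, as in the command below, submits the pipeline to Dataflow instead of running it locally on the `DirectRunner`.
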
```bash
mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGCS \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args="\
    --project=$PROJECT_NAME \
    --inputTopic=projects/$PROJECT_NAME/topics/cron-topic \
    --output=gs://$BUCKET_NAME/samples/output \
    --runner=DataflowRunner \
    --windowSize=2"
```

After the job has been submitted, you can check its status in the [GCP Console Dataflow page].

You can also check the output in your GCS bucket using the command line below or in the [GCP Console Storage page]. You may need to wait a few minutes for the files to appear.

```bash
gsutil ls gs://$BUCKET_NAME/samples/
```

## Cleanup

1. Delete the [Google Cloud Scheduler] job.

    ```bash
    gcloud scheduler jobs delete publisher-job
    ```

1. Press `Ctrl+C` in your terminal to stop the program. Note that this does not actually stop the job if you use `DataflowRunner`. Skip step 3 if you use the `DirectRunner`.

1. Stop the Dataflow job in the [GCP Console Dataflow page]. Cancel the job instead of draining it. This may take a few minutes.

1. Delete the topic. [Google Cloud Dataflow] will automatically delete the subscription associated with the streaming pipeline when the job is canceled.

    ```bash
    gcloud pubsub topics delete cron-topic
    ```

1. Lastly, to avoid incurring charges to your GCP account for the resources created in this tutorial:

    ```bash
    # Delete only the files created by this sample.
    gsutil -m rm -rf "gs://$BUCKET_NAME/samples/output*"

    # [optional] Remove the Cloud Storage bucket.
    gsutil rb gs://$BUCKET_NAME
    ```

[Apache Beam]: https://beam.apache.org/
[Google Cloud Pub/Sub]: https://cloud.google.com/pubsub/docs/
[Google Cloud Dataflow]: https://cloud.google.com/dataflow/docs/
[Google Cloud Scheduler]: https://cloud.google.com/scheduler/docs/
[App Engine]: https://cloud.google.com/appengine/docs/

[Cloud SDK]: https://cloud.google.com/sdk/docs/
[Create a new project]: https://console.cloud.google.com/projectcreate
[Enable billing]: https://cloud.google.com/billing/docs/how-to/modify-project
[Create a service account key]: https://console.cloud.google.com/apis/credentials/serviceaccountkey
[Creating and managing service accounts]: https://cloud.google.com/iam/docs/creating-managing-service-accounts
[GCP Console IAM page]: https://console.cloud.google.com/iam-admin/iam
[Granting roles to service accounts]: https://cloud.google.com/iam/docs/granting-roles-to-service-accounts

[Java Development Kit (JDK)]: https://www.oracle.com/technetwork/java/javase/downloads/index.html
[JAVA_HOME]: https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/envvars001.html
[Apache Maven]: http://maven.apache.org/download.cgi
[Maven installation guide]: http://maven.apache.org/install.html

[GCP Console create Dataflow job page]: https://console.cloud.google.com/dataflow/createjob
[GCP Console Dataflow page]: https://console.cloud.google.com/dataflow
[GCP Console Storage page]: https://console.cloud.google.com/storage

pubsub/streaming-analytics/pom.xml

Lines changed: 180 additions & 0 deletions
@@ -0,0 +1,180 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Copyright 2019 Google LLC

  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.example</groupId>
  <artifactId>pubsub-streaming</artifactId>
  <version>1.0</version>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

    <beam.version>2.14.0</beam.version>

    <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
    <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
    <maven-jar-plugin.version>3.1.2</maven-jar-plugin.version>
    <maven-shade-plugin.version>3.2.1</maven-shade-plugin.version>
    <slf4j.version>1.7.28</slf4j.version>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>${maven-jar-plugin.version}</version>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <classpathPrefix>lib/</classpathPrefix>
              <mainClass>com.examples.pubsub.streaming.PubSubToGCS</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>

      <!--
        Configures `mvn package` to produce a bundled jar ("fat jar") for runners
        that require this for job submission to a cluster.
      -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>${maven-shade-plugin.version}</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <finalName>${project.artifactId}-bundled-${project.version}</finalName>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/LICENSE</exclude>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>${maven-exec-plugin.version}</version>
          <configuration>
            <cleanupDaemonThreads>false</cleanupDaemonThreads>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <dependencies>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!--
      By default, the starter project has a dependency on the Beam DirectRunner
      to enable development and testing of pipelines. To run on another of the
      Beam runners, add its module to this pom.xml according to the
      runner-specific setup instructions on the Beam website:
      http://beam.apache.org/documentation/#runners
    -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>${beam.version}</version>
      <scope>runtime</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
      <version>${beam.version}</version>
      <scope>runtime</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-maven-archetypes-examples</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!-- slf4j API frontend binding with JUL backend -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
  </dependencies>
</project>
