Skip to content

Commit fa7afe3

Browse files
Pub/Sub: add shared-config and fix lint (GoogleCloudPlatform#3441)
* add shared-config and fix lint * fix consecutive capital letters and new line for import
1 parent 8ca4bf2 commit fa7afe3

File tree

3 files changed

+28
-21
lines changed

3 files changed

+28
-21
lines changed

pubsub/streaming-analytics/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ The following instructions will help you prepare your development environment.
8686

8787
### Google Cloud Pub/Sub to Google Cloud Storage
8888

89-
* [PubSubToGCS.java](src/main/java/com/examples/pubsub/streaming/PubSubToGCS.java)
89+
* [PubSubToGCS.java](src/main/java/com/examples/pubsub/streaming/PubSubToGcs.java)
9090

9191
The following example will run a streaming pipeline. It will read messages from a Pub/Sub topic, then window them into fixed-sized intervals, and write one file per window into a GCS location.
9292

pubsub/streaming-analytics/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@
2323
<artifactId>pubsub-streaming</artifactId>
2424
<version>1.0</version>
2525

26+
<parent>
27+
<groupId>com.google.cloud.samples</groupId>
28+
<artifactId>shared-configuration</artifactId>
29+
<version>1.0.17</version>
30+
</parent>
31+
2632
<properties>
2733
<maven.compiler.source>1.8</maven.compiler.source>
2834
<maven.compiler.target>1.8</maven.compiler.target>

pubsub/streaming-analytics/src/main/java/com/examples/pubsub/streaming/PubSubToGCS.java renamed to pubsub/streaming-analytics/src/main/java/com/examples/pubsub/streaming/PubSubToGcs.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,68 +13,69 @@
1313
// limitations under the License.
1414

1515
package com.examples.pubsub.streaming;
16+
1617
// [START pubsub_to_gcs]
18+
19+
import java.io.IOException;
1720
import org.apache.beam.examples.common.WriteOneFilePerWindow;
21+
import org.apache.beam.sdk.Pipeline;
1822
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
1923
import org.apache.beam.sdk.options.Default;
2024
import org.apache.beam.sdk.options.Description;
2125
import org.apache.beam.sdk.options.PipelineOptions;
2226
import org.apache.beam.sdk.options.PipelineOptionsFactory;
2327
import org.apache.beam.sdk.options.StreamingOptions;
2428
import org.apache.beam.sdk.options.Validation.Required;
25-
import org.apache.beam.sdk.Pipeline;
2629
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
2730
import org.apache.beam.sdk.transforms.windowing.Window;
2831
import org.joda.time.Duration;
2932

30-
import java.io.IOException;
31-
32-
33-
public class PubSubToGCS {
33+
public class PubSubToGcs {
3434
/*
35-
* Define your own configuration options. Add your own arguments to be processed
36-
* by the command-line parser, and specify default values for them.
37-
*/
38-
public interface PubSubToGCSOptions extends PipelineOptions, StreamingOptions {
35+
* Define your own configuration options. Add your own arguments to be processed
36+
* by the command-line parser, and specify default values for them.
37+
*/
38+
public interface PubSubToGcsOptions extends PipelineOptions, StreamingOptions {
3939
@Description("The Cloud Pub/Sub topic to read from.")
4040
@Required
4141
String getInputTopic();
42+
4243
void setInputTopic(String value);
4344

4445
@Description("Output file's window size in number of minutes.")
4546
@Default.Integer(1)
4647
Integer getWindowSize();
48+
4749
void setWindowSize(Integer value);
4850

4951
@Description("Path of the output file including its filename prefix.")
5052
@Required
5153
String getOutput();
54+
5255
void setOutput(String value);
5356
}
5457

5558
public static void main(String[] args) throws IOException {
5659
// The maximum number of shards when writing output.
5760
int numShards = 1;
5861

59-
PubSubToGCSOptions options = PipelineOptionsFactory
60-
.fromArgs(args)
61-
.withValidation()
62-
.as(PubSubToGCSOptions.class);
62+
PubSubToGcsOptions options =
63+
PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);
6364

6465
options.setStreaming(true);
6566

6667
Pipeline pipeline = Pipeline.create(options);
6768

6869
pipeline
69-
// 1) Read string messages from a Pub/Sub topic.
70-
.apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
71-
// 2) Group the messages into fixed-sized minute intervals.
72-
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
73-
// 3) Write one file to GCS for every window of messages.
74-
.apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));
70+
// 1) Read string messages from a Pub/Sub topic.
71+
.apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
72+
// 2) Group the messages into fixed-sized minute intervals.
73+
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
74+
// 3) Write one file to GCS for every window of messages.
75+
.apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));
7576

7677
// Execute the pipeline and wait until it finishes running.
7778
pipeline.run().waitUntilFinish();
7879
}
7980
}
80-
// [END pubsub_to_gcs]
81+
// [END pubsub_to_gcs]

0 commit comments

Comments
 (0)