13
13
// limitations under the License.
14
14
15
15
package com .examples .pubsub .streaming ;
16
+
16
17
// [START pubsub_to_gcs]
18
+
19
+ import java .io .IOException ;
17
20
import org .apache .beam .examples .common .WriteOneFilePerWindow ;
21
+ import org .apache .beam .sdk .Pipeline ;
18
22
import org .apache .beam .sdk .io .gcp .pubsub .PubsubIO ;
19
23
import org .apache .beam .sdk .options .Default ;
20
24
import org .apache .beam .sdk .options .Description ;
21
25
import org .apache .beam .sdk .options .PipelineOptions ;
22
26
import org .apache .beam .sdk .options .PipelineOptionsFactory ;
23
27
import org .apache .beam .sdk .options .StreamingOptions ;
24
28
import org .apache .beam .sdk .options .Validation .Required ;
25
- import org .apache .beam .sdk .Pipeline ;
26
29
import org .apache .beam .sdk .transforms .windowing .FixedWindows ;
27
30
import org .apache .beam .sdk .transforms .windowing .Window ;
28
31
import org .joda .time .Duration ;
29
32
30
- import java .io .IOException ;
31
-
32
-
33
- public class PubSubToGCS {
33
+ public class PubSubToGcs {
34
34
/*
35
- * Define your own configuration options. Add your own arguments to be processed
36
- * by the command-line parser, and specify default values for them.
37
- */
38
- public interface PubSubToGCSOptions extends PipelineOptions , StreamingOptions {
35
+ * Define your own configuration options. Add your own arguments to be processed
36
+ * by the command-line parser, and specify default values for them.
37
+ */
38
+ public interface PubSubToGcsOptions extends PipelineOptions , StreamingOptions {
39
39
@ Description ("The Cloud Pub/Sub topic to read from." )
40
40
@ Required
41
41
String getInputTopic ();
42
+
42
43
void setInputTopic (String value );
43
44
44
45
@ Description ("Output file's window size in number of minutes." )
45
46
@ Default .Integer (1 )
46
47
Integer getWindowSize ();
48
+
47
49
void setWindowSize (Integer value );
48
50
49
51
@ Description ("Path of the output file including its filename prefix." )
50
52
@ Required
51
53
String getOutput ();
54
+
52
55
void setOutput (String value );
53
56
}
54
57
55
58
public static void main (String [] args ) throws IOException {
56
59
// The maximum number of shards when writing output.
57
60
int numShards = 1 ;
58
61
59
- PubSubToGCSOptions options = PipelineOptionsFactory
60
- .fromArgs (args )
61
- .withValidation ()
62
- .as (PubSubToGCSOptions .class );
62
+ PubSubToGcsOptions options =
63
+ PipelineOptionsFactory .fromArgs (args ).withValidation ().as (PubSubToGcsOptions .class );
63
64
64
65
options .setStreaming (true );
65
66
66
67
Pipeline pipeline = Pipeline .create (options );
67
68
68
69
pipeline
69
- // 1) Read string messages from a Pub/Sub topic.
70
- .apply ("Read PubSub Messages" , PubsubIO .readStrings ().fromTopic (options .getInputTopic ()))
71
- // 2) Group the messages into fixed-sized minute intervals.
72
- .apply (Window .into (FixedWindows .of (Duration .standardMinutes (options .getWindowSize ()))))
73
- // 3) Write one file to GCS for every window of messages.
74
- .apply ("Write Files to GCS" , new WriteOneFilePerWindow (options .getOutput (), numShards ));
70
+ // 1) Read string messages from a Pub/Sub topic.
71
+ .apply ("Read PubSub Messages" , PubsubIO .readStrings ().fromTopic (options .getInputTopic ()))
72
+ // 2) Group the messages into fixed-sized minute intervals.
73
+ .apply (Window .into (FixedWindows .of (Duration .standardMinutes (options .getWindowSize ()))))
74
+ // 3) Write one file to GCS for every window of messages.
75
+ .apply ("Write Files to GCS" , new WriteOneFilePerWindow (options .getOutput (), numShards ));
75
76
76
77
// Execute the pipeline and wait until it finishes running.
77
78
pipeline .run ().waitUntilFinish ();
78
79
}
79
80
}
80
- // [END pubsub_to_gcs]
81
+ // [END pubsub_to_gcs]
0 commit comments