- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.beam.samples;
+ // Copyright 2020 Google Inc.
+ //
+ // Licensed under the Apache License, Version 2.0 (the "License");
+ // you may not use this file except in compliance with the License.
+ // You may obtain a copy of the License at
+ //
+ // http://www.apache.org/licenses/LICENSE-2.0
+ //
+ // Unless required by applicable law or agreed to in writing, software
+ // distributed under the License is distributed on an "AS IS" BASIS,
+ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ // See the License for the specific language governing permissions and
+ // limitations under the License.

- import java.util.Arrays;
+ package org.apache.beam.samples;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.gson.Gson;
import com.google.pubsub.v1.ProjectSubscriptionName;
-
+ import java.util.Arrays;
import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
+ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
- import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.schemas.Schema;
- import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 * An Apache Beam streaming pipeline that reads JSON encoded messages from Pub/Sub,
 * uses Beam SQL to transform the message data, and writes the results to a BigQuery table.
 */
- public class StreamingBeamSQL {
-   private static final Logger LOG = LoggerFactory.getLogger(StreamingBeamSQL.class);
+ public class StreamingBeamSql {
+   private static final Logger LOG = LoggerFactory.getLogger(StreamingBeamSql.class);
  private static final Gson GSON = new Gson();

  public interface Options extends StreamingOptions {
    @Description("Pub/Sub subscription to read from.")
    @Validation.Required
    String getInputSubscription();
+
    void setInputSubscription(String value);

-     @Description("BigQuery table to write to, in the form 'project:dataset.table' or 'dataset.table'.")
+     @Description("BigQuery table to write to, in the form "
+         + "'project:dataset.table' or 'dataset.table'.")
    @Default.String("beam_samples.streaming_beam_sql")
    String getOutputTable();
+
    void setOutputTable(String value);
  }

  @DefaultCoder(AvroCoder.class)
  private static class PageReviewMessage {
-     @Nullable String url;
-     @Nullable String review;
+     @Nullable
+     String url;
+     @Nullable
+     String review;
  }

  public static void main(final String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    options.setStreaming(true);

    var project = options.as(GcpOptions.class).getProject();
-     var subscription = ProjectSubscriptionName.of(project, options.getInputSubscription()).toString();
+     var subscription = ProjectSubscriptionName
+         .of(project, options.getInputSubscription()).toString();

    var schema = Schema.builder()
        .addStringField("url")
@@ -96,47 +96,50 @@ public static void main(final String[] args) {
    pipeline
        // Read, parse, and validate messages from Pub/Sub.
        .apply("Read messages from Pub/Sub", PubsubIO.readStrings().fromSubscription(subscription))
-         .apply("Parse JSON into SQL rows", MapElements.into(TypeDescriptor.of(Row.class)).via(message -> {
-           // This is a good place to add error handling.
-           // The first transform should act as a validation layer to make sure
-           // that any data coming to the processing pipeline must be valid.
-           // See `MapElements.MapWithFailures` for more details.
-           LOG.info("message: {}", message);
-           var msg = GSON.fromJson(message, PageReviewMessage.class);
-           return Row.withSchema(schema).addValues(
-               msg.url, // row url
-               msg.review.equals("positive") ? 1.0 : 0.0, // row page_score
-               new Instant() // row processing_time
-           ).build();
-         })).setRowSchema(schema) // make sure to set the row schema for the PCollection
+         .apply("Parse JSON into SQL rows", MapElements.into(TypeDescriptor.of(Row.class))
+             .via(message -> {
+               // This is a good place to add error handling.
+               // The first transform should act as a validation layer to make sure
+               // that any data coming to the processing pipeline must be valid.
+               // See `MapElements.MapWithFailures` for more details.
+               LOG.info("message: {}", message);
+               var msg = GSON.fromJson(message, PageReviewMessage.class);
+               return Row.withSchema(schema).addValues(
+                   msg.url, // row url
+                   msg.review.equals("positive") ? 1.0 : 0.0, // row page_score
+                   new Instant() // row processing_time
+               ).build();
+             })).setRowSchema(schema) // make sure to set the row schema for the PCollection

        // Add timestamps and bundle elements into windows.
-         .apply("Add processing time", WithTimestamps.of((row) -> row.getDateTime("processing_time").toInstant()))
+         .apply("Add processing time", WithTimestamps
+             .of((row) -> row.getDateTime("processing_time").toInstant()))
        .apply("Fixed-size windows", Window.into(FixedWindows.of(Duration.standardMinutes(1))))

        // Apply a SQL query for every window of elements.
        .apply("Run Beam SQL query", SqlTransform.query(
-             "SELECT " +
-             " url, " +
-             " COUNT(page_score) AS num_reviews, " +
-             " AVG(page_score) AS score, " +
-             " MIN(processing_time) AS first_date, " +
-             " MAX(processing_time) AS last_date " +
-             "FROM PCOLLECTION " +
-             "GROUP BY url"
+             "SELECT "
+                 + " url, "
+                 + " COUNT(page_score) AS num_reviews, "
+                 + " AVG(page_score) AS score, "
+                 + " MIN(processing_time) AS first_date, "
+                 + " MAX(processing_time) AS last_date "
+                 + "FROM PCOLLECTION "
+                 + "GROUP BY url"
        ))

        // Convert the SQL Rows into BigQuery TableRows and write them to BigQuery.
-         .apply("Convert to BigQuery TableRow", MapElements.into(TypeDescriptor.of(TableRow.class)).via(row -> {
-           LOG.info("rating summary: {} {} ({} reviews)", row.getDouble("score"), row.getString("url"),
-               row.getInt64("num_reviews"));
-           return new TableRow()
-               .set("url", row.getString("url"))
-               .set("num_reviews", row.getInt64("num_reviews"))
-               .set("score", row.getDouble("score"))
-               .set("first_date", row.getDateTime("first_date").toInstant().toString())
-               .set("last_date", row.getDateTime("last_date").toInstant().toString());
-         }))
+         .apply("Convert to BigQuery TableRow", MapElements.into(TypeDescriptor.of(TableRow.class))
+             .via(row -> {
+               LOG.info("rating summary: {} {} ({} reviews)", row.getDouble("score"),
+                   row.getString("url"), row.getInt64("num_reviews"));
+               return new TableRow()
+                   .set("url", row.getString("url"))
+                   .set("num_reviews", row.getInt64("num_reviews"))
+                   .set("score", row.getDouble("score"))
+                   .set("first_date", row.getDateTime("first_date").toInstant().toString())
+                   .set("last_date", row.getDateTime("last_date").toInstant().toString());
+             }))
        .apply("Write to BigQuery", BigQueryIO.writeTableRows()
            .to(options.getOutputTable())
            .withSchema(new TableSchema().setFields(Arrays.asList(
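For readers trying the sample, here is a minimal sketch of how a test message could be published to feed this pipeline. The JSON field names ("url", "review") come from PageReviewMessage above; the project and topic names below are placeholders, and the topic must back the subscription passed via --inputSubscription. This is an illustration only, not part of the change.

import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;

public class PublishTestReview {
  public static void main(String[] args) throws Exception {
    // Placeholder project and topic; the topic must feed the pipeline's --inputSubscription.
    TopicName topic = TopicName.of("my-project", "page-reviews");
    Publisher publisher = Publisher.newBuilder(topic).build();
    // Shape expected by PageReviewMessage: a url plus a review string ("positive" scores 1.0).
    String json = "{\"url\": \"https://beam.apache.org/\", \"review\": \"positive\"}";
    publisher.publish(PubsubMessage.newBuilder()
        .setData(ByteString.copyFromUtf8(json))
        .build()).get(); // block until the publish completes
    publisher.shutdown();
  }
}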