
Commit e72e7cb

Fixed validation regular expressions (GoogleCloudPlatform#3332)
* Fixed validation regular expressions: updated the Pub/Sub regular expression to match what Pub/Sub actually validates.
* Linked to resources for validation.
* Fixed lint warnings.
1 parent 5ee8237 commit e72e7cb

5 files changed: +79 -64

dataflow/flex-templates/streaming_beam_sql/Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@
 FROM gcr.io/dataflow-templates-base/java11-template-launcher-base:latest
 
 # Define the Java command options required by Dataflow Flex Templates.
-ENV FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.StreamingBeamSQL"
+ENV FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.StreamingBeamSql"
 ENV FLEX_TEMPLATE_JAVA_CLASSPATH="/template/pipeline.jar"
 
 # Make sure to package as an uber-jar including all dependencies.

dataflow/flex-templates/streaming_beam_sql/README.md

Lines changed: 8 additions & 2 deletions
@@ -101,7 +101,7 @@ to transform the message data, and writes the results to a
 [BigQuery](https://cloud.google.com/bigquery) table.
 
 * [Dockerfile](Dockerfile)
-* [StreamingBeamSQL.java](src/main/java/org/apache/beam/samples/StreamingBeamSQL.java)
+* [StreamingBeamSql.java](src/main/java/org/apache/beam/samples/StreamingBeamSql.java)
 * [pom.xml](pom.xml)
 * [metadata.json](metadata.json)
 
@@ -114,7 +114,7 @@ to transform the message data, and writes the results to a
 >
 > ```sh
 > mvn compile exec:java \
->   -Dexec.mainClass=org.apache.beam.samples.StreamingBeamSQL \
+>   -Dexec.mainClass=org.apache.beam.samples.StreamingBeamSql \
 >   -Dexec.args="\
 >     --project=$PROJECT \
 >     --inputSubscription=$SUBSCRIPTION \
@@ -189,6 +189,12 @@ necessary information to run the job, such as the SDK information and metadata.
 The [`metadata.json`](metadata.json) file contains additional information for
 the template such as the "name", "description", and input "parameters" field.
 
+We used
+[regular expressions](https://docs.microsoft.com/en-us/dotnet/standard/base-types/regular-expression-language-quick-reference)
+for validation on the input
+[Pub/Sub subscription](https://cloud.google.com/pubsub/docs/admin#resource_names)
+and [BigQuery table](https://cloud.google.com/bigquery/docs/tables#table_naming).
+
 The template file must be created in a Cloud Storage location,
 and is used to run a new Dataflow job.

dataflow/flex-templates/streaming_beam_sql/metadata.json

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@
       "label": "Pub/Sub input subscription.",
       "helpText": "Pub/Sub subscription to read from.",
       "regexes": [
-        "[-_.a-zA-Z0-9]+"
+        "(?!goog)[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
       ]
     },
     {
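The new pattern encodes Pub/Sub's documented resource-name rules: a subscription ID must start with a letter, be at least three characters long, draw only from letters, digits, and `-_.~+%`, and must not begin with the reserved prefix `goog`. As an illustration only (not part of the commit), here is a minimal sketch of how the pattern behaves, assuming it is anchored to the whole candidate string; the class name and sample inputs are hypothetical:

```java
import java.util.regex.Pattern;

/** Hypothetical stand-alone check of the subscription-ID pattern above. */
public class SubscriptionRegexCheck {
  // Same pattern as the new "regexes" entry: the first character must be a
  // letter, at least two more allowed characters must follow, and the
  // negative lookahead (?!goog) rejects IDs starting with the reserved "goog".
  private static final Pattern SUBSCRIPTION_ID =
      Pattern.compile("(?!goog)[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}");

  public static void main(String[] args) {
    String[] candidates = {
      "my-subscription",     // valid
      "a1",                  // too short: fewer than 3 characters
      "goog-reserved",       // starts with the reserved prefix
      "0-starts-with-digit"  // does not start with a letter
    };
    for (String id : candidates) {
      // matches() anchors the pattern to the entire string.
      System.out.printf("%-22s -> %b%n", id, SUBSCRIPTION_ID.matcher(id).matches());
    }
  }
}
```

Note that the parameter is the short subscription ID rather than the full `projects/<project>/subscriptions/<id>` path; the pipeline expands it with `ProjectSubscriptionName.of(project, id)`, as the Java source below shows.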

dataflow/flex-templates/streaming_beam_sql/pom.xml

Lines changed: 6 additions & 0 deletions
@@ -19,6 +19,12 @@
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
 
+  <parent>
+    <groupId>com.google.cloud.samples</groupId>
+    <artifactId>shared-configuration</artifactId>
+    <version>1.0.17</version>
+  </parent>
+
   <groupId>org.apache.beam.samples</groupId>
   <artifactId>streaming-beam-sql</artifactId>
   <version>1.0</version>
dataflow/flex-templates/streaming_beam_sql/src/main/java/org/apache/beam/samples/StreamingBeamSql.java (renamed from StreamingBeamSQL.java)

Lines changed: 63 additions & 60 deletions

@@ -1,47 +1,41 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.samples;
+// Copyright 2020 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
 
-import java.util.Arrays;
+package org.apache.beam.samples;
 
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.gson.Gson;
 import com.google.pubsub.v1.ProjectSubscriptionName;
-
+import java.util.Arrays;
 import org.apache.avro.reflect.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.WithTimestamps;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -57,34 +51,40 @@
  * An Apache Beam streaming pipeline that reads JSON encoded messages from Pub/Sub,
  * uses Beam SQL to transform the message data, and writes the results to a BigQuery table.
  */
-public class StreamingBeamSQL {
-  private static final Logger LOG = LoggerFactory.getLogger(StreamingBeamSQL.class);
+public class StreamingBeamSql {
+  private static final Logger LOG = LoggerFactory.getLogger(StreamingBeamSql.class);
   private static final Gson GSON = new Gson();
 
   public interface Options extends StreamingOptions {
     @Description("Pub/Sub subscription to read from.")
     @Validation.Required
     String getInputSubscription();
+
     void setInputSubscription(String value);
 
-    @Description("BigQuery table to write to, in the form 'project:dataset.table' or 'dataset.table'.")
+    @Description("BigQuery table to write to, in the form "
+        + "'project:dataset.table' or 'dataset.table'.")
     @Default.String("beam_samples.streaming_beam_sql")
     String getOutputTable();
+
     void setOutputTable(String value);
   }
 
   @DefaultCoder(AvroCoder.class)
   private static class PageReviewMessage {
-    @Nullable String url;
-    @Nullable String review;
+    @Nullable
+    String url;
+    @Nullable
+    String review;
   }
 
   public static void main(final String[] args) {
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
     options.setStreaming(true);
 
     var project = options.as(GcpOptions.class).getProject();
-    var subscription = ProjectSubscriptionName.of(project, options.getInputSubscription()).toString();
+    var subscription = ProjectSubscriptionName
+        .of(project, options.getInputSubscription()).toString();
 
     var schema = Schema.builder()
         .addStringField("url")
@@ -96,47 +96,50 @@ public static void main(final String[] args) {
     pipeline
         // Read, parse, and validate messages from Pub/Sub.
         .apply("Read messages from Pub/Sub", PubsubIO.readStrings().fromSubscription(subscription))
-        .apply("Parse JSON into SQL rows", MapElements.into(TypeDescriptor.of(Row.class)).via(message -> {
-          // This is a good place to add error handling.
-          // The first transform should act as a validation layer to make sure
-          // that any data coming to the processing pipeline must be valid.
-          // See `MapElements.MapWithFailures` for more details.
-          LOG.info("message: {}", message);
-          var msg = GSON.fromJson(message, PageReviewMessage.class);
-          return Row.withSchema(schema).addValues(
-              msg.url,                                    // row url
-              msg.review.equals("positive") ? 1.0 : 0.0,  // row page_score
-              new Instant()                               // row processing_time
-          ).build();
-        })).setRowSchema(schema) // make sure to set the row schema for the PCollection
+        .apply("Parse JSON into SQL rows", MapElements.into(TypeDescriptor.of(Row.class))
+            .via(message -> {
+              // This is a good place to add error handling.
+              // The first transform should act as a validation layer to make sure
+              // that any data coming to the processing pipeline must be valid.
+              // See `MapElements.MapWithFailures` for more details.
+              LOG.info("message: {}", message);
+              var msg = GSON.fromJson(message, PageReviewMessage.class);
+              return Row.withSchema(schema).addValues(
+                  msg.url,                                    // row url
+                  msg.review.equals("positive") ? 1.0 : 0.0,  // row page_score
+                  new Instant()                               // row processing_time
+              ).build();
+            })).setRowSchema(schema) // make sure to set the row schema for the PCollection
 
         // Add timestamps and bundle elements into windows.
-        .apply("Add processing time", WithTimestamps.of((row) -> row.getDateTime("processing_time").toInstant()))
+        .apply("Add processing time", WithTimestamps
            .of((row) -> row.getDateTime("processing_time").toInstant()))
         .apply("Fixed-size windows", Window.into(FixedWindows.of(Duration.standardMinutes(1))))
 
         // Apply a SQL query for every window of elements.
         .apply("Run Beam SQL query", SqlTransform.query(
-            "SELECT " +
-            "  url, " +
-            "  COUNT(page_score) AS num_reviews, " +
-            "  AVG(page_score) AS score, " +
-            "  MIN(processing_time) AS first_date, " +
-            "  MAX(processing_time) AS last_date " +
-            "FROM PCOLLECTION " +
-            "GROUP BY url"
+            "SELECT "
+                + "  url, "
+                + "  COUNT(page_score) AS num_reviews, "
+                + "  AVG(page_score) AS score, "
+                + "  MIN(processing_time) AS first_date, "
+                + "  MAX(processing_time) AS last_date "
+                + "FROM PCOLLECTION "
+                + "GROUP BY url"
         ))
 
         // Convert the SQL Rows into BigQuery TableRows and write them to BigQuery.
-        .apply("Convert to BigQuery TableRow", MapElements.into(TypeDescriptor.of(TableRow.class)).via(row -> {
-          LOG.info("rating summary: {} {} ({} reviews)", row.getDouble("score"), row.getString("url"),
-              row.getInt64("num_reviews"));
-          return new TableRow()
-              .set("url", row.getString("url"))
-              .set("num_reviews", row.getInt64("num_reviews"))
-              .set("score", row.getDouble("score"))
-              .set("first_date", row.getDateTime("first_date").toInstant().toString())
-              .set("last_date", row.getDateTime("last_date").toInstant().toString());
-        }))
+        .apply("Convert to BigQuery TableRow", MapElements.into(TypeDescriptor.of(TableRow.class))
+            .via(row -> {
+              LOG.info("rating summary: {} {} ({} reviews)", row.getDouble("score"),
+                  row.getString("url"), row.getInt64("num_reviews"));
+              return new TableRow()
+                  .set("url", row.getString("url"))
+                  .set("num_reviews", row.getInt64("num_reviews"))
+                  .set("score", row.getDouble("score"))
+                  .set("first_date", row.getDateTime("first_date").toInstant().toString())
+                  .set("last_date", row.getDateTime("last_date").toInstant().toString());
+            }))
         .apply("Write to BigQuery", BigQueryIO.writeTableRows()
            .to(options.getOutputTable())
            .withSchema(new TableSchema().setFields(Arrays.asList(