Skip to content

Commit 293742f

Browse files
Add error handling for streaming inserts
1 parent 8ddd51f commit 293742f

File tree

2 files changed

+20
-4
lines changed

2 files changed

+20
-4
lines changed

dataflow/snippets/src/main/java/com/example/dataflow/BigQueryStreamExactlyOnce.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.beam.sdk.transforms.MapElements;
3131
import org.apache.beam.sdk.values.TimestampedValue;
3232
import org.apache.beam.sdk.values.TypeDescriptor;
33+
import org.apache.beam.sdk.values.TypeDescriptors;
3334
import org.joda.time.Duration;
3435
import org.joda.time.Instant;
3536

@@ -45,6 +46,8 @@ private static TestStream<String> createEventSource() {
4546
TimestampedValue.of("Bob,30",
4647
startTime.plus(Duration.standardSeconds(1))),
4748
TimestampedValue.of("Charles,40",
49+
startTime.plus(Duration.standardSeconds(2))),
50+
TimestampedValue.of("Dylan,Invalid value",
4851
startTime.plus(Duration.standardSeconds(2))))
4952
.advanceWatermarkToInfinity();
5053
}
@@ -80,10 +83,19 @@ public static PipelineResult main(String[] args) {
8083
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
8184
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
8285
.withMethod(Write.Method.STORAGE_WRITE_API)
83-
// For exactly-once processing, set the number of Write API streams and the triggering
84-
// frequency.
85-
.withNumStorageWriteApiStreams(1)
86-
.withTriggeringFrequency(Duration.standardSeconds(5)));
86+
// For exactly-once processing, set the triggering frequency.
87+
.withTriggeringFrequency(Duration.standardSeconds(5)))
88+
// Get the collection of write errors.
89+
.getFailedStorageApiInserts()
90+
.apply(MapElements.into(TypeDescriptors.strings())
91+
// Process each error. In production systems, it's useful to write the errors to
92+
// another destination, such as a dead-letter table or queue.
93+
.via(
94+
x -> {
95+
System.out.println("Failed insert: " + x.getErrorMessage());
96+
System.out.println("Row: " + x.getRow());
97+
return "";
98+
}));
8799
return pipeline.run();
88100
}
89101
}

dataflow/snippets/src/test/java/com/example/dataflow/BigQueryWriteIT.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.example.dataflow;
1818

1919
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertTrue;
2021

2122
import com.google.cloud.bigquery.BigQuery;
2223
import com.google.cloud.bigquery.BigQuery.DatasetDeleteOption;
@@ -136,5 +137,8 @@ public void streamExactlyOnce() throws Exception {
136137
QueryJobConfiguration.newBuilder(query).setDefaultDataset(datasetName).build();
137138
TableResult result = bigquery.query(queryConfig);
138139
assertEquals(3, result.getTotalRows());
140+
// Verify that the bad data was written to the error collection.
141+
String got = bout.toString();
142+
assertTrue(got.contains("Failed insert: "));
139143
}
140144
}

0 commit comments

Comments
 (0)