30
30
import org .apache .beam .sdk .transforms .MapElements ;
31
31
import org .apache .beam .sdk .values .TimestampedValue ;
32
32
import org .apache .beam .sdk .values .TypeDescriptor ;
33
+ import org .apache .beam .sdk .values .TypeDescriptors ;
33
34
import org .joda .time .Duration ;
34
35
import org .joda .time .Instant ;
35
36
@@ -45,6 +46,8 @@ private static TestStream<String> createEventSource() {
45
46
TimestampedValue .of ("Bob,30" ,
46
47
startTime .plus (Duration .standardSeconds (1 ))),
47
48
TimestampedValue .of ("Charles,40" ,
49
+ startTime .plus (Duration .standardSeconds (2 ))),
50
+ TimestampedValue .of ("Dylan,Invalid value" ,
48
51
startTime .plus (Duration .standardSeconds (2 ))))
49
52
.advanceWatermarkToInfinity ();
50
53
}
@@ -80,10 +83,19 @@ public static PipelineResult main(String[] args) {
80
83
.withCreateDisposition (CreateDisposition .CREATE_NEVER )
81
84
.withWriteDisposition (WriteDisposition .WRITE_APPEND )
82
85
.withMethod (Write .Method .STORAGE_WRITE_API )
83
- // For exactly-once processing, set the number of Write API streams and the triggering
84
- // frequency.
85
- .withNumStorageWriteApiStreams (1 )
86
- .withTriggeringFrequency (Duration .standardSeconds (5 )));
86
+ // For exactly-once processing, set the triggering frequency.
87
+ .withTriggeringFrequency (Duration .standardSeconds (5 )))
88
+ // Get the collection of write errors.
89
+ .getFailedStorageApiInserts ()
90
+ .apply (MapElements .into (TypeDescriptors .strings ())
91
+ // Process each error. In production systems, it's useful to write the errors to
92
+ // another destination, such as a dead-letter table or queue.
93
+ .via (
94
+ x -> {
95
+ System .out .println ("Failed insert: " + x .getErrorMessage ());
96
+ System .out .println ("Row: " + x .getRow ());
97
+ return "" ;
98
+ }));
87
99
return pipeline .run ();
88
100
}
89
101
}
0 commit comments