Skip to content

Commit d99ebb6

Browse files
authored
Merge pull request apache#15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic
2 parents ee32c23 + 5ff77d7 commit d99ebb6

File tree

9 files changed

+518
-52
lines changed

9 files changed

+518
-52
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
`VARCHAR`, `NVARCHAR`, `LONGVARCHAR`, `LONGNVARCHAR`, `DATE`, `TIME`
8585
(Java)([BEAM-12385](https://issues.apache.org/jira/browse/BEAM-12385)).
8686
* Reading from JDBC source by partitions (Java) ([BEAM-12456](https://issues.apache.org/jira/browse/BEAM-12456)).
87+
* PubsubIO can now write to a dead-letter topic after a parsing error (Java)([BEAM-12474](https://issues.apache.org/jira/browse/BEAM-12474)).
8788

8889
## Breaking Changes
8990

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithFailures.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.beam.sdk.Pipeline;
2626
import org.apache.beam.sdk.annotations.Experimental;
2727
import org.apache.beam.sdk.annotations.Experimental.Kind;
28+
import org.apache.beam.sdk.values.EncodableThrowable;
2829
import org.apache.beam.sdk.values.KV;
2930
import org.apache.beam.sdk.values.PCollection;
3031
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -92,6 +93,24 @@ public static <T> ExceptionElement<T> of(T element, Exception exception) {
9293
}
9394
}
9495

96+
/**
97+
* A handler that holds onto the {@link Throwable} that led to the exception, returning it along
98+
* with the original value as a {@link KV}.
99+
*
100+
* <p>Extends {@link SimpleFunction} so that full type information is captured. {@link KV} and
101+
* {@link EncodableThrowable} coders can be easily inferred by Beam, so coder inference can be
102+
* successfully applied if the consuming transform passes type information to the failure
103+
* collection's {@link TupleTag}. This may require creating an instance of an anonymous inherited
104+
* class rather than of this class directly.
105+
*/
106+
public static class ThrowableHandler<T>
107+
extends SimpleFunction<ExceptionElement<T>, KV<T, EncodableThrowable>> {
108+
@Override
109+
public KV<T, EncodableThrowable> apply(ExceptionElement<T> f) {
110+
return KV.of(f.element(), EncodableThrowable.forThrowable(f.exception()));
111+
}
112+
}
113+
95114
/**
96115
* A simple handler that extracts information from an exception to a {@code Map<String, String>}
97116
* and returns a {@link KV} where the key is the input element that failed processing, and the
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.values;
19+
20+
import java.io.Serializable;
21+
import javax.annotation.Nullable;
22+
23+
/**
24+
* A wrapper around a {@link Throwable} for use with coders.
25+
*
26+
* <p>Though {@link Throwable} is serializable, it doesn't override {@link Object#equals(Object)},
27+
* which can lead to false positives in mutation detection for coders. This class provides a coder-
28+
* safe way to pass exceptions around without running into problems like log spam.
29+
*
30+
* <p>This class is not suitable for general-purpose equality comparison among {@link Throwable}s
31+
* and should only be used to pass a {@link Throwable} from one PTransform to another.
32+
*/
33+
public final class EncodableThrowable implements Serializable {
34+
private Throwable throwable;
35+
36+
private EncodableThrowable() {
37+
// Can't set this to null without adding a pointless @Nullable annotation to the field. It also
38+
// needs to be set from the constructor to avoid a checkstyle violation.
39+
this.throwable = new Throwable();
40+
}
41+
42+
/** Wraps {@code throwable} and returns the result. */
43+
public static EncodableThrowable forThrowable(Throwable throwable) {
44+
EncodableThrowable comparable = new EncodableThrowable();
45+
comparable.throwable = throwable;
46+
return comparable;
47+
}
48+
49+
/** Returns the underlying {@link Throwable}. */
50+
public Throwable throwable() {
51+
return throwable;
52+
}
53+
54+
@Override
55+
public int hashCode() {
56+
return throwable.hashCode();
57+
}
58+
59+
@Override
60+
public boolean equals(@Nullable Object obj) {
61+
if (!(obj instanceof EncodableThrowable)) {
62+
return false;
63+
}
64+
Throwable other = ((EncodableThrowable) obj).throwable;
65+
66+
// Assuming class preservation is enough to know that serialization/deserialization worked.
67+
return throwable.getClass().equals(other.getClass());
68+
}
69+
}

sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithFailuresTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.apache.beam.sdk.testing.PAssert;
3131
import org.apache.beam.sdk.testing.TestPipeline;
3232
import org.apache.beam.sdk.transforms.WithFailures.ExceptionAsMapHandler;
33+
import org.apache.beam.sdk.transforms.WithFailures.ThrowableHandler;
34+
import org.apache.beam.sdk.values.EncodableThrowable;
3335
import org.apache.beam.sdk.values.KV;
3436
import org.apache.beam.sdk.values.PCollection;
3537
import org.apache.beam.sdk.values.PCollectionList;
@@ -47,6 +49,35 @@ public class WithFailuresTest implements Serializable {
4749

4850
@Rule public final transient TestPipeline pipeline = TestPipeline.create();
4951

52+
@Test
53+
@Category(NeedsRunner.class)
54+
public void testDirectException() {
55+
List<PCollection<KV<Integer, EncodableThrowable>>> errorCollections = new ArrayList<>();
56+
PCollection<Integer> output =
57+
pipeline
58+
.apply(Create.of(0, 1))
59+
.apply(
60+
MapElements.into(TypeDescriptors.integers())
61+
.via((Integer i) -> 1 / i)
62+
.exceptionsVia(new ThrowableHandler<Integer>() {}))
63+
.failuresTo(errorCollections);
64+
65+
PAssert.that(output).containsInAnyOrder(1);
66+
67+
PAssert.thatSingleton(PCollectionList.of(errorCollections).apply(Flatten.pCollections()))
68+
.satisfies(
69+
kv -> {
70+
assertEquals(Integer.valueOf(0), kv.getKey());
71+
72+
Throwable throwable = kv.getValue().throwable();
73+
assertEquals("java.lang.ArithmeticException", throwable.getClass().getName());
74+
assertEquals("/ by zero", throwable.getMessage());
75+
return null;
76+
});
77+
78+
pipeline.run();
79+
}
80+
5081
/** Test of {@link WithFailures.Result#failuresTo(List)}. */
5182
@Test
5283
@Category(NeedsRunner.class)
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.values;
19+
20+
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertNotEquals;
22+
23+
import org.junit.Test;
24+
import org.junit.runner.RunWith;
25+
import org.junit.runners.JUnit4;
26+
27+
/** Tests for {@link EncodableThrowable}. */
28+
@RunWith(JUnit4.class)
29+
public final class EncodableThrowableTest {
30+
@Test
31+
public void testEquals() {
32+
IllegalStateException exception =
33+
new IllegalStateException(
34+
"Some illegal state",
35+
new RuntimeException(
36+
"Some nested exception", new Exception("Deeply nested exception")));
37+
38+
EncodableThrowable comparable1 = EncodableThrowable.forThrowable(exception);
39+
EncodableThrowable comparable2 = EncodableThrowable.forThrowable(exception);
40+
41+
assertEquals(comparable1, comparable1);
42+
assertEquals(comparable1, comparable2);
43+
}
44+
45+
@Test
46+
public void testEqualsNonComparable() {
47+
assertNotEquals(EncodableThrowable.forThrowable(new Exception()), new Throwable());
48+
}
49+
50+
@Test
51+
public void testEqualsDifferentUnderlyingTypes() {
52+
String message = "some message";
53+
assertNotEquals(
54+
EncodableThrowable.forThrowable(new RuntimeException(message)),
55+
EncodableThrowable.forThrowable(new Exception(message)));
56+
}
57+
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java

Lines changed: 112 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,13 @@
6262
import org.apache.beam.sdk.transforms.ParDo;
6363
import org.apache.beam.sdk.transforms.SerializableFunction;
6464
import org.apache.beam.sdk.transforms.SimpleFunction;
65+
import org.apache.beam.sdk.transforms.WithFailures;
66+
import org.apache.beam.sdk.transforms.WithFailures.Result;
6567
import org.apache.beam.sdk.transforms.display.DisplayData;
6668
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
6769
import org.apache.beam.sdk.util.CoderUtils;
70+
import org.apache.beam.sdk.values.EncodableThrowable;
71+
import org.apache.beam.sdk.values.KV;
6872
import org.apache.beam.sdk.values.PBegin;
6973
import org.apache.beam.sdk.values.PCollection;
7074
import org.apache.beam.sdk.values.PDone;
@@ -643,6 +647,8 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>
643647

644648
abstract @Nullable ValueProvider<PubsubTopic> getTopicProvider();
645649

650+
/**
 * Provider of the dead-letter topic that parsing failures are written to, or {@code null} when
 * no dead-letter topic has been configured.
 */
abstract @Nullable ValueProvider<PubsubTopic> getDeadLetterTopicProvider();
651+
646652
abstract PubsubClient.PubsubClientFactory getPubsubClientFactory();
647653

648654
abstract @Nullable ValueProvider<PubsubSubscription> getSubscriptionProvider();
@@ -693,6 +699,8 @@ static Builder<PubsubMessage> newBuilder() {
693699
abstract static class Builder<T> {
694700
abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic> topic);
695701

702+
/** Sets the provider of the dead-letter topic that parsing failures are written to. */
abstract Builder<T> setDeadLetterTopicProvider(ValueProvider<PubsubTopic> deadLetterTopic);
703+
696704
abstract Builder<T> setPubsubClientFactory(PubsubClient.PubsubClientFactory clientFactory);
697705

698706
abstract Builder<T> setSubscriptionProvider(ValueProvider<PubsubSubscription> subscription);
@@ -763,17 +771,62 @@ public Read<T> fromTopic(String topic) {
763771
return fromTopic(StaticValueProvider.of(topic));
764772
}
765773

766-
/** Like {@code topic()} but with a {@link ValueProvider}. */
774+
/** Like {@link Read#fromTopic(String)} but with a {@link ValueProvider}. */
767775
public Read<T> fromTopic(ValueProvider<String> topic) {
768-
if (topic.isAccessible()) {
769-
// Validate.
770-
PubsubTopic.fromPath(topic.get());
771-
}
776+
validateTopic(topic);
772777
return toBuilder()
773778
.setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath))
774779
.build();
775780
}
776781

782+
/**
783+
* Creates and returns a transform for writing read failures out to a dead-letter topic.
784+
*
785+
* <p>The message written to the dead-letter will contain three attributes:
786+
*
787+
* <ul>
788+
* <li>exceptionClassName: The type of exception that was thrown.
789+
* <li>exceptionMessage: The message in the exception
790+
* <li>pubsubMessageId: The message id of the original Pub/Sub message if it was read in,
791+
* otherwise "<null>"
792+
* </ul>
793+
*
794+
* <p>The {@link PubsubClient.PubsubClientFactory} used in the {@link Write} transform for
795+
* errors will be the same as used in the final {@link Read} transform.
796+
*
797+
* <p>If there <i>might</i> be a parsing error (or similar), then this should be set up on the
798+
* topic to avoid wasting resources and to provide more error details with the message written
799+
* to Pub/Sub. Otherwise, the Pub/Sub topic should have a dead-letter configuration set up to
800+
* avoid an infinite retry loop.
801+
*
802+
* <p>Only failures that result from the {@link Read} configuration (e.g. parsing errors) will
803+
* be sent to the dead-letter topic. Errors that occur after a successful read will need to set
804+
* up their own {@link Write} transform. Errors with delivery require configuring Pub/Sub itself
805+
* to write to the dead-letter topic after a certain number of failed attempts.
806+
*
807+
* <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
808+
* {@code deadLetterTopic} string.
809+
*/
810+
public Read<T> withDeadLetterTopic(String deadLetterTopic) {
811+
return withDeadLetterTopic(StaticValueProvider.of(deadLetterTopic));
812+
}
813+
814+
/** Like {@link Read#withDeadLetterTopic(String)} but with a {@link ValueProvider}. */
815+
public Read<T> withDeadLetterTopic(ValueProvider<String> deadLetterTopic) {
816+
validateTopic(deadLetterTopic);
817+
return toBuilder()
818+
.setDeadLetterTopicProvider(
819+
NestedValueProvider.of(deadLetterTopic, PubsubTopic::fromPath))
820+
.build();
821+
}
822+
823+
/** Handles validation of {@code topic}. */
824+
private static void validateTopic(ValueProvider<String> topic) {
825+
if (topic.isAccessible()) {
826+
PubsubTopic.fromPath(topic.get());
827+
}
828+
}
829+
777830
/**
778831
* The default client to write to Pub/Sub is the {@link PubsubJsonClient}, created by the {@link
779832
* PubsubJsonClient.PubsubJsonClientFactory}. This function allows to change the Pub/Sub client
@@ -881,8 +934,60 @@ public PCollection<T> expand(PBegin input) {
881934
getIdAttribute(),
882935
getNeedsAttributes(),
883936
getNeedsMessageId());
884-
PCollection<T> read =
885-
input.apply(source).apply(MapElements.into(new TypeDescriptor<T>() {}).via(getParseFn()));
937+
938+
PCollection<T> read;
939+
PCollection<PubsubMessage> preParse = input.apply(source);
940+
TypeDescriptor<T> typeDescriptor = new TypeDescriptor<T>() {};
941+
if (getDeadLetterTopicProvider() == null) {
942+
read = preParse.apply(MapElements.into(typeDescriptor).via(getParseFn()));
943+
} else {
944+
Result<PCollection<T>, KV<PubsubMessage, EncodableThrowable>> result =
945+
preParse.apply(
946+
"PubsubIO.Read/Map/Parse-Incoming-Messages",
947+
MapElements.into(typeDescriptor)
948+
.via(getParseFn())
949+
.exceptionsVia(new WithFailures.ThrowableHandler<PubsubMessage>() {}));
950+
read = result.output();
951+
952+
// Write out failures to the provided dead-letter topic.
953+
result
954+
.failures()
955+
// Since the stack trace could easily exceed Pub/Sub limits, we need to remove it from
956+
// the attributes.
957+
.apply(
958+
"PubsubIO.Read/Map/Remove-Stack-Trace-Attribute",
959+
MapElements.into(new TypeDescriptor<KV<PubsubMessage, Map<String, String>>>() {})
960+
.via(
961+
kv -> {
962+
PubsubMessage message = kv.getKey();
963+
String messageId =
964+
message.getMessageId() == null ? "<null>" : message.getMessageId();
965+
Throwable throwable = kv.getValue().throwable();
966+
967+
// In order to stay within Pub/Sub limits, we aren't adding the stack
968+
// trace to the attributes. Therefore, we need to log the throwable.
969+
LOG.error(
970+
"Error parsing Pub/Sub message with id '{}'", messageId, throwable);
971+
972+
ImmutableMap<String, String> attributes =
973+
ImmutableMap.<String, String>builder()
974+
.put("exceptionClassName", throwable.getClass().getName())
975+
.put("exceptionMessage", throwable.getMessage())
976+
.put("pubsubMessageId", messageId)
977+
.build();
978+
979+
return KV.of(kv.getKey(), attributes);
980+
}))
981+
.apply(
982+
"PubsubIO.Read/Map/Create-Dead-Letter-Payload",
983+
MapElements.into(TypeDescriptor.of(PubsubMessage.class))
984+
.via(kv -> new PubsubMessage(kv.getKey().getPayload(), kv.getValue())))
985+
.apply(
986+
writeMessages()
987+
.to(getDeadLetterTopicProvider().get().asPath())
988+
.withClientFactory(getPubsubClientFactory()));
989+
}
990+
886991
return read.setCoder(getCoder());
887992
}
888993

0 commit comments

Comments
 (0)