|
62 | 62 | import org.apache.beam.sdk.transforms.ParDo;
|
63 | 63 | import org.apache.beam.sdk.transforms.SerializableFunction;
|
64 | 64 | import org.apache.beam.sdk.transforms.SimpleFunction;
|
| 65 | +import org.apache.beam.sdk.transforms.WithFailures; |
| 66 | +import org.apache.beam.sdk.transforms.WithFailures.Result; |
65 | 67 | import org.apache.beam.sdk.transforms.display.DisplayData;
|
66 | 68 | import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
|
67 | 69 | import org.apache.beam.sdk.util.CoderUtils;
|
| 70 | +import org.apache.beam.sdk.values.EncodableThrowable; |
| 71 | +import org.apache.beam.sdk.values.KV; |
68 | 72 | import org.apache.beam.sdk.values.PBegin;
|
69 | 73 | import org.apache.beam.sdk.values.PCollection;
|
70 | 74 | import org.apache.beam.sdk.values.PDone;
|
@@ -643,6 +647,8 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>
|
643 | 647 |
|
644 | 648 | abstract @Nullable ValueProvider<PubsubTopic> getTopicProvider();
|
645 | 649 |
|
| 650 | + abstract @Nullable ValueProvider<PubsubTopic> getDeadLetterTopicProvider(); |
| 651 | + |
646 | 652 | abstract PubsubClient.PubsubClientFactory getPubsubClientFactory();
|
647 | 653 |
|
648 | 654 | abstract @Nullable ValueProvider<PubsubSubscription> getSubscriptionProvider();
|
@@ -693,6 +699,8 @@ static Builder<PubsubMessage> newBuilder() {
|
693 | 699 | abstract static class Builder<T> {
|
694 | 700 | abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic> topic);
|
695 | 701 |
|
| 702 | + abstract Builder<T> setDeadLetterTopicProvider(ValueProvider<PubsubTopic> deadLetterTopic); |
| 703 | + |
696 | 704 | abstract Builder<T> setPubsubClientFactory(PubsubClient.PubsubClientFactory clientFactory);
|
697 | 705 |
|
698 | 706 | abstract Builder<T> setSubscriptionProvider(ValueProvider<PubsubSubscription> subscription);
|
@@ -763,17 +771,62 @@ public Read<T> fromTopic(String topic) {
|
763 | 771 | return fromTopic(StaticValueProvider.of(topic));
|
764 | 772 | }
|
765 | 773 |
|
766 |
| - /** Like {@code topic()} but with a {@link ValueProvider}. */ |
| 774 | + /** Like {@link Read#fromTopic(String)} but with a {@link ValueProvider}. */ |
767 | 775 | public Read<T> fromTopic(ValueProvider<String> topic) {
|
768 |
| - if (topic.isAccessible()) { |
769 |
| - // Validate. |
770 |
| - PubsubTopic.fromPath(topic.get()); |
771 |
| - } |
| 776 | + validateTopic(topic); |
772 | 777 | return toBuilder()
|
773 | 778 | .setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath))
|
774 | 779 | .build();
|
775 | 780 | }
|
776 | 781 |
|
| 782 | + /** |
| 783 | + * Creates and returns a transform for writing read failures out to a dead-letter topic. |
| 784 | + * |
| 785 | + * <p>The message written to the dead-letter will contain three attributes: |
| 786 | + * |
| 787 | + * <ul> |
| 788 | + * <li>exceptionClassName: The type of exception that was thrown. |
| 789 | + * <li>exceptionMessage: The message in the exception |
| 790 | + * <li>pubsubMessageId: The message id of the original Pub/Sub message if it was read in, |
| 791 | + * otherwise "<null>" |
| 792 | + * </ul> |
| 793 | + * |
| 794 | + * <p>The {@link PubsubClient.PubsubClientFactory} used in the {@link Write} transform for |
| 795 | + * errors will be the same as used in the final {@link Read} transform. |
| 796 | + * |
| 797 | + * <p>If there <i>might</i> be a parsing error (or similar), then this should be set up on the |
| 798 | + * topic to avoid wasting resources and to provide more error details with the message written |
| 799 | + * to Pub/Sub. Otherwise, the Pub/Sub topic should have a dead-letter configuration set up to |
| 800 | + * avoid an infinite retry loop. |
| 801 | + * |
| 802 | + * <p>Only failures that result from the {@link Read} configuration (e.g. parsing errors) will |
| 803 | + * be sent to the dead-letter topic. Errors that occur after a successful read will need to set |
| 804 | + * up their own {@link Write} transform. Errors with delivery require configuring Pub/Sub itself |
| 805 | + * to write to the dead-letter topic after a certain number of failed attempts. |
| 806 | + * |
| 807 | + * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the |
| 808 | + * {@code deadLetterTopic} string. |
| 809 | + */ |
| 810 | + public Read<T> withDeadLetterTopic(String deadLetterTopic) { |
| 811 | + return withDeadLetterTopic(StaticValueProvider.of(deadLetterTopic)); |
| 812 | + } |
| 813 | + |
| 814 | + /** Like {@link Read#withDeadLetterTopic(String)} but with a {@link ValueProvider}. */ |
| 815 | + public Read<T> withDeadLetterTopic(ValueProvider<String> deadLetterTopic) { |
| 816 | + validateTopic(deadLetterTopic); |
| 817 | + return toBuilder() |
| 818 | + .setDeadLetterTopicProvider( |
| 819 | + NestedValueProvider.of(deadLetterTopic, PubsubTopic::fromPath)) |
| 820 | + .build(); |
| 821 | + } |
| 822 | + |
| 823 | + /** Handles validation of {@code topic}. */ |
| 824 | + private static void validateTopic(ValueProvider<String> topic) { |
| 825 | + if (topic.isAccessible()) { |
| 826 | + PubsubTopic.fromPath(topic.get()); |
| 827 | + } |
| 828 | + } |
| 829 | + |
777 | 830 | /**
|
778 | 831 | * The default client to write to Pub/Sub is the {@link PubsubJsonClient}, created by the {@link
|
779 | 832 | * PubsubJsonClient.PubsubJsonClientFactory}. This function allows to change the Pub/Sub client
|
@@ -881,8 +934,60 @@ public PCollection<T> expand(PBegin input) {
|
881 | 934 | getIdAttribute(),
|
882 | 935 | getNeedsAttributes(),
|
883 | 936 | getNeedsMessageId());
|
884 |
| - PCollection<T> read = |
885 |
| - input.apply(source).apply(MapElements.into(new TypeDescriptor<T>() {}).via(getParseFn())); |
| 937 | + |
| 938 | + PCollection<T> read; |
| 939 | + PCollection<PubsubMessage> preParse = input.apply(source); |
| 940 | + TypeDescriptor<T> typeDescriptor = new TypeDescriptor<T>() {}; |
| 941 | + if (getDeadLetterTopicProvider() == null) { |
| 942 | + read = preParse.apply(MapElements.into(typeDescriptor).via(getParseFn())); |
| 943 | + } else { |
| 944 | + Result<PCollection<T>, KV<PubsubMessage, EncodableThrowable>> result = |
| 945 | + preParse.apply( |
| 946 | + "PubsubIO.Read/Map/Parse-Incoming-Messages", |
| 947 | + MapElements.into(typeDescriptor) |
| 948 | + .via(getParseFn()) |
| 949 | + .exceptionsVia(new WithFailures.ThrowableHandler<PubsubMessage>() {})); |
| 950 | + read = result.output(); |
| 951 | + |
| 952 | + // Write out failures to the provided dead-letter topic. |
| 953 | + result |
| 954 | + .failures() |
| 955 | + // Since the stack trace could easily exceed Pub/Sub limits, we need to remove it from |
| 956 | + // the attributes. |
| 957 | + .apply( |
| 958 | + "PubsubIO.Read/Map/Remove-Stack-Trace-Attribute", |
| 959 | + MapElements.into(new TypeDescriptor<KV<PubsubMessage, Map<String, String>>>() {}) |
| 960 | + .via( |
| 961 | + kv -> { |
| 962 | + PubsubMessage message = kv.getKey(); |
| 963 | + String messageId = |
| 964 | + message.getMessageId() == null ? "<null>" : message.getMessageId(); |
| 965 | + Throwable throwable = kv.getValue().throwable(); |
| 966 | + |
| 967 | + // In order to stay within Pub/Sub limits, we aren't adding the stack |
| 968 | + // trace to the attributes. Therefore, we need to log the throwable. |
| 969 | + LOG.error( |
| 970 | + "Error parsing Pub/Sub message with id '{}'", messageId, throwable); |
| 971 | + |
| 972 | + ImmutableMap<String, String> attributes = |
| 973 | + ImmutableMap.<String, String>builder() |
| 974 | + .put("exceptionClassName", throwable.getClass().getName()) |
| 975 | + .put("exceptionMessage", throwable.getMessage()) |
| 976 | + .put("pubsubMessageId", messageId) |
| 977 | + .build(); |
| 978 | + |
| 979 | + return KV.of(kv.getKey(), attributes); |
| 980 | + })) |
| 981 | + .apply( |
| 982 | + "PubsubIO.Read/Map/Create-Dead-Letter-Payload", |
| 983 | + MapElements.into(TypeDescriptor.of(PubsubMessage.class)) |
| 984 | + .via(kv -> new PubsubMessage(kv.getKey().getPayload(), kv.getValue()))) |
| 985 | + .apply( |
| 986 | + writeMessages() |
| 987 | + .to(getDeadLetterTopicProvider().get().asPath()) |
| 988 | + .withClientFactory(getPubsubClientFactory())); |
| 989 | + } |
| 990 | + |
886 | 991 | return read.setCoder(getCoder());
|
887 | 992 | }
|
888 | 993 |
|
|
0 commit comments