|
30 | 30 | import static org.hamcrest.Matchers.hasProperty;
|
31 | 31 | import static org.hamcrest.Matchers.is;
|
32 | 32 | import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
33 |
| -import static org.hamcrest.Matchers.not; |
34 | 33 | import static org.junit.Assert.assertEquals;
|
35 | 34 | import static org.junit.Assert.assertFalse;
|
36 | 35 | import static org.junit.Assert.assertNotNull;
|
|
90 | 89 | import org.apache.beam.runners.core.construction.Environments;
|
91 | 90 | import org.apache.beam.runners.core.construction.PipelineTranslation;
|
92 | 91 | import org.apache.beam.runners.core.construction.SdkComponents;
|
93 |
| -import org.apache.beam.runners.dataflow.DataflowRunner.StreamingShardedWriteFactory; |
94 | 92 | import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
|
95 | 93 | import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
|
96 | 94 | import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
|
|
99 | 97 | import org.apache.beam.sdk.Pipeline;
|
100 | 98 | import org.apache.beam.sdk.Pipeline.PipelineVisitor;
|
101 | 99 | import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
|
102 |
| -import org.apache.beam.sdk.coders.VoidCoder; |
103 | 100 | import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
|
104 | 101 | import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
|
105 | 102 | import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
|
|
111 | 108 | import org.apache.beam.sdk.io.FileSystems;
|
112 | 109 | import org.apache.beam.sdk.io.TextIO;
|
113 | 110 | import org.apache.beam.sdk.io.WriteFiles;
|
114 |
| -import org.apache.beam.sdk.io.WriteFilesResult; |
115 | 111 | import org.apache.beam.sdk.io.fs.ResourceId;
|
116 | 112 | import org.apache.beam.sdk.options.ExperimentalOptions;
|
117 | 113 | import org.apache.beam.sdk.options.PipelineOptions;
|
|
120 | 116 | import org.apache.beam.sdk.options.StreamingOptions;
|
121 | 117 | import org.apache.beam.sdk.options.ValueProvider;
|
122 | 118 | import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
|
123 |
| -import org.apache.beam.sdk.runners.AppliedPTransform; |
124 |
| -import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput; |
125 | 119 | import org.apache.beam.sdk.runners.TransformHierarchy;
|
126 | 120 | import org.apache.beam.sdk.runners.TransformHierarchy.Node;
|
127 | 121 | import org.apache.beam.sdk.state.MapState;
|
|
142 | 136 | import org.apache.beam.sdk.transforms.ParDo;
|
143 | 137 | import org.apache.beam.sdk.transforms.SerializableFunctions;
|
144 | 138 | import org.apache.beam.sdk.transforms.SimpleFunction;
|
145 |
| -import org.apache.beam.sdk.transforms.resourcehints.ResourceHints; |
146 | 139 | import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
|
| 140 | +import org.apache.beam.sdk.transforms.windowing.FixedWindows; |
147 | 141 | import org.apache.beam.sdk.transforms.windowing.PaneInfo;
|
148 | 142 | import org.apache.beam.sdk.transforms.windowing.Sessions;
|
149 | 143 | import org.apache.beam.sdk.transforms.windowing.Window;
|
150 | 144 | import org.apache.beam.sdk.util.ShardedKey;
|
151 | 145 | import org.apache.beam.sdk.values.KV;
|
152 | 146 | import org.apache.beam.sdk.values.PCollection;
|
153 |
| -import org.apache.beam.sdk.values.PValues; |
| 147 | +import org.apache.beam.sdk.values.PCollection.IsBounded; |
154 | 148 | import org.apache.beam.sdk.values.TimestampedValue;
|
155 | 149 | import org.apache.beam.sdk.values.WindowingStrategy;
|
156 | 150 | import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.InvalidProtocolBufferException;
|
@@ -1661,19 +1655,6 @@ public void testGetContainerImageForJobFromOptionWithPlaceholder() {
|
1661 | 1655 | }
|
1662 | 1656 | }
|
1663 | 1657 |
|
1664 |
| - @Test |
1665 |
| - public void testStreamingWriteWithNoShardingReturnsNewTransform() { |
1666 |
| - PipelineOptions options = TestPipeline.testingPipelineOptions(); |
1667 |
| - options.as(DataflowPipelineWorkerPoolOptions.class).setMaxNumWorkers(10); |
1668 |
| - testStreamingWriteOverride(options, 20); |
1669 |
| - } |
1670 |
| - |
1671 |
| - @Test |
1672 |
| - public void testStreamingWriteWithNoShardingReturnsNewTransformMaxWorkersUnset() { |
1673 |
| - PipelineOptions options = TestPipeline.testingPipelineOptions(); |
1674 |
| - testStreamingWriteOverride(options, StreamingShardedWriteFactory.DEFAULT_NUM_SHARDS); |
1675 |
| - } |
1676 |
| - |
1677 | 1658 | private void verifyMergingStatefulParDoRejected(PipelineOptions options) throws Exception {
|
1678 | 1659 | Pipeline p = Pipeline.create(options);
|
1679 | 1660 |
|
@@ -1938,40 +1919,67 @@ public void testStreamingGroupIntoBatchesWithShardedKeyOverrideBytes() throws IO
|
1938 | 1919 | verifyGroupIntoBatchesOverrideBytes(p, true, true);
|
1939 | 1920 | }
|
1940 | 1921 |
|
1941 |
| - private void testStreamingWriteOverride(PipelineOptions options, int expectedNumShards) { |
1942 |
| - TestPipeline p = TestPipeline.fromOptions(options); |
1943 |
| - |
1944 |
| - StreamingShardedWriteFactory<Object, Void, Object> factory = |
1945 |
| - new StreamingShardedWriteFactory<>(p.getOptions()); |
1946 |
| - WriteFiles<Object, Void, Object> original = WriteFiles.to(new TestSink(tmpFolder.toString())); |
1947 |
| - PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of())); |
1948 |
| - AppliedPTransform<PCollection<Object>, WriteFilesResult<Void>, WriteFiles<Object, Void, Object>> |
1949 |
| - originalApplication = |
1950 |
| - AppliedPTransform.of( |
1951 |
| - "writefiles", |
1952 |
| - PValues.expandInput(objs), |
1953 |
| - Collections.emptyMap(), |
1954 |
| - original, |
1955 |
| - ResourceHints.create(), |
1956 |
| - p); |
1957 |
| - |
1958 |
| - WriteFiles<Object, Void, Object> replacement = |
1959 |
| - (WriteFiles<Object, Void, Object>) |
1960 |
| - factory.getReplacementTransform(originalApplication).getTransform(); |
1961 |
| - assertThat(replacement, not(equalTo((Object) original))); |
1962 |
| - assertThat(replacement.getNumShardsProvider().get(), equalTo(expectedNumShards)); |
1963 |
| - |
1964 |
| - WriteFilesResult<Void> originalResult = objs.apply(original); |
1965 |
| - WriteFilesResult<Void> replacementResult = objs.apply(replacement); |
1966 |
| - Map<PCollection<?>, ReplacementOutput> res = |
1967 |
| - factory.mapOutputs(PValues.expandOutput(originalResult), replacementResult); |
1968 |
| - assertEquals(1, res.size()); |
1969 |
| - assertEquals( |
1970 |
| - originalResult.getPerDestinationOutputFilenames(), |
1971 |
| - res.get(replacementResult.getPerDestinationOutputFilenames()).getOriginal().getValue()); |
| 1922 | + @Test |
| 1923 | + public void testStreamingWriteWithRunnerDeterminedSharding() throws IOException { |
| 1924 | + PipelineOptions options = buildPipelineOptions(); |
| 1925 | + options.as(StreamingOptions.class).setStreaming(true); |
| 1926 | + DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); |
| 1927 | + ExperimentalOptions.addExperiment(dataflowOptions, "enable_streaming_engine"); |
| 1928 | + Pipeline p = Pipeline.create(options); |
| 1929 | + testStreamingWriteFilesOverride(p, 0); |
| 1930 | + } |
| 1931 | + |
| 1932 | + @Test |
| 1933 | + public void testStreamingWriteWithFixedNumShards() throws IOException { |
| 1934 | + PipelineOptions options = buildPipelineOptions(); |
| 1935 | + options.as(StreamingOptions.class).setStreaming(true); |
| 1936 | + DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); |
| 1937 | + ExperimentalOptions.addExperiment(dataflowOptions, "enable_streaming_engine"); |
| 1938 | + Pipeline p = Pipeline.create(options); |
| 1939 | + testStreamingWriteFilesOverride(p, 10); |
| 1940 | + } |
| 1941 | + |
| 1942 | + private void testStreamingWriteFilesOverride(Pipeline p, int numFileShards) { |
| 1943 | + List<String> testValues = Arrays.asList("A", "C", "123", "foo"); |
| 1944 | + PCollection<String> input = p.apply(Create.of(testValues)); |
| 1945 | + WriteFiles<String, Void, String> write = |
| 1946 | + WriteFiles.<String, Void, String>to(new TestSink<>(tmpFolder.toString())) |
| 1947 | + .withWindowedWrites(); |
| 1948 | + boolean withRunnerDeterminedSharding = numFileShards == 0; |
| 1949 | + if (withRunnerDeterminedSharding) { |
| 1950 | + write = write.withRunnerDeterminedSharding(); |
| 1951 | + } else { |
| 1952 | + write = write.withNumShards(numFileShards); |
| 1953 | + } |
| 1954 | + input.setIsBoundedInternal(IsBounded.UNBOUNDED); |
| 1955 | + input.apply(Window.into(FixedWindows.of(Duration.standardSeconds(10)))).apply(write); |
| 1956 | + p.run(); |
| 1957 | + |
| 1958 | + p.traverseTopologically( |
| 1959 | + new PipelineVisitor.Defaults() { |
| 1960 | + |
| 1961 | + @Override |
| 1962 | + public CompositeBehavior enterCompositeTransform(Node node) { |
| 1963 | + if (!(node.getTransform() instanceof WriteFiles)) { |
| 1964 | + return CompositeBehavior.ENTER_TRANSFORM; |
| 1965 | + } |
| 1966 | + |
| 1967 | + if (p.getOptions().as(StreamingOptions.class).isStreaming()) { |
| 1968 | + if (withRunnerDeterminedSharding) { |
| 1969 | + assertThat( |
| 1970 | + ((WriteFiles) node.getTransform()).getNumShardsProvider(), equalTo(null)); |
| 1971 | + } else { |
| 1972 | + assertThat( |
| 1973 | + ((WriteFiles) node.getTransform()).getNumShardsProvider().get(), |
| 1974 | + equalTo(numFileShards)); |
| 1975 | + } |
| 1976 | + } |
| 1977 | + return CompositeBehavior.ENTER_TRANSFORM; |
| 1978 | + } |
| 1979 | + }); |
1972 | 1980 | }
|
1973 | 1981 |
|
1974 |
| - private static class TestSink extends FileBasedSink<Object, Void, Object> { |
| 1982 | + private static class TestSink<UserT, OutputT> extends FileBasedSink<UserT, Void, OutputT> { |
1975 | 1983 |
|
1976 | 1984 | @Override
|
1977 | 1985 | public void validate(PipelineOptions options) {}
|
@@ -2001,10 +2009,10 @@ public ResourceId windowedFilename(
|
2001 | 2009 | }
|
2002 | 2010 |
|
2003 | 2011 | @Override
|
2004 |
| - public WriteOperation<Void, Object> createWriteOperation() { |
2005 |
| - return new WriteOperation<Void, Object>(this) { |
| 2012 | + public WriteOperation<Void, OutputT> createWriteOperation() { |
| 2013 | + return new WriteOperation<Void, OutputT>(this) { |
2006 | 2014 | @Override
|
2007 |
| - public Writer<Void, Object> createWriter() { |
| 2015 | + public Writer<Void, OutputT> createWriter() { |
2008 | 2016 | throw new UnsupportedOperationException();
|
2009 | 2017 | }
|
2010 | 2018 | };
|
|
0 commit comments