Commit ee32c23

[BEAM-11934] Remove Dataflow override of streaming WriteFiles with runner determined sharding (apache#15178)
* Remove Dataflow override of streaming WriteFiles
* Update the documentation in FileIO
* spotless
* Fix checkStyle
1 parent 4fb05be commit ee32c23
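For readers skimming the diff: after this change, a streaming WriteFiles on Dataflow keeps whatever sharding the pipeline author configured instead of being rewritten to a fixed shard count by the runner. A minimal sketch of the two configurations involved, as a fragment rather than a complete pipeline; `sink` stands for some existing FileBasedSink and the shard count is only illustrative, neither is part of this commit:

    // Runner-determined sharding: previously replaced by StreamingShardedWriteFactory in
    // streaming mode, now passed through to the service unchanged.
    WriteFiles<String, Void, String> runnerSharded =
        WriteFiles.<String, Void, String>to(sink)
            .withWindowedWrites()
            .withRunnerDeterminedSharding();

    // Explicit sharding: unaffected by this commit and still honored as configured.
    WriteFiles<String, Void, String> fixedSharding =
        WriteFiles.<String, Void, String>to(sink)
            .withWindowedWrites()
            .withNumShards(10);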

File tree

3 files changed: +68 −134 lines
  runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
  runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
  sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java

Lines changed: 0 additions & 73 deletions

@@ -19,7 +19,6 @@
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
-import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
 import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
 import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
 import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
@@ -79,7 +78,6 @@
 import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded;
 import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
 import org.apache.beam.runners.core.construction.UnconsumedReads;
-import org.apache.beam.runners.core.construction.WriteFilesTranslation;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
 import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory;
 import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
@@ -103,12 +101,9 @@
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
 import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.io.WriteFiles;
-import org.apache.beam.sdk.io.WriteFilesResult;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdCoder;
@@ -521,11 +516,6 @@ private List<PTransformOverride> getOverrides(boolean streaming) {
 
       overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE);
 
-      overridesBuilder.add(
-          PTransformOverride.of(
-              PTransformMatchers.writeWithRunnerDeterminedSharding(),
-              new StreamingShardedWriteFactory(options)));
-
       overridesBuilder.add(
           PTransformOverride.of(
               PTransformMatchers.groupWithShardableStates(),
@@ -2110,69 +2100,6 @@ public Map<PCollection<?>, ReplacementOutput> mapOutputs(
     }
   }
 
-  @VisibleForTesting
-  static class StreamingShardedWriteFactory<UserT, DestinationT, OutputT>
-      implements PTransformOverrideFactory<
-          PCollection<UserT>,
-          WriteFilesResult<DestinationT>,
-          WriteFiles<UserT, DestinationT, OutputT>> {
-
-    // We pick 10 as a a default, as it works well with the default number of workers started
-    // by Dataflow.
-    static final int DEFAULT_NUM_SHARDS = 10;
-    DataflowPipelineWorkerPoolOptions options;
-
-    StreamingShardedWriteFactory(PipelineOptions options) {
-      this.options = options.as(DataflowPipelineWorkerPoolOptions.class);
-    }
-
-    @Override
-    public PTransformReplacement<PCollection<UserT>, WriteFilesResult<DestinationT>>
-        getReplacementTransform(
-            AppliedPTransform<
-                    PCollection<UserT>,
-                    WriteFilesResult<DestinationT>,
-                    WriteFiles<UserT, DestinationT, OutputT>>
-                transform) {
-      // By default, if numShards is not set WriteFiles will produce one file per bundle. In
-      // streaming, there are large numbers of small bundles, resulting in many tiny files.
-      // Instead we pick max workers * 2 to ensure full parallelism, but prevent too-many files.
-      // (current_num_workers * 2 might be a better choice, but that value is not easily available
-      // today).
-      // If the user does not set either numWorkers or maxNumWorkers, default to 10 shards.
-      int numShards;
-      if (options.getMaxNumWorkers() > 0) {
-        numShards = options.getMaxNumWorkers() * 2;
-      } else if (options.getNumWorkers() > 0) {
-        numShards = options.getNumWorkers() * 2;
-      } else {
-        numShards = DEFAULT_NUM_SHARDS;
-      }
-
-      try {
-        List<PCollectionView<?>> sideInputs =
-            WriteFilesTranslation.getDynamicDestinationSideInputs(transform);
-        FileBasedSink sink = WriteFilesTranslation.getSink(transform);
-        WriteFiles<UserT, DestinationT, OutputT> replacement =
-            WriteFiles.to(sink).withSideInputs(sideInputs);
-        if (WriteFilesTranslation.isWindowedWrites(transform)) {
-          replacement = replacement.withWindowedWrites();
-        }
-        return PTransformReplacement.of(
-            PTransformReplacements.getSingletonMainInput(transform),
-            replacement.withNumShards(numShards));
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public Map<PCollection<?>, ReplacementOutput> mapOutputs(
-        Map<TupleTag<?>, PCollection<?>> outputs, WriteFilesResult<DestinationT> newOutput) {
-      return ReplacementOutputs.tagged(outputs, newOutput);
-    }
-  }
-
   @VisibleForTesting
   static String getContainerImageForJob(DataflowPipelineOptions options) {
     String containerImage = options.getSdkContainerImage();
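In effect, the deleted StreamingShardedWriteFactory replaced runner-determined sharding with a fixed shard count derived from the worker pool options. A condensed restatement of that heuristic, using only values taken from the deleted code above (the helper name overrideShardCount is ours, introduced for the sketch only):

    // Condensed from the StreamingShardedWriteFactory removed above; not new API.
    static int overrideShardCount(DataflowPipelineWorkerPoolOptions options) {
      if (options.getMaxNumWorkers() > 0) {
        return options.getMaxNumWorkers() * 2; // max workers * 2 for full parallelism
      } else if (options.getNumWorkers() > 0) {
        return options.getNumWorkers() * 2;    // fall back to numWorkers * 2
      }
      return 10;                               // DEFAULT_NUM_SHARDS when neither is set
    }

With the override gone, a WriteFiles that asked for runner-determined sharding reaches the service with a null shard-count provider, which is what the updated test below asserts.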

runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java

Lines changed: 64 additions & 56 deletions

@@ -30,7 +30,6 @@
 import static org.hamcrest.Matchers.hasProperty;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -90,7 +89,6 @@
 import org.apache.beam.runners.core.construction.Environments;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.runners.core.construction.SdkComponents;
-import org.apache.beam.runners.dataflow.DataflowRunner.StreamingShardedWriteFactory;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
@@ -99,7 +97,6 @@
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
@@ -111,7 +108,6 @@
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.WriteFiles;
-import org.apache.beam.sdk.io.WriteFilesResult;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -120,8 +116,6 @@
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.state.MapState;
@@ -142,15 +136,15 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.resourcehints.ResourceHints;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.ShardedKey;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValues;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.InvalidProtocolBufferException;
@@ -1661,19 +1655,6 @@ public void testGetContainerImageForJobFromOptionWithPlaceholder() {
     }
   }
 
-  @Test
-  public void testStreamingWriteWithNoShardingReturnsNewTransform() {
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
-    options.as(DataflowPipelineWorkerPoolOptions.class).setMaxNumWorkers(10);
-    testStreamingWriteOverride(options, 20);
-  }
-
-  @Test
-  public void testStreamingWriteWithNoShardingReturnsNewTransformMaxWorkersUnset() {
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
-    testStreamingWriteOverride(options, StreamingShardedWriteFactory.DEFAULT_NUM_SHARDS);
-  }
-
   private void verifyMergingStatefulParDoRejected(PipelineOptions options) throws Exception {
     Pipeline p = Pipeline.create(options);
 
@@ -1938,40 +1919,67 @@ public void testStreamingGroupIntoBatchesWithShardedKeyOverrideBytes() throws IO
     verifyGroupIntoBatchesOverrideBytes(p, true, true);
   }
 
-  private void testStreamingWriteOverride(PipelineOptions options, int expectedNumShards) {
-    TestPipeline p = TestPipeline.fromOptions(options);
-
-    StreamingShardedWriteFactory<Object, Void, Object> factory =
-        new StreamingShardedWriteFactory<>(p.getOptions());
-    WriteFiles<Object, Void, Object> original = WriteFiles.to(new TestSink(tmpFolder.toString()));
-    PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
-    AppliedPTransform<PCollection<Object>, WriteFilesResult<Void>, WriteFiles<Object, Void, Object>>
-        originalApplication =
-            AppliedPTransform.of(
-                "writefiles",
-                PValues.expandInput(objs),
-                Collections.emptyMap(),
-                original,
-                ResourceHints.create(),
-                p);
-
-    WriteFiles<Object, Void, Object> replacement =
-        (WriteFiles<Object, Void, Object>)
-            factory.getReplacementTransform(originalApplication).getTransform();
-    assertThat(replacement, not(equalTo((Object) original)));
-    assertThat(replacement.getNumShardsProvider().get(), equalTo(expectedNumShards));
-
-    WriteFilesResult<Void> originalResult = objs.apply(original);
-    WriteFilesResult<Void> replacementResult = objs.apply(replacement);
-    Map<PCollection<?>, ReplacementOutput> res =
-        factory.mapOutputs(PValues.expandOutput(originalResult), replacementResult);
-    assertEquals(1, res.size());
-    assertEquals(
-        originalResult.getPerDestinationOutputFilenames(),
-        res.get(replacementResult.getPerDestinationOutputFilenames()).getOriginal().getValue());
+  @Test
+  public void testStreamingWriteWithRunnerDeterminedSharding() throws IOException {
+    PipelineOptions options = buildPipelineOptions();
+    options.as(StreamingOptions.class).setStreaming(true);
+    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+    ExperimentalOptions.addExperiment(dataflowOptions, "enable_streaming_engine");
+    Pipeline p = Pipeline.create(options);
+    testStreamingWriteFilesOverride(p, 0);
+  }
+
+  @Test
+  public void testStreamingWriteWithFixedNumShards() throws IOException {
+    PipelineOptions options = buildPipelineOptions();
+    options.as(StreamingOptions.class).setStreaming(true);
+    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+    ExperimentalOptions.addExperiment(dataflowOptions, "enable_streaming_engine");
+    Pipeline p = Pipeline.create(options);
+    testStreamingWriteFilesOverride(p, 10);
+  }
+
+  private void testStreamingWriteFilesOverride(Pipeline p, int numFileShards) {
+    List<String> testValues = Arrays.asList("A", "C", "123", "foo");
+    PCollection<String> input = p.apply(Create.of(testValues));
+    WriteFiles<String, Void, String> write =
+        WriteFiles.<String, Void, String>to(new TestSink<>(tmpFolder.toString()))
+            .withWindowedWrites();
+    boolean withRunnerDeterminedSharding = numFileShards == 0;
+    if (withRunnerDeterminedSharding) {
+      write = write.withRunnerDeterminedSharding();
+    } else {
+      write = write.withNumShards(numFileShards);
+    }
+    input.setIsBoundedInternal(IsBounded.UNBOUNDED);
+    input.apply(Window.into(FixedWindows.of(Duration.standardSeconds(10)))).apply(write);
+    p.run();
+
+    p.traverseTopologically(
+        new PipelineVisitor.Defaults() {
+
+          @Override
+          public CompositeBehavior enterCompositeTransform(Node node) {
+            if (!(node.getTransform() instanceof WriteFiles)) {
+              return CompositeBehavior.ENTER_TRANSFORM;
+            }
+
+            if (p.getOptions().as(StreamingOptions.class).isStreaming()) {
+              if (withRunnerDeterminedSharding) {
+                assertThat(
+                    ((WriteFiles) node.getTransform()).getNumShardsProvider(), equalTo(null));
+              } else {
+                assertThat(
+                    ((WriteFiles) node.getTransform()).getNumShardsProvider().get(),
+                    equalTo(numFileShards));
+              }
+            }
+            return CompositeBehavior.ENTER_TRANSFORM;
+          }
+        });
   }
 
-  private static class TestSink extends FileBasedSink<Object, Void, Object> {
+  private static class TestSink<UserT, OutputT> extends FileBasedSink<UserT, Void, OutputT> {
 
     @Override
     public void validate(PipelineOptions options) {}
@@ -2001,10 +2009,10 @@ public ResourceId windowedFilename(
     }
 
     @Override
-    public WriteOperation<Void, Object> createWriteOperation() {
-      return new WriteOperation<Void, Object>(this) {
+    public WriteOperation<Void, OutputT> createWriteOperation() {
+      return new WriteOperation<Void, OutputT>(this) {
         @Override
-        public Writer<Void, Object> createWriter() {
+        public Writer<Void, OutputT> createWriter() {
           throw new UnsupportedOperationException();
        }
      };

sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java

Lines changed: 4 additions & 5 deletions

@@ -159,11 +159,10 @@
 * <li><b>How many shards are generated per pane:</b> This is controlled by <i>sharding</i>, using
 *     {@link Write#withNumShards} or {@link Write#withSharding}. The default is runner-specific,
 *     so the number of shards will vary based on runner behavior, though at least 1 shard will
-*     always be produced for every non-empty pane. Note that setting a fixed number of shards can
-*     hurt performance: it adds an additional {@link GroupByKey} to the pipeline. However, it is
-*     required to set it when writing an unbounded {@link PCollection} due to <a
-*     href="https://issues.apache.org/jira/browse/BEAM-1438">BEAM-1438</a> and similar behavior
-*     in other runners.
+*     always be produced for every non-empty pane. Runner-determined sharding is available for
+*     both bounded and unbounded data; support for unbounded data is limited (<a
+*     href="https://issues.apache.org/jira/browse/BEAM-12040">BEAM-12040</a>) and depends on the
+*     runners.
 * <li><b>How the shards are named:</b> This is controlled by a {@link Write.FileNaming}:
 *     filenames can depend on a variety of inputs, e.g. the window, the pane, total number of
 *     shards, the current file's shard index, and compression. Controlling the file naming is
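To make the revised javadoc concrete, a minimal FileIO sketch contrasting the two sharding modes it describes; `records` stands for an existing PCollection<String>, and the output path is a hypothetical placeholder:

    // Default: sharding is left to the runner (runner-specific shard counts,
    // at least one shard per non-empty pane).
    records.apply(
        FileIO.<String>write()
            .via(TextIO.sink())
            .to("gs://example-bucket/output/"));

    // Explicit: fix the shard count with withNumShards, which adds a GroupByKey
    // but yields a predictable number of files per pane.
    records.apply(
        FileIO.<String>write()
            .via(TextIO.sink())
            .to("gs://example-bucket/output/")
            .withNumShards(5));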
