Skip to content

Commit 69822e4

Browse files
authored
[BEAM-7195] BQ BatchLoads doesn't always create new tables (apache#14238)
1 parent 0e12dd0 commit 69822e4

File tree

6 files changed

+222
-85
lines changed

6 files changed

+222
-85
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,8 @@
3030
import org.apache.beam.sdk.coders.Coder;
3131
import org.apache.beam.sdk.coders.IterableCoder;
3232
import org.apache.beam.sdk.coders.KvCoder;
33-
import org.apache.beam.sdk.coders.ListCoder;
3433
import org.apache.beam.sdk.coders.NullableCoder;
3534
import org.apache.beam.sdk.coders.ShardedKeyCoder;
36-
import org.apache.beam.sdk.coders.StringUtf8Coder;
3735
import org.apache.beam.sdk.coders.VoidCoder;
3836
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
3937
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
@@ -147,8 +145,8 @@ class BatchLoads<DestinationT, ElementT>
147145
private ValueProvider<String> loadJobProjectId;
148146
private final Coder<ElementT> elementCoder;
149147
private final RowWriterFactory<ElementT, DestinationT> rowWriterFactory;
150-
private String kmsKey;
151-
private boolean clusteringEnabled;
148+
private final String kmsKey;
149+
private final boolean clusteringEnabled;
152150

153151
// The maximum number of times to retry failed load or copy jobs.
154152
private int maxRetryJobs = DEFAULT_MAX_RETRY_JOBS;
@@ -274,6 +272,8 @@ public void validate(PipelineOptions options) {
274272
private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input) {
275273
Pipeline p = input.getPipeline();
276274
final PCollectionView<String> loadJobIdPrefixView = createJobIdPrefixView(p, JobType.LOAD);
275+
final PCollectionView<String> tempLoadJobIdPrefixView =
276+
createJobIdPrefixView(p, JobType.TEMP_TABLE_LOAD);
277277
final PCollectionView<String> copyJobIdPrefixView = createJobIdPrefixView(p, JobType.COPY);
278278
final PCollectionView<String> tempFilePrefixView =
279279
createTempFilePrefixView(p, loadJobIdPrefixView);
@@ -321,20 +321,20 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
321321
.plusDelayOf(triggeringFrequency)))
322322
.discardingFiredPanes());
323323

324-
TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
324+
TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> multiPartitionsTag =
325325
new TupleTag<>("multiPartitionsTag");
326-
TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag =
326+
TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> singlePartitionTag =
327327
new TupleTag<>("singlePartitionTag");
328328

329329
// If we have non-default triggered output, we can't use the side-input technique used in
330330
// expandUntriggered. Instead make the result list a main input. Apply a GroupByKey first for
331331
// determinism.
332332
PCollectionTuple partitions =
333333
results
334-
.apply("AttachSingletonKey", WithKeys.of((Void) null))
334+
.apply("AttachDestinationKey", WithKeys.of(result -> result.destination))
335335
.setCoder(
336-
KvCoder.of(VoidCoder.of(), WriteBundlesToFiles.ResultCoder.of(destinationCoder)))
337-
.apply("GroupOntoSingleton", GroupByKey.create())
336+
KvCoder.of(destinationCoder, WriteBundlesToFiles.ResultCoder.of(destinationCoder)))
337+
.apply("GroupFilesByDestination", GroupByKey.create())
338338
.apply("ExtractResultValues", Values.create())
339339
.apply(
340340
"WritePartitionTriggered",
@@ -350,14 +350,14 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
350350
rowWriterFactory))
351351
.withSideInputs(tempFilePrefixView)
352352
.withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
353-
PCollection<KV<TableDestination, String>> tempTables =
354-
writeTempTables(partitions.get(multiPartitionsTag), loadJobIdPrefixView);
353+
PCollection<KV<TableDestination, WriteTables.Result>> tempTables =
354+
writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView);
355355

356356
tempTables
357357
// Now that the load job has happened, we want the rename to happen immediately.
358358
.apply(
359359
"Window Into Global Windows",
360-
Window.<KV<TableDestination, String>>into(new GlobalWindows())
360+
Window.<KV<TableDestination, WriteTables.Result>>into(new GlobalWindows())
361361
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
362362
.apply("Add Void Key", WithKeys.of((Void) null))
363363
.setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder()))
@@ -382,6 +382,9 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
382382
public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> input) {
383383
Pipeline p = input.getPipeline();
384384
final PCollectionView<String> loadJobIdPrefixView = createJobIdPrefixView(p, JobType.LOAD);
385+
final PCollectionView<String> tempLoadJobIdPrefixView =
386+
createJobIdPrefixView(p, JobType.TEMP_TABLE_LOAD);
387+
final PCollectionView<String> copyJobIdPrefixView = createJobIdPrefixView(p, JobType.COPY);
385388
final PCollectionView<String> tempFilePrefixView =
386389
createTempFilePrefixView(p, loadJobIdPrefixView);
387390
PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
@@ -395,10 +398,10 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> inp
395398
? writeDynamicallyShardedFilesUntriggered(inputInGlobalWindow, tempFilePrefixView)
396399
: writeStaticallyShardedFiles(inputInGlobalWindow, tempFilePrefixView);
397400

398-
TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
399-
new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag") {};
400-
TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag =
401-
new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("singlePartitionTag") {};
401+
TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> multiPartitionsTag =
402+
new TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>>("multiPartitionsTag") {};
403+
TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> singlePartitionTag =
404+
new TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>>("singlePartitionTag") {};
402405

403406
// This transform will look at the set of files written for each table, and if any table has
404407
// too many files or bytes, will partition that table's files into multiple partitions for
@@ -421,8 +424,8 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> inp
421424
rowWriterFactory))
422425
.withSideInputs(tempFilePrefixView)
423426
.withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
424-
PCollection<KV<TableDestination, String>> tempTables =
425-
writeTempTables(partitions.get(multiPartitionsTag), loadJobIdPrefixView);
427+
PCollection<KV<TableDestination, WriteTables.Result>> tempTables =
428+
writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView);
426429

427430
tempTables
428431
.apply("ReifyRenameInput", new ReifyAsIterable<>())
@@ -431,7 +434,7 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> inp
431434
ParDo.of(
432435
new WriteRename(
433436
bigQueryServices,
434-
loadJobIdPrefixView,
437+
copyJobIdPrefixView,
435438
writeDisposition,
436439
createDisposition,
437440
maxRetryJobs,
@@ -637,23 +640,22 @@ public KV<DestinationT, Iterable<ElementT>> apply(
637640
.apply(
638641
"WriteGroupedRecords",
639642
ParDo.of(
640-
new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
641-
tempFilePrefix, maxFileSize, rowWriterFactory))
643+
new WriteGroupedRecordsToFiles<>(tempFilePrefix, maxFileSize, rowWriterFactory))
642644
.withSideInputs(tempFilePrefix))
643645
.setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
644646
}
645647

646648
// Take in a list of files and write them to temporary tables.
647-
private PCollection<KV<TableDestination, String>> writeTempTables(
648-
PCollection<KV<ShardedKey<DestinationT>, List<String>>> input,
649+
private PCollection<KV<TableDestination, WriteTables.Result>> writeTempTables(
650+
PCollection<KV<ShardedKey<DestinationT>, WritePartition.Result>> input,
649651
PCollectionView<String> jobIdTokenView) {
650652
List<PCollectionView<?>> sideInputs = Lists.newArrayList(jobIdTokenView);
651653
sideInputs.addAll(dynamicDestinations.getSideInputs());
652654

653-
Coder<KV<ShardedKey<DestinationT>, List<String>>> partitionsCoder =
655+
Coder<KV<ShardedKey<DestinationT>, WritePartition.Result>> partitionsCoder =
654656
KvCoder.of(
655657
ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
656-
ListCoder.of(StringUtf8Coder.of()));
658+
WritePartition.ResultCoder.INSTANCE);
657659

658660
// If the final destination table exists already (and we're appending to it), then the temp
659661
// tables must exactly match schema, partitioning, etc. Wrap the DynamicDestinations object
@@ -695,20 +697,24 @@ private PCollection<KV<TableDestination, String>> writeTempTables(
695697
rowWriterFactory.getSourceFormat(),
696698
useAvroLogicalTypes,
697699
schemaUpdateOptions))
698-
.setCoder(KvCoder.of(tableDestinationCoder, StringUtf8Coder.of()));
700+
.setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE));
699701
}
700702

701703
// In the case where the files fit into a single load job, there's no need to write temporary
702704
// tables and rename. We can load these files directly into the target BigQuery table.
703705
void writeSinglePartition(
704-
PCollection<KV<ShardedKey<DestinationT>, List<String>>> input,
706+
PCollection<KV<ShardedKey<DestinationT>, WritePartition.Result>> input,
705707
PCollectionView<String> loadJobIdPrefixView) {
706708
List<PCollectionView<?>> sideInputs = Lists.newArrayList(loadJobIdPrefixView);
707709
sideInputs.addAll(dynamicDestinations.getSideInputs());
708-
Coder<KV<ShardedKey<DestinationT>, List<String>>> partitionsCoder =
710+
711+
Coder<TableDestination> tableDestinationCoder =
712+
clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of();
713+
714+
Coder<KV<ShardedKey<DestinationT>, WritePartition.Result>> partitionsCoder =
709715
KvCoder.of(
710716
ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
711-
ListCoder.of(StringUtf8Coder.of()));
717+
WritePartition.ResultCoder.INSTANCE);
712718
// Write single partition to final table
713719
input
714720
.setCoder(partitionsCoder)
@@ -731,7 +737,8 @@ void writeSinglePartition(
731737
kmsKey,
732738
rowWriterFactory.getSourceFormat(),
733739
useAvroLogicalTypes,
734-
schemaUpdateOptions));
740+
schemaUpdateOptions))
741+
.setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE));
735742
}
736743

737744
private WriteResult writeResult(Pipeline p) {

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ static String createJobIdWithDestination(
6969

7070
public enum JobType {
7171
LOAD,
72+
TEMP_TABLE_LOAD,
7273
COPY,
7374
EXPORT,
7475
QUERY,

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,17 @@
1717
*/
1818
package org.apache.beam.sdk.io.gcp.bigquery;
1919

20+
import com.google.auto.value.AutoValue;
21+
import java.io.IOException;
22+
import java.io.InputStream;
23+
import java.io.OutputStream;
2024
import java.util.List;
2125
import java.util.Map;
26+
import org.apache.beam.sdk.coders.AtomicCoder;
27+
import org.apache.beam.sdk.coders.BooleanCoder;
28+
import org.apache.beam.sdk.coders.Coder;
29+
import org.apache.beam.sdk.coders.ListCoder;
30+
import org.apache.beam.sdk.coders.StringUtf8Coder;
2231
import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
2332
import org.apache.beam.sdk.transforms.DoFn;
2433
import org.apache.beam.sdk.values.KV;
@@ -39,16 +48,42 @@
3948
class WritePartition<DestinationT>
4049
extends DoFn<
4150
Iterable<WriteBundlesToFiles.Result<DestinationT>>,
42-
KV<ShardedKey<DestinationT>, List<String>>> {
51+
KV<ShardedKey<DestinationT>, WritePartition.Result>> {
52+
/**
 * Value emitted by {@code WritePartition} for each partition: the files that make up the
 * partition plus a flag recording whether the pane that produced it was the first pane.
 *
 * <p>Instances are created in {@code processElement} from {@code partitionData.getFilenames()}
 * and {@code c.pane().isFirst()}.
 */
@AutoValue
53+
abstract static class Result {
54+
/** Returns the names of the files belonging to this partition. */
public abstract List<String> getFilenames();
55+
56+
/** Returns whether the pane that emitted this partition was the first pane. */
abstract Boolean isFirstPane();
57+
}
58+
59+
/**
 * Coder for {@code WritePartition.Result}.
 *
 * <p>The encoded form is the filename list followed by the first-pane flag; {@code decode} reads
 * the two components back in the same order.
 */
static class ResultCoder extends AtomicCoder<Result> {
60+
// Encodes the filename list as a list of UTF-8 strings.
private static final Coder<List<String>> FILENAMES_CODER = ListCoder.of(StringUtf8Coder.of());
61+
// Encodes the first-pane flag.
private static final Coder<Boolean> FIRST_PANE_CODER = BooleanCoder.of();
62+
// Shared instance referenced by callers (e.g. when building KvCoders for partition outputs).
static final ResultCoder INSTANCE = new ResultCoder();
63+
64+
/** Writes the filenames first, then the first-pane flag, to {@code outStream}. */
@Override
65+
public void encode(Result value, OutputStream outStream) throws IOException {
66+
FILENAMES_CODER.encode(value.getFilenames(), outStream);
67+
FIRST_PANE_CODER.encode(value.isFirstPane(), outStream);
68+
}
69+
70+
/** Reads the filenames and then the first-pane flag, mirroring the order used by encode. */
@Override
71+
public Result decode(InputStream inStream) throws IOException {
72+
return new AutoValue_WritePartition_Result(
73+
FILENAMES_CODER.decode(inStream), FIRST_PANE_CODER.decode(inStream));
74+
}
75+
}
76+
4377
private final boolean singletonTable;
4478
private final DynamicDestinations<?, DestinationT> dynamicDestinations;
4579
private final PCollectionView<String> tempFilePrefix;
4680
private final int maxNumFiles;
4781
private final long maxSizeBytes;
4882
private final RowWriterFactory<?, DestinationT> rowWriterFactory;
4983

50-
private @Nullable TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag;
51-
private TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag;
84+
private @Nullable TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>>
85+
multiPartitionsTag;
86+
private TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> singlePartitionTag;
5287

5388
private static class PartitionData {
5489
private int numFiles = 0;
@@ -131,8 +166,8 @@ void addPartition(PartitionData partition) {
131166
PCollectionView<String> tempFilePrefix,
132167
int maxNumFiles,
133168
long maxSizeBytes,
134-
TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag,
135-
TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag,
169+
TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> multiPartitionsTag,
170+
TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> singlePartitionTag,
136171
RowWriterFactory<?, DestinationT> rowWriterFactory) {
137172
this.singletonTable = singletonTable;
138173
this.dynamicDestinations = dynamicDestinations;
@@ -147,7 +182,6 @@ void addPartition(PartitionData partition) {
147182
@ProcessElement
148183
public void processElement(ProcessContext c) throws Exception {
149184
List<WriteBundlesToFiles.Result<DestinationT>> results = Lists.newArrayList(c.element());
150-
151185
// If there are no elements to write _and_ the user specified a constant output table, then
152186
// generate an empty table of that name.
153187
if (results.isEmpty() && singletonTable) {
@@ -161,7 +195,8 @@ public void processElement(ProcessContext c) throws Exception {
161195
BigQueryRowWriter.Result writerResult = writer.getResult();
162196

163197
results.add(
164-
new Result<>(writerResult.resourceId.toString(), writerResult.byteSize, destination));
198+
new WriteBundlesToFiles.Result<>(
199+
writerResult.resourceId.toString(), writerResult.byteSize, destination));
165200
}
166201

167202
Map<DestinationT, DestinationData> currentResults = Maps.newHashMap();
@@ -190,11 +225,16 @@ public void processElement(ProcessContext c) throws Exception {
190225
// In the fast-path case where we only output one table, the transform loads it directly
191226
// to the final table. In this case, we output on a special TupleTag so the enclosing
192227
// transform knows to skip the rename step.
193-
TupleTag<KV<ShardedKey<DestinationT>, List<String>>> outputTag =
228+
TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> outputTag =
194229
(destinationData.getPartitions().size() == 1) ? singlePartitionTag : multiPartitionsTag;
195230
for (int i = 0; i < destinationData.getPartitions().size(); ++i) {
196231
PartitionData partitionData = destinationData.getPartitions().get(i);
197-
c.output(outputTag, KV.of(ShardedKey.of(destination, i + 1), partitionData.getFilenames()));
232+
c.output(
233+
outputTag,
234+
KV.of(
235+
ShardedKey.of(destination, i + 1),
236+
new AutoValue_WritePartition_Result(
237+
partitionData.getFilenames(), c.pane().isFirst())));
198238
}
199239
}
200240
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.beam.sdk.values.KV;
4040
import org.apache.beam.sdk.values.PCollectionView;
4141
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
42+
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
4243
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
4344
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
4445
import org.slf4j.Logger;
@@ -51,7 +52,7 @@
5152
@SuppressWarnings({
5253
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
5354
})
54-
class WriteRename extends DoFn<Iterable<KV<TableDestination, String>>, Void> {
55+
class WriteRename extends DoFn<Iterable<KV<TableDestination, WriteTables.Result>>, Void> {
5556
private static final Logger LOG = LoggerFactory.getLogger(WriteRename.class);
5657

5758
private final BigQueryServices bqServices;
@@ -116,12 +117,15 @@ public void onTeardown() {
116117
}
117118

118119
@ProcessElement
119-
public void processElement(ProcessContext c) throws Exception {
120-
Multimap<TableDestination, String> tempTables = ArrayListMultimap.create();
121-
for (KV<TableDestination, String> entry : c.element()) {
120+
public void processElement(
121+
@Element Iterable<KV<TableDestination, WriteTables.Result>> element, ProcessContext c)
122+
throws Exception {
123+
Multimap<TableDestination, WriteTables.Result> tempTables = ArrayListMultimap.create();
124+
for (KV<TableDestination, WriteTables.Result> entry : element) {
122125
tempTables.put(entry.getKey(), entry.getValue());
123126
}
124-
for (Map.Entry<TableDestination, Collection<String>> entry : tempTables.asMap().entrySet()) {
127+
for (Map.Entry<TableDestination, Collection<WriteTables.Result>> entry :
128+
tempTables.asMap().entrySet()) {
125129
// Process each destination table.
126130
// Do not copy if no temp tables are provided.
127131
if (!entry.getValue().isEmpty()) {
@@ -165,17 +169,27 @@ private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws
165169
}
166170

167171
private PendingJobData startWriteRename(
168-
TableDestination finalTableDestination, Iterable<String> tempTableNames, ProcessContext c)
172+
TableDestination finalTableDestination,
173+
Iterable<WriteTables.Result> tempTableNames,
174+
ProcessContext c)
169175
throws Exception {
176+
// The pane may have advanced either here (due to triggering in this step) or upstream (due to an
177+
// upstream trigger). We also check the upstream pane to handle the case where an earlier pane
178+
// took the single-partition path: if that happened, the final table already exists, so we want
179+
// to append to it.
180+
// Only when both this pane and the upstream pane are first do the first-pane dispositions apply.
181+
boolean isFirstPane =
182+
Iterables.getFirst(tempTableNames, null).isFirstPane() && c.pane().isFirst();
170183
WriteDisposition writeDisposition =
171-
(c.pane().getIndex() == 0) ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
184+
isFirstPane ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
172185
CreateDisposition createDisposition =
173-
(c.pane().getIndex() == 0) ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
186+
isFirstPane ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
174187
List<TableReference> tempTables =
175188
StreamSupport.stream(tempTableNames.spliterator(), false)
176-
.map(table -> BigQueryHelpers.fromJsonString(table, TableReference.class))
189+
.map(
190+
result ->
191+
BigQueryHelpers.fromJsonString(result.getTableName(), TableReference.class))
177192
.collect(Collectors.toList());
178-
;
179193

180194
// Make sure each destination table gets a unique job id.
181195
String jobIdPrefix =

0 commit comments

Comments
 (0)