@@ -30,10 +30,8 @@
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.ShardedKeyCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
@@ -147,8 +145,8 @@ class BatchLoads<DestinationT, ElementT>
   private ValueProvider<String> loadJobProjectId;
   private final Coder<ElementT> elementCoder;
   private final RowWriterFactory<ElementT, DestinationT> rowWriterFactory;
-  private String kmsKey;
-  private boolean clusteringEnabled;
+  private final String kmsKey;
+  private final boolean clusteringEnabled;

   // The maximum number of times to retry failed load or copy jobs.
   private int maxRetryJobs = DEFAULT_MAX_RETRY_JOBS;
@@ -274,6 +272,8 @@ public void validate(PipelineOptions options) {
   private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input) {
     Pipeline p = input.getPipeline();
     final PCollectionView<String> loadJobIdPrefixView = createJobIdPrefixView(p, JobType.LOAD);
+    final PCollectionView<String> tempLoadJobIdPrefixView =
+        createJobIdPrefixView(p, JobType.TEMP_TABLE_LOAD);
     final PCollectionView<String> copyJobIdPrefixView = createJobIdPrefixView(p, JobType.COPY);
     final PCollectionView<String> tempFilePrefixView =
         createTempFilePrefixView(p, loadJobIdPrefixView);
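The triggered path now derives a dedicated job-id prefix for temp-table load jobs instead of reusing the direct-load prefix, so LOAD, TEMP_TABLE_LOAD, and COPY jobs cannot collide on BigQuery job ids. A minimal sketch of the naming idea; the exact prefix format is an assumption, not the one used by `createJobIdPrefixView`:

```java
// Illustrative only: a per-JobType prefix keeps retried temp-table loads from
// colliding with direct load or copy jobs derived from the same pipeline UUID.
static String jobIdPrefix(String pipelineUuid, JobType type) {
  return String.format("beam_bq_job_%s_%s", type.name().toLowerCase(), pipelineUuid);
}
```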
@@ -321,20 +321,20 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input) {
                         .plusDelayOf(triggeringFrequency)))
             .discardingFiredPanes());

-    TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
+    TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> multiPartitionsTag =
         new TupleTag<>("multiPartitionsTag");
-    TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag =
+    TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> singlePartitionTag =
         new TupleTag<>("singlePartitionTag");

     // If we have non-default triggered output, we can't use the side-input technique used in
     // expandUntriggered. Instead make the result list a main input. Apply a GroupByKey first for
     // determinism.
     PCollectionTuple partitions =
         results
-            .apply("AttachSingletonKey", WithKeys.of((Void) null))
+            .apply("AttachDestinationKey", WithKeys.of(result -> result.destination))
             .setCoder(
-                KvCoder.of(VoidCoder.of(), WriteBundlesToFiles.ResultCoder.of(destinationCoder)))
-            .apply("GroupOntoSingleton", GroupByKey.create())
+                KvCoder.of(destinationCoder, WriteBundlesToFiles.ResultCoder.of(destinationCoder)))
+            .apply("GroupFilesByDestination", GroupByKey.create())
             .apply("ExtractResultValues", Values.create())
             .apply(
                 "WritePartitionTriggered",
@@ -350,14 +350,14 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input) {
                     rowWriterFactory))
                 .withSideInputs(tempFilePrefixView)
                 .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
-    PCollection<KV<TableDestination, String>> tempTables =
-        writeTempTables(partitions.get(multiPartitionsTag), loadJobIdPrefixView);
+    PCollection<KV<TableDestination, WriteTables.Result>> tempTables =
+        writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView);

     tempTables
         // Now that the load job has happened, we want the rename to happen immediately.
         .apply(
             "Window Into Global Windows",
-            Window.<KV<TableDestination, String>>into(new GlobalWindows())
+            Window.<KV<TableDestination, WriteTables.Result>>into(new GlobalWindows())
                 .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
         .apply("Add Void Key", WithKeys.of((Void) null))
         .setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder()))
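Throughout both expansions, the per-partition payload changes from a bare `List<String>` of filenames to `WritePartition.Result`, and the temp-table output from `KV<TableDestination, String>` to `KV<TableDestination, WriteTables.Result>`. Neither type's definition appears in this diff. A plausible minimal shape for the partition payload, purely an assumption to make the coder changes below concrete (field names are guesses):

```java
// Assumed shape of WritePartition.Result (not shown in this diff): the file
// list that used to travel as List<String>, plus pane metadata that a
// triggered load would need to decide between truncating and appending.
@AutoValue
abstract static class Result {
  abstract List<String> getFilenames(); // files making up this partition
  abstract Boolean isFirstPane();       // assumption: first-pane flag for triggered writes
}
```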
@@ -382,6 +382,9 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input) {
   public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> input) {
     Pipeline p = input.getPipeline();
     final PCollectionView<String> loadJobIdPrefixView = createJobIdPrefixView(p, JobType.LOAD);
+    final PCollectionView<String> tempLoadJobIdPrefixView =
+        createJobIdPrefixView(p, JobType.TEMP_TABLE_LOAD);
+    final PCollectionView<String> copyJobIdPrefixView = createJobIdPrefixView(p, JobType.COPY);
     final PCollectionView<String> tempFilePrefixView =
         createTempFilePrefixView(p, loadJobIdPrefixView);
     PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
@@ -395,10 +398,10 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> input) {
             ? writeDynamicallyShardedFilesUntriggered(inputInGlobalWindow, tempFilePrefixView)
             : writeStaticallyShardedFiles(inputInGlobalWindow, tempFilePrefixView);

-    TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
-        new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag") {};
-    TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag =
-        new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("singlePartitionTag") {};
+    TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> multiPartitionsTag =
+        new TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>>("multiPartitionsTag") {};
+    TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> singlePartitionTag =
+        new TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>>("singlePartitionTag") {};

     // This transform will look at the set of files written for each table, and if any table has
     // too many files or bytes, will partition that table's files into multiple partitions for
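A stylistic difference between the two expansions is how the tags are built: `expandTriggered` uses the diamond form `new TupleTag<>(...)` and pins coders explicitly with `setCoder`, while `expandUntriggered` keeps the `new TupleTag<...>("...") {}` idiom. The trailing `{}` creates an anonymous subclass whose generic supertype survives erasure, letting Beam recover the element type for coder inference. A minimal illustration of the difference:

```java
// Diamond form: the type argument is erased at runtime, so downstream coders
// must be supplied explicitly (as expandTriggered does with setCoder).
TupleTag<KV<String, Integer>> erased = new TupleTag<>("erased");

// Anonymous-subclass form: the generic supertype is reified, so Beam can
// usually infer a KvCoder for outputs carrying this tag.
TupleTag<KV<String, Integer>> reified = new TupleTag<KV<String, Integer>>("reified") {};
```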
@@ -421,8 +424,8 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> input) {
                     rowWriterFactory))
                 .withSideInputs(tempFilePrefixView)
                 .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
-    PCollection<KV<TableDestination, String>> tempTables =
-        writeTempTables(partitions.get(multiPartitionsTag), loadJobIdPrefixView);
+    PCollection<KV<TableDestination, WriteTables.Result>> tempTables =
+        writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView);

     tempTables
         .apply("ReifyRenameInput", new ReifyAsIterable<>())
@@ -431,7 +434,7 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> input) {
             ParDo.of(
                 new WriteRename(
                     bigQueryServices,
-                    loadJobIdPrefixView,
+                    copyJobIdPrefixView,
                     writeDisposition,
                     createDisposition,
                     maxRetryJobs,
@@ -637,23 +640,22 @@ public KV<DestinationT, Iterable<ElementT>> apply(
         .apply(
             "WriteGroupedRecords",
             ParDo.of(
-                    new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
-                        tempFilePrefix, maxFileSize, rowWriterFactory))
+                    new WriteGroupedRecordsToFiles<>(tempFilePrefix, maxFileSize, rowWriterFactory))
                 .withSideInputs(tempFilePrefix))
         .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
   }

   // Take in a list of files and write them to temporary tables.
-  private PCollection<KV<TableDestination, String>> writeTempTables(
-      PCollection<KV<ShardedKey<DestinationT>, List<String>>> input,
+  private PCollection<KV<TableDestination, WriteTables.Result>> writeTempTables(
+      PCollection<KV<ShardedKey<DestinationT>, WritePartition.Result>> input,
       PCollectionView<String> jobIdTokenView) {
     List<PCollectionView<?>> sideInputs = Lists.newArrayList(jobIdTokenView);
     sideInputs.addAll(dynamicDestinations.getSideInputs());

-    Coder<KV<ShardedKey<DestinationT>, List<String>>> partitionsCoder =
+    Coder<KV<ShardedKey<DestinationT>, WritePartition.Result>> partitionsCoder =
         KvCoder.of(
             ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
-            ListCoder.of(StringUtf8Coder.of()));
+            WritePartition.ResultCoder.INSTANCE);

     // If the final destination table exists already (and we're appending to it), then the temp
     // tables must exactly match schema, partitioning, etc. Wrap the DynamicDestinations object
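`ListCoder.of(StringUtf8Coder.of())` gives way to `WritePartition.ResultCoder.INSTANCE`, a stateless singleton coder for the new payload (which is also why the two coder imports could be dropped at the top of the file). Its real definition is outside this diff; a sketch of how such a coder is conventionally written in Beam, under the assumed `Result` shape above:

```java
// Hedged sketch, not the actual ResultCoder: a stateless singleton that
// delegates each field to an existing Beam component coder.
static class ResultCoder extends AtomicCoder<Result> {
  static final ResultCoder INSTANCE = new ResultCoder();
  private static final Coder<List<String>> FILES = ListCoder.of(StringUtf8Coder.of());
  private static final Coder<Boolean> FIRST_PANE = BooleanCoder.of();

  @Override
  public void encode(Result value, OutputStream out) throws IOException {
    FILES.encode(value.getFilenames(), out);
    FIRST_PANE.encode(value.isFirstPane(), out);
  }

  @Override
  public Result decode(InputStream in) throws IOException {
    // AutoValue_WritePartition_Result is the assumed generated class name.
    return new AutoValue_WritePartition_Result(FILES.decode(in), FIRST_PANE.decode(in));
  }
}
```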
@@ -695,20 +697,24 @@ private PCollection<KV<TableDestination, String>> writeTempTables(
                 rowWriterFactory.getSourceFormat(),
                 useAvroLogicalTypes,
                 schemaUpdateOptions))
-        .setCoder(KvCoder.of(tableDestinationCoder, StringUtf8Coder.of()));
+        .setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE));
   }

   // In the case where the files fit into a single load job, there's no need to write temporary
   // tables and rename. We can load these files directly into the target BigQuery table.
   void writeSinglePartition(
-      PCollection<KV<ShardedKey<DestinationT>, List<String>>> input,
+      PCollection<KV<ShardedKey<DestinationT>, WritePartition.Result>> input,
       PCollectionView<String> loadJobIdPrefixView) {
     List<PCollectionView<?>> sideInputs = Lists.newArrayList(loadJobIdPrefixView);
     sideInputs.addAll(dynamicDestinations.getSideInputs());
-    Coder<KV<ShardedKey<DestinationT>, List<String>>> partitionsCoder =
+
+    Coder<TableDestination> tableDestinationCoder =
+        clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of();
+
+    Coder<KV<ShardedKey<DestinationT>, WritePartition.Result>> partitionsCoder =
         KvCoder.of(
             ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
-            ListCoder.of(StringUtf8Coder.of()));
+            WritePartition.ResultCoder.INSTANCE);
     // Write single partition to final table
     input
         .setCoder(partitionsCoder)
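`writeSinglePartition` now needs its own `TableDestination` coder for the `setCoder` call appended in the next hunk: with the `ParDo` emitting `KV<TableDestination, WriteTables.Result>` (custom types on both sides), Beam's coder inference has nothing registered to fall back on. The coder version has to track `clusteringEnabled` because, in Beam, `TableDestinationCoderV3` also serializes the clustering spec while `TableDestinationCoderV2` predates clustering support. Annotated for emphasis:

```java
// If clustering is enabled, V2 would silently drop the clustering fields any
// time a TableDestination crosses a shuffle, so V3 must be used instead.
Coder<TableDestination> tableDestinationCoder =
    clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of();
```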
@@ -731,7 +737,8 @@ void writeSinglePartition(
                     kmsKey,
                     rowWriterFactory.getSourceFormat(),
                     useAvroLogicalTypes,
-                    schemaUpdateOptions));
+                    schemaUpdateOptions))
+        .setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE));
   }

   private WriteResult writeResult(Pipeline p) {