@@ -89,6 +89,8 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) (*Plan, error) {
89
89
}
90
90
// Elide the PCollection Node for DataSources
91
91
// DataSources can get byte samples directly, and can handle CoGBKs.
92
+ // Copying the PCollection here is fine, as the PCollection will not
93
+ // have used its mutex yet.
92
94
u .PCol = * u .Out .(* PCollection )
93
95
u .Out = u .PCol .Out
94
96
b .units = b .units [:len (b .units )- 1 ]
@@ -234,7 +236,22 @@ func (b *builder) makePCollections(out []string) ([]Node, error) {
234
236
if err != nil {
235
237
return nil , err
236
238
}
237
- ret = append (ret , n )
239
+ // This is the cleanest place to do this check and filtering,
240
+ // since DataSinks don't know their inputs, due to the construction
241
+ // call stack.
242
+ // A Source->Sink is both uncommon and inefficient, with the Source eliding the
243
+ // collection anyway.
244
+ // TODO(BEAM-6374): Properly handle the multiplex and flatten cases.
245
+ // Right now we just stop datasink collection.
246
+ switch out := n .Out .(type ) {
247
+ case * DataSink :
248
+ // We don't remove the PCollection from units here, since we
249
+ // want to ensure it's included in snapshots.
250
+ out .PCol = n
251
+ ret = append (ret , out )
252
+ default :
253
+ ret = append (ret , n )
254
+ }
238
255
}
239
256
return ret , nil
240
257
}
@@ -465,7 +482,14 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
465
482
}
466
483
u = & LiftedCombine {Combine : cn , KeyCoder : ec .Components [0 ], WindowCoder : wc }
467
484
case urnPerKeyCombineMerge :
468
- u = & MergeAccumulators {Combine : cn }
485
+ ma := & MergeAccumulators {Combine : cn }
486
+ if eo , ok := ma .Out .(* PCollection ).Out .(* ExtractOutput ); ok {
487
+ // Strip PCollections from between MergeAccumulators and ExtractOutputs
488
+ // as it's a synthetic PCollection.
489
+ b .units = b .units [:len (b .units )- 1 ]
490
+ ma .Out = eo
491
+ }
492
+ u = ma
469
493
case urnPerKeyCombineExtract :
470
494
u = & ExtractOutput {Combine : cn }
471
495
case urnPerKeyCombineConvert :
@@ -644,7 +668,7 @@ func inputIdToIndex(id string) (int, error) {
644
668
return strconv .Atoi (strings .TrimPrefix (id , "i" ))
645
669
}
646
670
647
- // inputIdToIndex converts an index into a local input ID for a transform. Use
671
+ // indexToInputId converts an index into a local input ID for a transform. Use
648
672
// this to avoid relying on format details for input IDs.
649
673
func indexToInputId (i int ) string {
650
674
return "i" + strconv .Itoa (i )
0 commit comments