Skip to content

Commit fea8fa5

Browse files
authored
[BEAM-6374] Elide collecting unnecessary pcollection metrics (apache#15358)
* [BEAM-6374] Elide collecting unnecessary pcollection metrics * [BEAM-6374] Guard against multiplex/flatten case.
1 parent 0228419 commit fea8fa5

File tree

2 files changed

+51
-5
lines changed

2 files changed

+51
-5
lines changed

sdks/go/pkg/beam/core/runtime/exec/datasink.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,41 +20,54 @@ import (
2020
"context"
2121
"fmt"
2222
"io"
23+
"sync/atomic"
2324

2425
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
2526
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
2627
)
2728

28-
// DataSink is a Node.
29+
// DataSink is a Node that writes element data to the data service.
2930
type DataSink struct {
3031
UID UnitID
3132
SID StreamID
3233
Coder *coder.Coder
34+
PCol *PCollection // Handles size metrics.
3335

3436
enc ElementEncoder
3537
wEnc WindowEncoder
3638
w io.WriteCloser
3739
}
3840

41+
// ID returns the debug ID.
3942
func (n *DataSink) ID() UnitID {
4043
return n.UID
4144
}
4245

46+
// Up initializes the element and window encoders.
4347
func (n *DataSink) Up(ctx context.Context) error {
4448
n.enc = MakeElementEncoder(coder.SkipW(n.Coder))
4549
n.wEnc = MakeWindowEncoder(n.Coder.Window)
4650
return nil
4751
}
4852

53+
// StartBundle opens the writer to the data service.
4954
func (n *DataSink) StartBundle(ctx context.Context, id string, data DataContext) error {
5055
w, err := data.Data.OpenWrite(ctx, n.SID)
5156
if err != nil {
5257
return err
5358
}
5459
n.w = w
60+
// TODO(BEAM-6374): Properly handle the multiplex and flatten cases.
61+
// Right now we just stop datasink collection.
62+
if n.PCol != nil {
63+
atomic.StoreInt64(&n.PCol.elementCount, 0)
64+
n.PCol.resetSize()
65+
}
5566
return nil
5667
}
5768

69+
// ProcessElement encodes the windowed value header for the element, followed by the element,
70+
// emitting it to the data service.
5871
func (n *DataSink) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error {
5972
// Marshal the pieces into a temporary buffer since they must be transmitted on FnAPI as a single
6073
// unit.
@@ -66,16 +79,25 @@ func (n *DataSink) ProcessElement(ctx context.Context, value *FullValue, values
6679
if err := n.enc.Encode(value, &b); err != nil {
6780
return errors.WithContextf(err, "encoding element %v with coder %v", value, n.enc)
6881
}
69-
if _, err := n.w.Write(b.Bytes()); err != nil {
82+
byteCount, err := n.w.Write(b.Bytes())
83+
if err != nil {
7084
return err
7185
}
86+
// TODO(BEAM-6374): Properly handle the multiplex and flatten cases.
87+
// Right now we just stop datasink collection.
88+
if n.PCol != nil {
89+
atomic.AddInt64(&n.PCol.elementCount, 1)
90+
n.PCol.addSize(int64(byteCount))
91+
}
7292
return nil
7393
}
7494

95+
// FinishBundle closes the writer to the data channel.
7596
func (n *DataSink) FinishBundle(ctx context.Context) error {
7697
return n.w.Close()
7798
}
7899

100+
// Down is a no-op.
79101
func (n *DataSink) Down(ctx context.Context) error {
80102
return nil
81103
}

sdks/go/pkg/beam/core/runtime/exec/translate.go

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) (*Plan, error) {
8989
}
9090
// Elide the PCollection Node for DataSources
9191
// DataSources can get byte samples directly, and can handle CoGBKs.
92+
// Copying the PCollection here is fine, as the PCollection will never
93+
// have used its mutex yet.
9294
u.PCol = *u.Out.(*PCollection)
9395
u.Out = u.PCol.Out
9496
b.units = b.units[:len(b.units)-1]
@@ -234,7 +236,22 @@ func (b *builder) makePCollections(out []string) ([]Node, error) {
234236
if err != nil {
235237
return nil, err
236238
}
237-
ret = append(ret, n)
239+
// This is the cleanest place to do this check and filtering,
240+
// since DataSinks don't know their inputs, due to the construction
241+
// call stack.
242+
// A Source->Sink is both uncommon and inefficient, with the Source eliding the
243+
// collection anyway.
244+
// TODO(BEAM-6374): Properly handle the multiplex and flatten cases.
245+
// Right now we just stop datasink collection.
246+
switch out := n.Out.(type) {
247+
case *DataSink:
248+
// We don't remove the PCollection from units here, since we
249+
// want to ensure it's included in snapshots.
250+
out.PCol = n
251+
ret = append(ret, out)
252+
default:
253+
ret = append(ret, n)
254+
}
238255
}
239256
return ret, nil
240257
}
@@ -465,7 +482,14 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
465482
}
466483
u = &LiftedCombine{Combine: cn, KeyCoder: ec.Components[0], WindowCoder: wc}
467484
case urnPerKeyCombineMerge:
468-
u = &MergeAccumulators{Combine: cn}
485+
ma := &MergeAccumulators{Combine: cn}
486+
if eo, ok := ma.Out.(*PCollection).Out.(*ExtractOutput); ok {
487+
// Strip PCollections from between MergeAccumulators and ExtractOutputs
488+
// as it's a synthetic PCollection.
489+
b.units = b.units[:len(b.units)-1]
490+
ma.Out = eo
491+
}
492+
u = ma
469493
case urnPerKeyCombineExtract:
470494
u = &ExtractOutput{Combine: cn}
471495
case urnPerKeyCombineConvert:
@@ -644,7 +668,7 @@ func inputIdToIndex(id string) (int, error) {
644668
return strconv.Atoi(strings.TrimPrefix(id, "i"))
645669
}
646670

647-
// inputIdToIndex converts an index into a local input ID for a transform. Use
671+
// indexToInputId converts an index into a local input ID for a transform. Use
648672
// this to avoid relying on format details for input IDs.
649673
func indexToInputId(i int) string {
650674
return "i" + strconv.Itoa(i)

0 commit comments

Comments
 (0)