Skip to content

Commit 76d0dcb

Browse files
authored
[BEAM-11218] ptest allows to obtain a pipeline result (apache#15364)
1 parent 13754d4 commit 76d0dcb

File tree

5 files changed

+104
-7
lines changed

5 files changed

+104
-7
lines changed

sdks/go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,17 @@ require (
3030
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
3131
github.com/golang/protobuf v1.5.2 // TODO(danoliveira): Fully replace this with google.golang.org/protobuf
3232
github.com/golang/snappy v0.0.4 // indirect
33+
github.com/google/btree v1.0.0 // indirect
3334
github.com/google/go-cmp v0.5.6
3435
github.com/google/martian/v3 v3.2.1 // indirect
3536
github.com/google/uuid v1.3.0
37+
github.com/hashicorp/golang-lru v0.5.1 // indirect
3638
github.com/kr/text v0.2.0 // indirect
3739
github.com/linkedin/goavro v2.1.0+incompatible
3840
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
3941
github.com/nightlyone/lockfile v1.0.0
4042
github.com/spf13/cobra v1.2.1
43+
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 // indirect
4144
golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6
4245
golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914
4346
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect

sdks/go/pkg/beam/core/metrics/metrics.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"context"
5151
"fmt"
5252
"hash/fnv"
53+
"sort"
5354
"sync"
5455
"sync/atomic"
5556
"time"
@@ -641,3 +642,68 @@ func MergeGauges(
641642
}
642643
return res
643644
}
645+
646+
// MetricsExtractor extracts the metrics.Results from Store using ctx.
647+
// This is same as what metrics.dumperExtractor and metrics.dumpTo would do together.
648+
func MetricsExtractor(ctx context.Context) Results {
649+
store := GetStore(ctx)
650+
m := make(map[Labels]interface{})
651+
e := &Extractor{
652+
SumInt64: func(l Labels, v int64) {
653+
m[l] = &counter{value: v}
654+
},
655+
DistributionInt64: func(l Labels, count, sum, min, max int64) {
656+
m[l] = &distribution{count: count, sum: sum, min: min, max: max}
657+
},
658+
GaugeInt64: func(l Labels, v int64, t time.Time) {
659+
m[l] = &gauge{v: v, t: t}
660+
},
661+
}
662+
e.ExtractFrom(store)
663+
664+
var ls []Labels
665+
for l := range m {
666+
ls = append(ls, l)
667+
}
668+
669+
sort.Slice(ls, func(i, j int) bool {
670+
if ls[i].transform < ls[j].transform {
671+
return true
672+
}
673+
tEq := ls[i].transform == ls[j].transform
674+
if tEq && ls[i].namespace < ls[j].namespace {
675+
return true
676+
}
677+
nsEq := ls[i].namespace == ls[j].namespace
678+
if tEq && nsEq && ls[i].name < ls[j].name {
679+
return true
680+
}
681+
return false
682+
})
683+
684+
r := Results{counters: []CounterResult{}, distributions: []DistributionResult{}, gauges: []GaugeResult{}}
685+
for _, l := range ls {
686+
key := StepKey{Step: l.transform, Name: l.name, Namespace: l.namespace}
687+
switch opt := m[l]; opt.(type) {
688+
case *counter:
689+
attempted := make(map[StepKey]int64)
690+
committed := make(map[StepKey]int64)
691+
attempted[key] = 0
692+
committed[key] = opt.(*counter).value
693+
r.counters = append(r.counters, MergeCounters(attempted, committed)...)
694+
case *distribution:
695+
attempted := make(map[StepKey]DistributionValue)
696+
committed := make(map[StepKey]DistributionValue)
697+
attempted[key] = DistributionValue{}
698+
committed[key] = DistributionValue{opt.(*distribution).count, opt.(*distribution).sum, opt.(*distribution).min, opt.(*distribution).max}
699+
r.distributions = append(r.distributions, MergeDistributions(attempted, committed)...)
700+
case *gauge:
701+
attempted := make(map[StepKey]GaugeValue)
702+
committed := make(map[StepKey]GaugeValue)
703+
attempted[key] = GaugeValue{}
704+
committed[key] = GaugeValue{opt.(*gauge).v, opt.(*gauge).t}
705+
r.gauges = append(r.gauges, MergeGauges(attempted, committed)...)
706+
}
707+
}
708+
return r
709+
}

sdks/go/pkg/beam/runners/direct/direct.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,26 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
7474
if err = plan.Down(ctx); err != nil {
7575
return nil, err
7676
}
77-
// TODO(lostluck) 2020/01/24: What's the right way to expose the
78-
// metrics store for the direct runner?
79-
metrics.DumpToLog(ctx)
80-
return nil, nil
77+
78+
return newDirectPipelineResult(ctx)
79+
}
80+
81+
type directPipelineResult struct {
82+
jobID string
83+
metrics *metrics.Results
84+
}
85+
86+
func newDirectPipelineResult(ctx context.Context) (*directPipelineResult, error) {
87+
metrics := metrics.MetricsExtractor(ctx)
88+
return &directPipelineResult{metrics: &metrics}, nil
89+
}
90+
91+
func (pr directPipelineResult) Metrics() metrics.Results {
92+
return *pr.metrics
93+
}
94+
95+
func (pr directPipelineResult) JobID() string {
96+
return pr.jobID
8197
}
8298

8399
// Compile translates a pipeline to a multi-bundle execution plan.

sdks/go/pkg/beam/testing/ptest/ptest.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,23 @@ func Run(p *beam.Pipeline) error {
8181
return err
8282
}
8383

84+
// RunWithMetrics runs a pipeline for testing with that returns metrics.Results
85+
// in the form of Pipeline Result
86+
func RunWithMetrics(p *beam.Pipeline) (beam.PipelineResult, error) {
87+
if *Runner == "" {
88+
*Runner = defaultRunner
89+
}
90+
return beam.Run(context.Background(), *Runner, p)
91+
}
92+
8493
// RunAndValidate runs a pipeline for testing and validates the result, failing
8594
// the test if the pipeline fails.
86-
func RunAndValidate(t *testing.T, p *beam.Pipeline) {
87-
if err := Run(p); err != nil {
95+
func RunAndValidate(t *testing.T, p *beam.Pipeline) beam.PipelineResult {
96+
pr, err := RunWithMetrics(p)
97+
if err != nil {
8898
t.Fatalf("Failed to execute job: %v", err)
8999
}
100+
return pr
90101
}
91102

92103
// Main is an implementation of testing's TestMain to permit testing

sdks/go/test/integration/wordcount/wordcount_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ func TestWordCount(t *testing.T) {
8484
memfs.Write(filename, []byte(strings.Join(test.lines, "\n")))
8585

8686
p := WordCount(filename, test.hash, test.words)
87-
if err := ptest.Run(p); err != nil {
87+
_, err := ptest.RunWithMetrics(p)
88+
if err != nil {
8889
t.Errorf("WordCount(\"%v\") failed: %v", strings.Join(test.lines, "|"), err)
8990
}
9091
}

0 commit comments

Comments
 (0)