Skip to content

Commit f88ed4e

Browse files
authored
[BEAM-12643] Resolved the concurrent writes issue for parallel tests (apache#15199)
1 parent bbe7b64 commit f88ed4e

File tree

2 files changed

+146
-38
lines changed

2 files changed

+146
-38
lines changed

sdks/go/pkg/beam/core/util/hooks/hooks.go

+92-32
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,30 @@ import (
3232
"encoding/csv"
3333
"encoding/json"
3434
"strings"
35+
"sync"
3536

3637
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
3738
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
3839
"github.com/apache/beam/sdks/go/pkg/beam/log"
3940
fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
4041
)
4142

42-
var (
43-
hookRegistry = make(map[string]HookFactory)
44-
enabledHooks = make(map[string][]string)
45-
activeHooks = make(map[string]Hook)
46-
)
43+
var defaultRegistry = newRegistry()
44+
45+
type registry struct {
46+
mu sync.Mutex
47+
hookRegistry map[string]HookFactory
48+
enabledHooks map[string][]string
49+
activeHooks map[string]Hook
50+
}
51+
52+
func newRegistry() *registry {
53+
return &registry{
54+
hookRegistry: make(map[string]HookFactory),
55+
enabledHooks: make(map[string][]string),
56+
activeHooks: make(map[string]Hook),
57+
}
58+
}
4759

4860
// A Hook is a set of hooks to run at various stages of executing a
4961
// pipeline.
@@ -64,19 +76,26 @@ type InitHook func(context.Context) (context.Context, error)
6476
// HookFactory is a function that produces a Hook from the supplied arguments.
6577
type HookFactory func([]string) Hook
6678

79+
func (r *registry) RegisterHook(name string, h HookFactory) {
80+
r.mu.Lock()
81+
defer r.mu.Unlock()
82+
r.hookRegistry[name] = h
83+
}
84+
6785
// RegisterHook registers a Hook for the
6886
// supplied identifier.
6987
func RegisterHook(name string, h HookFactory) {
70-
hookRegistry[name] = h
88+
defaultRegistry.RegisterHook(name, h)
7189
}
7290

73-
// RunInitHooks runs the init hooks.
74-
func RunInitHooks(ctx context.Context) (context.Context, error) {
91+
func (r *registry) RunInitHooks(ctx context.Context) (context.Context, error) {
7592
// If an init hook fails to complete, the invariants of the
7693
// system are compromised and we can't run a workflow.
7794
// The hooks can run in any order. They should not be
7895
// interdependent or interfere with each other.
79-
for _, h := range activeHooks {
96+
r.mu.Lock()
97+
defer r.mu.Unlock()
98+
for _, h := range r.activeHooks {
8099
if h.Init != nil {
81100
var err error
82101
if ctx, err = h.Init(ctx); err != nil {
@@ -87,15 +106,21 @@ func RunInitHooks(ctx context.Context) (context.Context, error) {
87106
return ctx, nil
88107
}
89108

109+
// RunInitHooks runs the init hooks.
110+
func RunInitHooks(ctx context.Context) (context.Context, error) {
111+
return defaultRegistry.RunInitHooks(ctx)
112+
}
113+
90114
// RequestHook is called when handling a FnAPI instruction. It can return an updated
91115
// context to pass additional information to downstream callers, or return the
92116
// original context provided.
93117
type RequestHook func(context.Context, *fnpb.InstructionRequest) (context.Context, error)
94118

95-
// RunRequestHooks runs the hooks that handle a FnAPI request.
96-
func RunRequestHooks(ctx context.Context, req *fnpb.InstructionRequest) context.Context {
119+
func (r *registry) RunRequestHooks(ctx context.Context, req *fnpb.InstructionRequest) context.Context {
120+
r.mu.Lock()
121+
defer r.mu.Unlock()
97122
// The request hooks should not modify the request.
98-
for n, h := range activeHooks {
123+
for n, h := range r.activeHooks {
99124
if h.Req != nil {
100125
var err error
101126
if ctx, err = h.Req(ctx, req); err != nil {
@@ -106,12 +131,18 @@ func RunRequestHooks(ctx context.Context, req *fnpb.InstructionRequest) context.
106131
return ctx
107132
}
108133

134+
// RunRequestHooks runs the hooks that handle a FnAPI request.
135+
func RunRequestHooks(ctx context.Context, req *fnpb.InstructionRequest) context.Context {
136+
return defaultRegistry.RunRequestHooks(ctx, req)
137+
}
138+
109139
// ResponseHook is called when sending a FnAPI instruction response.
110140
type ResponseHook func(context.Context, *fnpb.InstructionRequest, *fnpb.InstructionResponse) error
111141

112-
// RunResponseHooks runs the hooks that handle a FnAPI response.
113-
func RunResponseHooks(ctx context.Context, req *fnpb.InstructionRequest, resp *fnpb.InstructionResponse) {
114-
for n, h := range activeHooks {
142+
func (r *registry) RunResponseHooks(ctx context.Context, req *fnpb.InstructionRequest, resp *fnpb.InstructionResponse) {
143+
r.mu.Lock()
144+
defer r.mu.Unlock()
145+
for n, h := range r.activeHooks {
115146
if h.Resp != nil {
116147
if err := h.Resp(ctx, req, resp); err != nil {
117148
log.Infof(ctx, "response hook %s failed: %v", n, err)
@@ -120,52 +151,81 @@ func RunResponseHooks(ctx context.Context, req *fnpb.InstructionRequest, resp *f
120151
}
121152
}
122153

123-
// SerializeHooksToOptions serializes the activated hooks and their configuration into a JSON string
124-
// that can be deserialized later by the runner.
125-
func SerializeHooksToOptions() {
126-
data, err := json.Marshal(enabledHooks)
154+
// RunResponseHooks runs the hooks that handle a FnAPI response.
155+
func RunResponseHooks(ctx context.Context, req *fnpb.InstructionRequest, resp *fnpb.InstructionResponse) {
156+
defaultRegistry.RunResponseHooks(ctx, req, resp)
157+
}
158+
159+
func (r *registry) SerializeHooksToOptions() {
160+
r.mu.Lock()
161+
defer r.mu.Unlock()
162+
data, err := json.Marshal(r.enabledHooks)
127163
if err != nil {
128164
// Shouldn't happen, since all the data is strings.
129165
panic(errors.Wrap(err, "Couldn't serialize hooks"))
130166
}
131167
runtime.GlobalOptions.Set("hooks", string(data))
132168
}
133169

134-
// DeserializeHooksFromOptions extracts the hook configuration information from the options and configures
135-
// the hooks with the supplied options.
136-
func DeserializeHooksFromOptions(ctx context.Context) {
170+
// SerializeHooksToOptions serializes the activated hooks and their configuration into a JSON string
171+
// that can be deserialized later by the runner.
172+
func SerializeHooksToOptions() {
173+
defaultRegistry.SerializeHooksToOptions()
174+
}
175+
176+
func (r *registry) DeserializeHooksFromOptions(ctx context.Context) {
137177
cfg := runtime.GlobalOptions.Get("hooks")
138178
if cfg == "" {
139179
log.Warn(ctx, "SerializeHooksToOptions was never called. No hooks enabled")
140180
return
141181
}
142-
if err := json.Unmarshal([]byte(cfg), &enabledHooks); err != nil {
182+
r.mu.Lock()
183+
defer r.mu.Unlock()
184+
if err := json.Unmarshal([]byte(cfg), &r.enabledHooks); err != nil {
143185
// Shouldn't happen, since all the data is strings.
144186
panic(errors.Wrapf(err, "DeserializeHooks failed on input %q", cfg))
145187
}
146188

147-
for h, opts := range enabledHooks {
148-
activeHooks[h] = hookRegistry[h](opts)
189+
for h, opts := range r.enabledHooks {
190+
r.activeHooks[h] = r.hookRegistry[h](opts)
149191
}
150192
}
151193

194+
// DeserializeHooksFromOptions extracts the hook configuration information from the options and configures
195+
// the hooks with the supplied options.
196+
func DeserializeHooksFromOptions(ctx context.Context) {
197+
defaultRegistry.DeserializeHooksFromOptions(ctx)
198+
}
199+
200+
func (r *registry) EnableHook(name string, args ...string) error {
201+
r.mu.Lock()
202+
defer r.mu.Unlock()
203+
if _, ok := r.hookRegistry[name]; !ok {
204+
return errors.Errorf("EnableHook: hook %s not found", name)
205+
}
206+
r.enabledHooks[name] = args
207+
return nil
208+
}
209+
152210
// EnableHook enables the hook to be run for the pipline. It will be
153211
// receive the supplied args when the pipeline executes. It is safe
154212
// to enable the same hook with different options, as this is necessary
155213
// if a hook wants to compose behavior.
156214
func EnableHook(name string, args ...string) error {
157-
if _, ok := hookRegistry[name]; !ok {
158-
return errors.Errorf("EnableHook: hook %s not found", name)
159-
}
160-
enabledHooks[name] = args
161-
return nil
215+
return defaultRegistry.EnableHook(name, args...)
216+
}
217+
218+
func (r *registry) IsEnabled(name string) (bool, []string) {
219+
r.mu.Lock()
220+
defer r.mu.Unlock()
221+
opts, ok := r.enabledHooks[name]
222+
return ok, opts
162223
}
163224

164225
// IsEnabled returns true and the registered options if the hook is
165226
// already enabled.
166227
func IsEnabled(name string) (bool, []string) {
167-
opts, ok := enabledHooks[name]
168-
return ok, opts
228+
return defaultRegistry.IsEnabled(name)
169229
}
170230

171231
// Encode encodes a hook name and its arguments into a single string.

sdks/go/pkg/beam/core/util/hooks/hooks_test.go

+54-6
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ package hooks
1717

1818
import (
1919
"context"
20+
"sync"
2021
"testing"
22+
"time"
2123

2224
fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
2325
)
@@ -31,24 +33,26 @@ const (
3133
reqValue = "reqValue"
3234
)
3335

34-
func initializeHooks() {
35-
activeHooks["test"] = Hook{
36+
func initializeHooks() *registry {
37+
var r = newRegistry()
38+
r.activeHooks["test"] = Hook{
3639
Init: func(ctx context.Context) (context.Context, error) {
3740
return context.WithValue(ctx, initKey, initValue), nil
3841
},
3942
Req: func(ctx context.Context, req *fnpb.InstructionRequest) (context.Context, error) {
4043
return context.WithValue(ctx, reqKey, reqValue), nil
4144
},
4245
}
46+
return r
4347
}
4448

4549
func TestInitContextPropagation(t *testing.T) {
46-
initializeHooks()
50+
r := initializeHooks()
4751
ctx := context.Background()
4852
var err error
4953

5054
expected := initValue
51-
ctx, err = RunInitHooks(ctx)
55+
ctx, err = r.RunInitHooks(ctx)
5256
if err != nil {
5357
t.Errorf("got %v error, wanted no error", err)
5458
}
@@ -59,13 +63,57 @@ func TestInitContextPropagation(t *testing.T) {
5963
}
6064

6165
func TestRequestContextPropagation(t *testing.T) {
62-
initializeHooks()
66+
r := initializeHooks()
6367
ctx := context.Background()
6468

6569
expected := reqValue
66-
ctx = RunRequestHooks(ctx, nil)
70+
ctx = r.RunRequestHooks(ctx, nil)
6771
actual := ctx.Value(reqKey)
6872
if actual != expected {
6973
t.Errorf("Got %s, wanted %s", actual, expected)
7074
}
7175
}
76+
77+
// TestConcurrentWrites tests if the concurrent writes are handled properly.
78+
// It uses go routines to test this on sample hook 'google_logging'.
79+
func TestConcurrentWrites(t *testing.T) {
80+
r := initializeHooks()
81+
hf := func(opts []string) Hook {
82+
return Hook{
83+
Req: func(ctx context.Context, req *fnpb.InstructionRequest) (context.Context, error) {
84+
return ctx, nil
85+
},
86+
}
87+
}
88+
r.RegisterHook("google_logging", hf)
89+
90+
var actual, expected error
91+
expected = nil
92+
93+
ch := make(chan struct{})
94+
wg := sync.WaitGroup{}
95+
96+
for i := 0; i < 5; i++ {
97+
wg.Add(1)
98+
go func() {
99+
defer wg.Done()
100+
for {
101+
select {
102+
case <-ch:
103+
// When the channel is closed, exit.
104+
return
105+
default:
106+
actual = r.EnableHook("google_logging")
107+
if actual != expected {
108+
t.Errorf("Got %s, wanted %s", actual, expected)
109+
}
110+
}
111+
}
112+
}()
113+
}
114+
// Let the goroutines execute for 5 seconds and then close the channel.
115+
time.Sleep(time.Second * 5)
116+
close(ch)
117+
// Wait for all goroutines to exit properly.
118+
wg.Wait()
119+
}

0 commit comments

Comments
 (0)