@@ -32,18 +32,30 @@ import (
32
32
"encoding/csv"
33
33
"encoding/json"
34
34
"strings"
35
+ "sync"
35
36
36
37
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
37
38
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
38
39
"github.com/apache/beam/sdks/go/pkg/beam/log"
39
40
fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
40
41
)
41
42
42
- var (
43
- hookRegistry = make (map [string ]HookFactory )
44
- enabledHooks = make (map [string ][]string )
45
- activeHooks = make (map [string ]Hook )
46
- )
43
+ var defaultRegistry = newRegistry ()
44
+
45
+ type registry struct {
46
+ mu sync.Mutex
47
+ hookRegistry map [string ]HookFactory
48
+ enabledHooks map [string ][]string
49
+ activeHooks map [string ]Hook
50
+ }
51
+
52
+ func newRegistry () * registry {
53
+ return & registry {
54
+ hookRegistry : make (map [string ]HookFactory ),
55
+ enabledHooks : make (map [string ][]string ),
56
+ activeHooks : make (map [string ]Hook ),
57
+ }
58
+ }
47
59
48
60
// A Hook is a set of hooks to run at various stages of executing a
49
61
// pipeline.
@@ -64,19 +76,26 @@ type InitHook func(context.Context) (context.Context, error)
64
76
// HookFactory is a function that produces a Hook from the supplied arguments.
65
77
type HookFactory func ([]string ) Hook
66
78
79
+ func (r * registry ) RegisterHook (name string , h HookFactory ) {
80
+ r .mu .Lock ()
81
+ defer r .mu .Unlock ()
82
+ r .hookRegistry [name ] = h
83
+ }
84
+
67
85
// RegisterHook registers a Hook for the
68
86
// supplied identifier.
69
87
func RegisterHook (name string , h HookFactory ) {
70
- hookRegistry [ name ] = h
88
+ defaultRegistry . RegisterHook ( name , h )
71
89
}
72
90
73
- // RunInitHooks runs the init hooks.
74
- func RunInitHooks (ctx context.Context ) (context.Context , error ) {
91
+ func (r * registry ) RunInitHooks (ctx context.Context ) (context.Context , error ) {
75
92
// If an init hook fails to complete, the invariants of the
76
93
// system are compromised and we can't run a workflow.
77
94
// The hooks can run in any order. They should not be
78
95
// interdependent or interfere with each other.
79
- for _ , h := range activeHooks {
96
+ r .mu .Lock ()
97
+ defer r .mu .Unlock ()
98
+ for _ , h := range r .activeHooks {
80
99
if h .Init != nil {
81
100
var err error
82
101
if ctx , err = h .Init (ctx ); err != nil {
@@ -87,15 +106,21 @@ func RunInitHooks(ctx context.Context) (context.Context, error) {
87
106
return ctx , nil
88
107
}
89
108
109
+ // RunInitHooks runs the init hooks.
110
+ func RunInitHooks (ctx context.Context ) (context.Context , error ) {
111
+ return defaultRegistry .RunInitHooks (ctx )
112
+ }
113
+
90
114
// RequestHook is called when handling a FnAPI instruction. It can return an updated
91
115
// context to pass additional information to downstream callers, or return the
92
116
// original context provided.
93
117
type RequestHook func (context.Context , * fnpb.InstructionRequest ) (context.Context , error )
94
118
95
- // RunRequestHooks runs the hooks that handle a FnAPI request.
96
- func RunRequestHooks (ctx context.Context , req * fnpb.InstructionRequest ) context.Context {
119
+ func (r * registry ) RunRequestHooks (ctx context.Context , req * fnpb.InstructionRequest ) context.Context {
120
+ r .mu .Lock ()
121
+ defer r .mu .Unlock ()
97
122
// The request hooks should not modify the request.
98
- for n , h := range activeHooks {
123
+ for n , h := range r . activeHooks {
99
124
if h .Req != nil {
100
125
var err error
101
126
if ctx , err = h .Req (ctx , req ); err != nil {
@@ -106,12 +131,18 @@ func RunRequestHooks(ctx context.Context, req *fnpb.InstructionRequest) context.
106
131
return ctx
107
132
}
108
133
134
+ // RunRequestHooks runs the hooks that handle a FnAPI request.
135
+ func RunRequestHooks (ctx context.Context , req * fnpb.InstructionRequest ) context.Context {
136
+ return defaultRegistry .RunRequestHooks (ctx , req )
137
+ }
138
+
109
139
// ResponseHook is called when sending a FnAPI instruction response.
110
140
type ResponseHook func (context.Context , * fnpb.InstructionRequest , * fnpb.InstructionResponse ) error
111
141
112
- // RunResponseHooks runs the hooks that handle a FnAPI response.
113
- func RunResponseHooks (ctx context.Context , req * fnpb.InstructionRequest , resp * fnpb.InstructionResponse ) {
114
- for n , h := range activeHooks {
142
+ func (r * registry ) RunResponseHooks (ctx context.Context , req * fnpb.InstructionRequest , resp * fnpb.InstructionResponse ) {
143
+ r .mu .Lock ()
144
+ defer r .mu .Unlock ()
145
+ for n , h := range r .activeHooks {
115
146
if h .Resp != nil {
116
147
if err := h .Resp (ctx , req , resp ); err != nil {
117
148
log .Infof (ctx , "response hook %s failed: %v" , n , err )
@@ -120,52 +151,81 @@ func RunResponseHooks(ctx context.Context, req *fnpb.InstructionRequest, resp *f
120
151
}
121
152
}
122
153
123
- // SerializeHooksToOptions serializes the activated hooks and their configuration into a JSON string
124
- // that can be deserialized later by the runner.
125
- func SerializeHooksToOptions () {
126
- data , err := json .Marshal (enabledHooks )
154
+ // RunResponseHooks runs the hooks that handle a FnAPI response.
155
+ func RunResponseHooks (ctx context.Context , req * fnpb.InstructionRequest , resp * fnpb.InstructionResponse ) {
156
+ defaultRegistry .RunResponseHooks (ctx , req , resp )
157
+ }
158
+
159
+ func (r * registry ) SerializeHooksToOptions () {
160
+ r .mu .Lock ()
161
+ defer r .mu .Unlock ()
162
+ data , err := json .Marshal (r .enabledHooks )
127
163
if err != nil {
128
164
// Shouldn't happen, since all the data is strings.
129
165
panic (errors .Wrap (err , "Couldn't serialize hooks" ))
130
166
}
131
167
runtime .GlobalOptions .Set ("hooks" , string (data ))
132
168
}
133
169
134
- // DeserializeHooksFromOptions extracts the hook configuration information from the options and configures
135
- // the hooks with the supplied options.
136
- func DeserializeHooksFromOptions (ctx context.Context ) {
170
+ // SerializeHooksToOptions serializes the activated hooks and their configuration into a JSON string
171
+ // that can be deserialized later by the runner.
172
+ func SerializeHooksToOptions () {
173
+ defaultRegistry .SerializeHooksToOptions ()
174
+ }
175
+
176
+ func (r * registry ) DeserializeHooksFromOptions (ctx context.Context ) {
137
177
cfg := runtime .GlobalOptions .Get ("hooks" )
138
178
if cfg == "" {
139
179
log .Warn (ctx , "SerializeHooksToOptions was never called. No hooks enabled" )
140
180
return
141
181
}
142
- if err := json .Unmarshal ([]byte (cfg ), & enabledHooks ); err != nil {
182
+ r .mu .Lock ()
183
+ defer r .mu .Unlock ()
184
+ if err := json .Unmarshal ([]byte (cfg ), & r .enabledHooks ); err != nil {
143
185
// Shouldn't happen, since all the data is strings.
144
186
panic (errors .Wrapf (err , "DeserializeHooks failed on input %q" , cfg ))
145
187
}
146
188
147
- for h , opts := range enabledHooks {
148
- activeHooks [h ] = hookRegistry [h ](opts )
189
+ for h , opts := range r . enabledHooks {
190
+ r . activeHooks [h ] = r. hookRegistry [h ](opts )
149
191
}
150
192
}
151
193
194
+ // DeserializeHooksFromOptions extracts the hook configuration information from the options and configures
195
+ // the hooks with the supplied options.
196
+ func DeserializeHooksFromOptions (ctx context.Context ) {
197
+ defaultRegistry .DeserializeHooksFromOptions (ctx )
198
+ }
199
+
200
+ func (r * registry ) EnableHook (name string , args ... string ) error {
201
+ r .mu .Lock ()
202
+ defer r .mu .Unlock ()
203
+ if _ , ok := r .hookRegistry [name ]; ! ok {
204
+ return errors .Errorf ("EnableHook: hook %s not found" , name )
205
+ }
206
+ r .enabledHooks [name ] = args
207
+ return nil
208
+ }
209
+
152
210
// EnableHook enables the hook to be run for the pipline. It will be
153
211
// receive the supplied args when the pipeline executes. It is safe
154
212
// to enable the same hook with different options, as this is necessary
155
213
// if a hook wants to compose behavior.
156
214
func EnableHook (name string , args ... string ) error {
157
- if _ , ok := hookRegistry [name ]; ! ok {
158
- return errors .Errorf ("EnableHook: hook %s not found" , name )
159
- }
160
- enabledHooks [name ] = args
161
- return nil
215
+ return defaultRegistry .EnableHook (name , args ... )
216
+ }
217
+
218
+ func (r * registry ) IsEnabled (name string ) (bool , []string ) {
219
+ r .mu .Lock ()
220
+ defer r .mu .Unlock ()
221
+ opts , ok := r .enabledHooks [name ]
222
+ return ok , opts
162
223
}
163
224
164
225
// IsEnabled returns true and the registered options if the hook is
165
226
// already enabled.
166
227
func IsEnabled (name string ) (bool , []string ) {
167
- opts , ok := enabledHooks [name ]
168
- return ok , opts
228
+ return defaultRegistry .IsEnabled (name )
169
229
}
170
230
171
231
// Encode encodes a hook name and its arguments into a single string.
0 commit comments