Skip to content

Commit d3b4c49

Browse files
committed
refactor: clean up the watch server handlers
- Always stop the watcher when done reading events from the result channel. ListResource already stops the watch, but it does so multiple layers above, which means unit tests of the internal methods need to replicate that behavior. This change simplifies testing and ensures the watcher is stopped at least once. It should also make it easier to simplify the WatchServer in the future.
- Avoid calling Done & ResultChan multiple times, to reduce locking and memory allocations.
- Replace the TimeoutFactory with a Context, to reduce complexity.
- Use the -Ch suffix for channel variables.
- Add a done channel to the WatchServer to handle cleanup: stop the context and the storage watcher.
- Fix some flaky tests that were trying to use SimpleStorage.fakeWatcher before Watch was called.
- Validate that url.Parse does not return an error in tests.
1 parent ee2d421 commit d3b4c49

File tree

6 files changed

+247
-151
lines changed

6 files changed

+247
-151
lines changed

staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2038,17 +2038,23 @@ func TestWatchTable(t *testing.T) {
20382038
t.Fatalf("%d: unexpected response: %#v", i, resp)
20392039
}
20402040

2041+
var watcher *watch.FakeWatcher
2042+
for watcher == nil {
2043+
watcher = simpleStorage.Watcher()
2044+
time.Sleep(time.Millisecond)
2045+
}
2046+
20412047
go func() {
2042-
defer simpleStorage.fakeWatch.Stop()
2043-
test.send(simpleStorage.fakeWatch)
2048+
defer watcher.Stop()
2049+
test.send(watcher)
20442050
}()
20452051

20462052
body, err := ioutil.ReadAll(resp.Body)
20472053
if err != nil {
20482054
t.Fatal(err)
20492055
}
20502056
t.Logf("Body:\n%s", string(body))
2051-
d := watcher(resp.Header.Get("Content-Type"), ioutil.NopCloser(bytes.NewReader(body)))
2057+
d := newDecoder(resp.Header.Get("Content-Type"), ioutil.NopCloser(bytes.NewReader(body)))
20522058
var actual []*metav1.WatchEvent
20532059
for {
20542060
var event metav1.WatchEvent
@@ -2068,7 +2074,7 @@ func TestWatchTable(t *testing.T) {
20682074
}
20692075
}
20702076

2071-
func watcher(mediaType string, r io.ReadCloser) streaming.Decoder {
2077+
func newDecoder(mediaType string, r io.ReadCloser) streaming.Decoder {
20722078
info, ok := runtime.SerializerInfoForMediaType(metainternalversionscheme.Codecs.SupportedMediaTypes(), mediaType)
20732079
if !ok || info.StreamSerializer == nil {
20742080
panic(info)

staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"k8s.io/apimachinery/pkg/fields"
3737
"k8s.io/apimachinery/pkg/runtime"
3838
"k8s.io/apimachinery/pkg/runtime/schema"
39+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3940
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
4041
"k8s.io/apiserver/pkg/endpoints/metrics"
4142
"k8s.io/apiserver/pkg/endpoints/request"
@@ -189,6 +190,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
189190
hasName = false
190191
}
191192
ctx = request.WithNamespace(ctx, namespace)
193+
req = req.WithContext(ctx)
192194

193195
opts := metainternalversion.ListOptions{}
194196
if err := metainternalversionscheme.ParameterCodec.DecodeParameters(req.URL.Query(), scope.MetaGroupVersion, &opts); err != nil {
@@ -276,33 +278,22 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
276278
}
277279

278280
klog.V(3).InfoS("Starting watch", "path", req.URL.Path, "resourceVersion", opts.ResourceVersion, "labels", opts.LabelSelector, "fields", opts.FieldSelector, "timeout", timeout)
279-
ctx, cancel := context.WithTimeout(ctx, timeout)
280-
defer func() { cancel() }()
281-
watcher, err := rw.Watch(ctx, &opts)
282-
if err != nil {
283-
scope.err(err, w, req)
284-
return
285-
}
286-
handler, err := serveWatchHandler(watcher, scope, outputMediaType, req, w, timeout, metrics.CleanListScope(ctx, &opts), emptyVersionedList)
281+
handler, err := serveWatchHandler(ctx, req, w, rw, &opts, scope, outputMediaType, timeout, emptyVersionedList)
287282
if err != nil {
283+
utilruntime.HandleError(err)
288284
scope.err(err, w, req)
289285
return
290286
}
291-
// Invalidate cancel() to defer until serve() is complete.
292-
deferredCancel := cancel
293-
cancel = func() {}
294287

295288
serve := func() {
296-
defer deferredCancel()
297289
requestInfo, _ := request.RequestInfoFrom(ctx)
298290
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
299-
defer watcher.Stop()
300291
handler.ServeHTTP(w, req)
301292
})
302293
}
303294

304295
// Run watch serving in a separate goroutine to allow freeing current stack memory
305-
t := routine.TaskFrom(req.Context())
296+
t := routine.TaskFrom(ctx)
306297
if t != nil {
307298
t.Func = serve
308299
} else {

staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go

Lines changed: 94 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"golang.org/x/net/websocket"
2727

2828
"k8s.io/apimachinery/pkg/api/errors"
29+
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
2930
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3031
"k8s.io/apimachinery/pkg/runtime"
3132
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
@@ -35,6 +36,7 @@ import (
3536
"k8s.io/apiserver/pkg/endpoints/metrics"
3637
apirequest "k8s.io/apiserver/pkg/endpoints/request"
3738
"k8s.io/apiserver/pkg/features"
39+
"k8s.io/apiserver/pkg/registry/rest"
3840
"k8s.io/apiserver/pkg/storage"
3941
utilfeature "k8s.io/apiserver/pkg/util/feature"
4042
)
@@ -64,7 +66,14 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
6466

6567
// serveWatchHandler returns a handle to serve a watch response.
6668
// TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
67-
func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration, metricsScope string, initialEventsListBlueprint runtime.Object) (http.Handler, error) {
69+
func serveWatchHandler(ctx context.Context, req *http.Request, w http.ResponseWriter, rw rest.Watcher, watchOpts *metainternalversion.ListOptions, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, timeout time.Duration, initialEventsListBlueprint runtime.Object) (http.Handler, error) {
70+
// Start the server-side request timeout clock now.
71+
// Use a separate context so that timeout doesn't send a DeadlineExceeded error back to the client.
72+
// TODO(karlkfi): The watch server probably SHOULD send a DeadlineExceeded error, but historically there was a race condition between DeadlineExceeded & EOF.
73+
timeoutCtx, timeoutCancel := withOptionalTimeout(ctx, timeout)
74+
defer func() { timeoutCancel() }()
75+
// TODO: req = req.WithContext(ctx) IFF we use the same context
76+
6877
options, err := optionsForTransform(mediaTypeOptions, req)
6978
if err != nil {
7079
return nil, err
@@ -102,8 +111,6 @@ func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOp
102111
mediaType += ";stream=watch"
103112
}
104113

105-
ctx := req.Context()
106-
107114
// locate the appropriate embedded encoder based on the transform
108115
var negotiatedEncoder runtime.Encoder
109116
contentKind, contentSerializer, transform := targetEncodingForTransform(scope, mediaTypeOptions, req)
@@ -153,13 +160,34 @@ func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOp
153160
}
154161

155162
var serverShuttingDownCh <-chan struct{}
156-
if signals := apirequest.ServerShutdownSignalFrom(req.Context()); signals != nil {
163+
if signals := apirequest.ServerShutdownSignalFrom(ctx); signals != nil {
157164
serverShuttingDownCh = signals.ShuttingDown()
158165
}
159166

167+
// Start watching the resource storage.
168+
doneCh := make(chan struct{})
169+
watcher, err := rw.Watch(ctx, watchOpts)
170+
if err != nil {
171+
utilruntime.HandleError(err)
172+
return nil, err
173+
}
174+
// Invalidate timeoutCancel() to defer until ServeHTTP() is done.
175+
deferredTimeoutCancel := timeoutCancel
176+
timeoutCancel = func() {}
177+
// Cleanup after ServeHTTP() is done.
178+
go func() {
179+
defer watcher.Stop()
180+
defer deferredTimeoutCancel()
181+
for range doneCh {
182+
}
183+
}()
184+
160185
server := &WatchServer{
161-
Watching: watcher,
162-
Scope: scope,
186+
RequestContext: ctx,
187+
TimeoutContext: timeoutCtx,
188+
Scope: scope,
189+
Watcher: watcher,
190+
DoneChannel: doneCh,
163191

164192
UseTextFraming: useTextFraming,
165193
MediaType: mediaType,
@@ -170,10 +198,9 @@ func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOp
170198
watchListTransformerFn: newWatchListTransformer(initialEventsListBlueprint, mediaTypeOptions.Convert, negotiatedEncoder).transform,
171199

172200
MemoryAllocator: memoryAllocator,
173-
TimeoutFactory: &realTimeoutFactory{timeout},
174201
ServerShuttingDownCh: serverShuttingDownCh,
175202

176-
metricsScope: metricsScope,
203+
metricsScope: metrics.CleanListScope(ctx, watchOpts),
177204
}
178205

179206
if wsstream.IsWebSocketRequest(req) {
@@ -183,11 +210,26 @@ func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOp
183210
return http.HandlerFunc(server.HandleHTTP), nil
184211
}
185212

213+
func withOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
214+
if timeout > 0 {
215+
return context.WithTimeout(parent, timeout)
216+
}
217+
return context.WithCancel(parent)
218+
}
219+
186220
// WatchServer serves a watch.Interface over a websocket or vanilla HTTP.
187221
type WatchServer struct {
188-
Watching watch.Interface
189-
Scope *RequestScope
190-
222+
// RequestContext of the request
223+
RequestContext context.Context
224+
// TimeoutContext of the request
225+
TimeoutContext context.Context
226+
// Scope of the request
227+
Scope *RequestScope
228+
// Watcher of the resource storage
229+
Watcher watch.Interface
230+
// DoneChannel is closed by the handler before returning, to allow the
231+
// caller to clean up the WatchServer.
232+
DoneChannel chan<- struct{}
191233
// true if websocket messages should use text framing (as opposed to binary framing)
192234
UseTextFraming bool
193235
// the media type this watch is being served with
@@ -204,7 +246,6 @@ type WatchServer struct {
204246
watchListTransformerFn watchListTransformerFunction
205247

206248
MemoryAllocator runtime.MemoryAllocator
207-
TimeoutFactory TimeoutFactory
208249
ServerShuttingDownCh <-chan struct{}
209250

210251
metricsScope string
@@ -213,6 +254,7 @@ type WatchServer struct {
213254
// HandleHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked.
214255
// or over a websocket connection.
215256
func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) {
257+
defer close(s.DoneChannel)
216258
defer func() {
217259
if s.MemoryAllocator != nil {
218260
runtime.AllocatorPool.Put(s.MemoryAllocator)
@@ -236,20 +278,25 @@ func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) {
236278
return
237279
}
238280

239-
// ensure the connection times out
240-
timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh()
241-
defer cleanup()
281+
// Ensure the for loop stops when the context is done (cancel or timeout).
282+
ctx, cancel := context.WithCancel(s.RequestContext)
283+
// Ensure the watch encoder context is stopped when the handler returns.
284+
defer cancel()
242285

243286
// begin the stream
244287
w.Header().Set("Content-Type", s.MediaType)
245288
w.Header().Set("Transfer-Encoding", "chunked")
246289
w.WriteHeader(http.StatusOK)
247290
flusher.Flush()
248291

249-
kind := s.Scope.Kind
250-
watchEncoder := newWatchEncoder(req.Context(), kind, s.EmbeddedEncoder, s.Encoder, framer, s.watchListTransformerFn)
251-
ch := s.Watching.ResultChan()
252-
done := req.Context().Done()
292+
gvk := s.Scope.Kind
293+
watchEncoder := newWatchEncoder(ctx, gvk, s.EmbeddedEncoder, s.Encoder, framer, s.watchListTransformerFn)
294+
295+
// Avoid calling Done & ResultChan multiple times,
296+
// to reduce locking, unlocking, and memory allocations.
297+
reqDoneCh := ctx.Done()
298+
timeoutCh := s.TimeoutContext.Done()
299+
resultCh := s.Watcher.ResultChan()
253300

254301
for {
255302
select {
@@ -262,16 +309,16 @@ func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) {
262309
// client(s) try to reestablish the WATCH on the other
263310
// available apiserver instance(s).
264311
return
265-
case <-done:
266-
return
267312
case <-timeoutCh:
268313
return
269-
case event, ok := <-ch:
314+
case <-reqDoneCh:
315+
return
316+
case event, ok := <-resultCh:
270317
if !ok {
271318
// End of results.
272319
return
273320
}
274-
metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
321+
metrics.WatchEvents.WithContext(ctx).WithLabelValues(gvk.Group, gvk.Version, gvk.Kind).Inc()
275322
isWatchListLatencyRecordingRequired := shouldRecordWatchListLatency(event)
276323

277324
if err := watchEncoder.Encode(event); err != nil {
@@ -280,52 +327,60 @@ func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) {
280327
return
281328
}
282329

283-
if len(ch) == 0 {
330+
if len(resultCh) == 0 {
284331
flusher.Flush()
285332
}
286333
if isWatchListLatencyRecordingRequired {
287-
metrics.RecordWatchListLatency(req.Context(), s.Scope.Resource, s.metricsScope)
334+
metrics.RecordWatchListLatency(ctx, s.Scope.Resource, s.metricsScope)
288335
}
289336
}
290337
}
291338
}
292339

293340
// HandleWS serves a series of encoded events over a websocket connection.
294341
func (s *WatchServer) HandleWS(ws *websocket.Conn) {
342+
defer close(s.DoneChannel)
295343
defer func() {
296344
if s.MemoryAllocator != nil {
297345
runtime.AllocatorPool.Put(s.MemoryAllocator)
298346
}
299347
}()
300348

301349
defer ws.Close()
302-
done := make(chan struct{})
303-
// ensure the connection times out
304-
timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh()
305-
defer cleanup()
350+
351+
// Ensure the for loop stops when the context is done (cancel or timeout).
352+
ctx, cancel := context.WithCancel(s.RequestContext)
353+
// Ensure the watch encoder context is stopped when the handler returns.
354+
defer cancel()
306355

307356
go func() {
308357
defer utilruntime.HandleCrash()
309-
// This blocks until the connection is closed.
310-
// Client should not send anything.
358+
// Block until client request is closed (EOF) or reading errors.
359+
// The watch client should not send the server anything after the
360+
// initial request, so it's safe to ignore incoming messages.
311361
wsstream.IgnoreReceives(ws, 0)
312-
// Once the client closes, we should also close
313-
close(done)
362+
// Signal done to stop writing response events
363+
cancel()
314364
}()
315365

316366
framer := newWebsocketFramer(ws, s.UseTextFraming)
317367

318-
kind := s.Scope.Kind
319-
watchEncoder := newWatchEncoder(context.TODO(), kind, s.EmbeddedEncoder, s.Encoder, framer, s.watchListTransformerFn)
320-
ch := s.Watching.ResultChan()
368+
gvk := s.Scope.Kind
369+
watchEncoder := newWatchEncoder(ctx, gvk, s.EmbeddedEncoder, s.Encoder, framer, s.watchListTransformerFn)
370+
371+
// Avoid calling Done & ResultChan multiple times,
372+
// to reduce locking, unlocking, and memory allocations.
373+
reqDoneCh := ctx.Done()
374+
timeoutCh := s.TimeoutContext.Done()
375+
resultCh := s.Watcher.ResultChan()
321376

322377
for {
323378
select {
324-
case <-done:
325-
return
326379
case <-timeoutCh:
327380
return
328-
case event, ok := <-ch:
381+
case <-reqDoneCh:
382+
return
383+
case event, ok := <-resultCh:
329384
if !ok {
330385
// End of results.
331386
return

0 commit comments

Comments
 (0)