@@ -26,6 +26,7 @@ import (
26
26
"golang.org/x/net/websocket"
27
27
28
28
"k8s.io/apimachinery/pkg/api/errors"
29
+ metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
29
30
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30
31
"k8s.io/apimachinery/pkg/runtime"
31
32
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
@@ -35,6 +36,7 @@ import (
35
36
"k8s.io/apiserver/pkg/endpoints/metrics"
36
37
apirequest "k8s.io/apiserver/pkg/endpoints/request"
37
38
"k8s.io/apiserver/pkg/features"
39
+ "k8s.io/apiserver/pkg/registry/rest"
38
40
"k8s.io/apiserver/pkg/storage"
39
41
utilfeature "k8s.io/apiserver/pkg/util/feature"
40
42
)
@@ -64,7 +66,14 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
64
66
65
67
// serveWatchHandler returns a handle to serve a watch response.
66
68
// TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
67
- func serveWatchHandler (watcher watch.Interface , scope * RequestScope , mediaTypeOptions negotiation.MediaTypeOptions , req * http.Request , w http.ResponseWriter , timeout time.Duration , metricsScope string , initialEventsListBlueprint runtime.Object ) (http.Handler , error ) {
69
+ func serveWatchHandler (ctx context.Context , req * http.Request , w http.ResponseWriter , rw rest.Watcher , watchOpts * metainternalversion.ListOptions , scope * RequestScope , mediaTypeOptions negotiation.MediaTypeOptions , timeout time.Duration , initialEventsListBlueprint runtime.Object ) (http.Handler , error ) {
70
+ // Start the server-side request timeout clock now.
71
+ // Use a separate context so that timeout doesn't send a DeadlineExceeded error back to the client.
72
+ // TODO(karlkfi): The watch server probably SHOULD send a DeadlineExceeded error, but historically there was a race condition between DeadlineExceeded & EOF.
73
+ timeoutCtx , timeoutCancel := withOptionalTimeout (ctx , timeout )
74
+ defer func () { timeoutCancel () }()
75
+ // TODO: req = req.WithContext(ctx) IFF we use the same context
76
+
68
77
options , err := optionsForTransform (mediaTypeOptions , req )
69
78
if err != nil {
70
79
return nil , err
@@ -102,8 +111,6 @@ func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOp
102
111
mediaType += ";stream=watch"
103
112
}
104
113
105
- ctx := req .Context ()
106
-
107
114
// locate the appropriate embedded encoder based on the transform
108
115
var negotiatedEncoder runtime.Encoder
109
116
contentKind , contentSerializer , transform := targetEncodingForTransform (scope , mediaTypeOptions , req )
@@ -153,13 +160,34 @@ func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOp
153
160
}
154
161
155
162
var serverShuttingDownCh <- chan struct {}
156
- if signals := apirequest .ServerShutdownSignalFrom (req . Context () ); signals != nil {
163
+ if signals := apirequest .ServerShutdownSignalFrom (ctx ); signals != nil {
157
164
serverShuttingDownCh = signals .ShuttingDown ()
158
165
}
159
166
167
+ // Start watching the resource storage.
168
+ doneCh := make (chan struct {})
169
+ watcher , err := rw .Watch (ctx , watchOpts )
170
+ if err != nil {
171
+ utilruntime .HandleError (err )
172
+ return nil , err
173
+ }
174
+ // Invalidate timeoutCancel() to defer until ServeHTTP() is done.
175
+ deferredTimeoutCancel := timeoutCancel
176
+ timeoutCancel = func () {}
177
+ // Cleanup after ServeHTTP() is done.
178
+ go func () {
179
+ defer watcher .Stop ()
180
+ defer deferredTimeoutCancel ()
181
+ for range doneCh {
182
+ }
183
+ }()
184
+
160
185
server := & WatchServer {
161
- Watching : watcher ,
162
- Scope : scope ,
186
+ RequestContext : ctx ,
187
+ TimeoutContext : timeoutCtx ,
188
+ Scope : scope ,
189
+ Watcher : watcher ,
190
+ DoneChannel : doneCh ,
163
191
164
192
UseTextFraming : useTextFraming ,
165
193
MediaType : mediaType ,
@@ -170,10 +198,9 @@ func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOp
170
198
watchListTransformerFn : newWatchListTransformer (initialEventsListBlueprint , mediaTypeOptions .Convert , negotiatedEncoder ).transform ,
171
199
172
200
MemoryAllocator : memoryAllocator ,
173
- TimeoutFactory : & realTimeoutFactory {timeout },
174
201
ServerShuttingDownCh : serverShuttingDownCh ,
175
202
176
- metricsScope : metricsScope ,
203
+ metricsScope : metrics . CleanListScope ( ctx , watchOpts ) ,
177
204
}
178
205
179
206
if wsstream .IsWebSocketRequest (req ) {
@@ -183,11 +210,26 @@ func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOp
183
210
return http .HandlerFunc (server .HandleHTTP ), nil
184
211
}
185
212
213
+ func withOptionalTimeout (parent context.Context , timeout time.Duration ) (context.Context , context.CancelFunc ) {
214
+ if timeout > 0 {
215
+ return context .WithTimeout (parent , timeout )
216
+ }
217
+ return context .WithCancel (parent )
218
+ }
219
+
186
220
// WatchServer serves a watch.Interface over a websocket or vanilla HTTP.
187
221
type WatchServer struct {
188
- Watching watch.Interface
189
- Scope * RequestScope
190
-
222
+ // RequestContext of the request
223
+ RequestContext context.Context
224
+ // TimeoutContext of the request
225
+ TimeoutContext context.Context
226
+ // Scope of the request
227
+ Scope * RequestScope
228
+ // Watcher of the resource storage
229
+ Watcher watch.Interface
230
+ // DoneChannel is closed by the handler before returning, to allow the
231
+ // caller to clean up the WatchServer.
232
+ DoneChannel chan <- struct {}
191
233
// true if websocket messages should use text framing (as opposed to binary framing)
192
234
UseTextFraming bool
193
235
// the media type this watch is being served with
@@ -204,7 +246,6 @@ type WatchServer struct {
204
246
watchListTransformerFn watchListTransformerFunction
205
247
206
248
MemoryAllocator runtime.MemoryAllocator
207
- TimeoutFactory TimeoutFactory
208
249
ServerShuttingDownCh <- chan struct {}
209
250
210
251
metricsScope string
@@ -213,6 +254,7 @@ type WatchServer struct {
213
254
// HandleHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked.
214
255
// or over a websocket connection.
215
256
func (s * WatchServer ) HandleHTTP (w http.ResponseWriter , req * http.Request ) {
257
+ defer close (s .DoneChannel )
216
258
defer func () {
217
259
if s .MemoryAllocator != nil {
218
260
runtime .AllocatorPool .Put (s .MemoryAllocator )
@@ -236,20 +278,25 @@ func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) {
236
278
return
237
279
}
238
280
239
- // ensure the connection times out
240
- timeoutCh , cleanup := s .TimeoutFactory .TimeoutCh ()
241
- defer cleanup ()
281
+ // Ensure the for loop stops when the context is done (cancel or timeout).
282
+ ctx , cancel := context .WithCancel (s .RequestContext )
283
+ // Ensure the watch encoder context is stopped when the handler returns.
284
+ defer cancel ()
242
285
243
286
// begin the stream
244
287
w .Header ().Set ("Content-Type" , s .MediaType )
245
288
w .Header ().Set ("Transfer-Encoding" , "chunked" )
246
289
w .WriteHeader (http .StatusOK )
247
290
flusher .Flush ()
248
291
249
- kind := s .Scope .Kind
250
- watchEncoder := newWatchEncoder (req .Context (), kind , s .EmbeddedEncoder , s .Encoder , framer , s .watchListTransformerFn )
251
- ch := s .Watching .ResultChan ()
252
- done := req .Context ().Done ()
292
+ gvk := s .Scope .Kind
293
+ watchEncoder := newWatchEncoder (ctx , gvk , s .EmbeddedEncoder , s .Encoder , framer , s .watchListTransformerFn )
294
+
295
+ // Avoid calling Done & ResultChan multiple times,
296
+ // to reduce locking, unlocking, and memory allocations.
297
+ reqDoneCh := ctx .Done ()
298
+ timeoutCh := s .TimeoutContext .Done ()
299
+ resultCh := s .Watcher .ResultChan ()
253
300
254
301
for {
255
302
select {
@@ -262,16 +309,16 @@ func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) {
262
309
// client(s) try to reestablish the WATCH on the other
263
310
// available apiserver instance(s).
264
311
return
265
- case <- done :
266
- return
267
312
case <- timeoutCh :
268
313
return
269
- case event , ok := <- ch :
314
+ case <- reqDoneCh :
315
+ return
316
+ case event , ok := <- resultCh :
270
317
if ! ok {
271
318
// End of results.
272
319
return
273
320
}
274
- metrics .WatchEvents .WithContext (req . Context ()) .WithLabelValues (kind .Group , kind .Version , kind .Kind ).Inc ()
321
+ metrics .WatchEvents .WithContext (ctx ) .WithLabelValues (gvk .Group , gvk .Version , gvk .Kind ).Inc ()
275
322
isWatchListLatencyRecordingRequired := shouldRecordWatchListLatency (event )
276
323
277
324
if err := watchEncoder .Encode (event ); err != nil {
@@ -280,52 +327,60 @@ func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) {
280
327
return
281
328
}
282
329
283
- if len (ch ) == 0 {
330
+ if len (resultCh ) == 0 {
284
331
flusher .Flush ()
285
332
}
286
333
if isWatchListLatencyRecordingRequired {
287
- metrics .RecordWatchListLatency (req . Context () , s .Scope .Resource , s .metricsScope )
334
+ metrics .RecordWatchListLatency (ctx , s .Scope .Resource , s .metricsScope )
288
335
}
289
336
}
290
337
}
291
338
}
292
339
293
340
// HandleWS serves a series of encoded events over a websocket connection.
294
341
func (s * WatchServer ) HandleWS (ws * websocket.Conn ) {
342
+ defer close (s .DoneChannel )
295
343
defer func () {
296
344
if s .MemoryAllocator != nil {
297
345
runtime .AllocatorPool .Put (s .MemoryAllocator )
298
346
}
299
347
}()
300
348
301
349
defer ws .Close ()
302
- done := make (chan struct {})
303
- // ensure the connection times out
304
- timeoutCh , cleanup := s .TimeoutFactory .TimeoutCh ()
305
- defer cleanup ()
350
+
351
+ // Ensure the for loop stops when the context is done (cancel or timeout).
352
+ ctx , cancel := context .WithCancel (s .RequestContext )
353
+ // Ensure the watch encoder context is stopped when the handler returns.
354
+ defer cancel ()
306
355
307
356
go func () {
308
357
defer utilruntime .HandleCrash ()
309
- // This blocks until the connection is closed.
310
- // Client should not send anything.
358
+ // Block until client request is closed (EOF) or reading errors.
359
+ // The watch client should not send the server anything after the
360
+ // initial request, so it's safe to ignore incoming messages.
311
361
wsstream .IgnoreReceives (ws , 0 )
312
- // Once the client closes, we should also close
313
- close ( done )
362
+ // Signal done to stop writing response events
363
+ cancel ( )
314
364
}()
315
365
316
366
framer := newWebsocketFramer (ws , s .UseTextFraming )
317
367
318
- kind := s .Scope .Kind
319
- watchEncoder := newWatchEncoder (context .TODO (), kind , s .EmbeddedEncoder , s .Encoder , framer , s .watchListTransformerFn )
320
- ch := s .Watching .ResultChan ()
368
+ gvk := s .Scope .Kind
369
+ watchEncoder := newWatchEncoder (ctx , gvk , s .EmbeddedEncoder , s .Encoder , framer , s .watchListTransformerFn )
370
+
371
+ // Avoid calling Done & ResultChan multiple times,
372
+ // to reduce locking, unlocking, and memory allocations.
373
+ reqDoneCh := ctx .Done ()
374
+ timeoutCh := s .TimeoutContext .Done ()
375
+ resultCh := s .Watcher .ResultChan ()
321
376
322
377
for {
323
378
select {
324
- case <- done :
325
- return
326
379
case <- timeoutCh :
327
380
return
328
- case event , ok := <- ch :
381
+ case <- reqDoneCh :
382
+ return
383
+ case event , ok := <- resultCh :
329
384
if ! ok {
330
385
// End of results.
331
386
return
0 commit comments