Skip to content

Commit ea49e94

Browse files
committed
Fix client watch reestablishment handling of client-side timeouts
1 parent 906513a commit ea49e94

File tree

9 files changed

+148
-10
lines changed

9 files changed

+148
-10
lines changed

staging/src/k8s.io/apimachinery/pkg/util/net/http.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ func JoinPreservingTrailingSlash(elem ...string) string {
5555
return result
5656
}
5757

58+
// IsTimeout returns true if the given error is a network timeout error
59+
func IsTimeout(err error) bool {
60+
neterr, ok := err.(net.Error)
61+
return ok && neterr != nil && neterr.Timeout()
62+
}
63+
5864
// IsProbableEOF returns true if the given error resembles a connection termination
5965
// scenario that would justify assuming that the watch is empty.
6066
// These errors are what the Go http stack returns back to us which are general

staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func (sw *StreamWatcher) receive() {
113113
case io.ErrUnexpectedEOF:
114114
klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
115115
default:
116-
if net.IsProbableEOF(err) {
116+
if net.IsProbableEOF(err) || net.IsTimeout(err) {
117117
klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
118118
} else {
119119
sw.result <- Event{

staging/src/k8s.io/client-go/rest/request.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -655,7 +655,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
655655
if err != nil {
656656
// The watch stream mechanism handles many common partial data errors, so closed
657657
// connections can be retried in many cases.
658-
if net.IsProbableEOF(err) {
658+
if net.IsProbableEOF(err) || net.IsTimeout(err) {
659659
return watch.NewEmptyWatch(), nil
660660
}
661661
return nil, err

staging/src/k8s.io/client-go/tools/cache/reflector.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
364364
AllowWatchBookmarks: true,
365365
}
366366

367+
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
368+
start := r.clock.Now()
367369
w, err := r.listerWatcher.Watch(options)
368370
if err != nil {
369371
switch {
@@ -390,7 +392,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
390392
return nil
391393
}
392394

393-
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
395+
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
394396
if err != errorStopRequested {
395397
switch {
396398
case isExpiredError(err):
@@ -417,8 +419,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err
417419
}
418420

419421
// watchHandler watches w and keeps *resourceVersion up to date.
420-
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
421-
start := r.clock.Now()
422+
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
422423
eventCount := 0
423424

424425
// Stopping the watcher should be idempotent and if we return from this function there's no way

staging/src/k8s.io/client-go/tools/cache/reflector_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func TestReflectorWatchHandlerError(t *testing.T) {
136136
fw.Stop()
137137
}()
138138
var resumeRV string
139-
err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop)
139+
err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, wait.NeverStop)
140140
if err == nil {
141141
t.Errorf("unexpected non-error")
142142
}
@@ -156,7 +156,7 @@ func TestReflectorWatchHandler(t *testing.T) {
156156
fw.Stop()
157157
}()
158158
var resumeRV string
159-
err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop)
159+
err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, wait.NeverStop)
160160
if err != nil {
161161
t.Errorf("unexpected error %v", err)
162162
}
@@ -205,7 +205,7 @@ func TestReflectorStopWatch(t *testing.T) {
205205
var resumeRV string
206206
stopWatch := make(chan struct{}, 1)
207207
stopWatch <- struct{}{}
208-
err := g.watchHandler(fw, &resumeRV, nevererrc, stopWatch)
208+
err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, stopWatch)
209209
if err != errorStopRequested {
210210
t.Errorf("expected stop error, got %q", err)
211211
}

staging/src/k8s.io/client-go/tools/watch/retrywatcher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
120120

121121
default:
122122
msg := "Watch failed: %v"
123-
if net.IsProbableEOF(err) {
123+
if net.IsProbableEOF(err) || net.IsTimeout(err) {
124124
klog.V(5).Infof(msg, err)
125125
// Retry
126126
return false, 0

test/cmd/request-timeout.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ run_kubectl_request_timeout_tests() {
3838
kube::test::if_has_string "${output_message}" 'valid-pod'
3939

4040
## check --request-timeout on 'get pod' with --watch
41-
output_message=$(kubectl get pod valid-pod --request-timeout=1 --watch 2>&1)
41+
output_message=$(kubectl get pod valid-pod --request-timeout=1 --watch --v=5 2>&1)
4242
kube::test::if_has_string "${output_message}" 'Timeout exceeded while reading body'
4343

4444
## check --request-timeout value with no time unit

test/integration/apimachinery/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go_test(
55
srcs = [
66
"main_test.go",
77
"watch_restart_test.go",
8+
"watch_timeout_test.go",
89
],
910
tags = [
1011
"etcd",
@@ -22,6 +23,7 @@ go_test(
2223
"//staging/src/k8s.io/client-go/rest:go_default_library",
2324
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
2425
"//staging/src/k8s.io/client-go/tools/watch:go_default_library",
26+
"//staging/src/k8s.io/kubectl/pkg/proxy:go_default_library",
2527
"//test/integration/framework:go_default_library",
2628
],
2729
)
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package apimachinery
18+
19+
import (
20+
"context"
21+
"net/http/httptest"
22+
"net/http/httputil"
23+
"net/url"
24+
"testing"
25+
"time"
26+
27+
corev1 "k8s.io/api/core/v1"
28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+
"k8s.io/apimachinery/pkg/runtime"
30+
"k8s.io/apimachinery/pkg/watch"
31+
"k8s.io/client-go/kubernetes"
32+
restclient "k8s.io/client-go/rest"
33+
"k8s.io/client-go/tools/cache"
34+
kubectlproxy "k8s.io/kubectl/pkg/proxy"
35+
"k8s.io/kubernetes/test/integration/framework"
36+
)
37+
38+
func TestWatchClientTimeout(t *testing.T) {
39+
masterConfig := framework.NewIntegrationTestMasterConfig()
40+
_, s, closeFn := framework.RunAMaster(masterConfig)
41+
defer closeFn()
42+
43+
t.Run("direct", func(t *testing.T) {
44+
t.Logf("client at %s", s.URL)
45+
testWatchClientTimeouts(t, s.URL)
46+
})
47+
48+
t.Run("reverse proxy", func(t *testing.T) {
49+
u, _ := url.Parse(s.URL)
50+
proxy := httputil.NewSingleHostReverseProxy(u)
51+
proxy.FlushInterval = -1
52+
proxyServer := httptest.NewServer(httputil.NewSingleHostReverseProxy(u))
53+
defer proxyServer.Close()
54+
55+
t.Logf("client to %s, backend at %s", proxyServer.URL, s.URL)
56+
testWatchClientTimeouts(t, proxyServer.URL)
57+
})
58+
59+
t.Run("kubectl proxy", func(t *testing.T) {
60+
kubectlProxyServer, err := kubectlproxy.NewServer("", "/", "/static/", nil, &restclient.Config{Host: s.URL, Timeout: 2 * time.Second}, 0)
61+
if err != nil {
62+
t.Fatal(err)
63+
}
64+
kubectlProxyListener, err := kubectlProxyServer.Listen("", 0)
65+
if err != nil {
66+
t.Fatal(err)
67+
}
68+
defer kubectlProxyListener.Close()
69+
go kubectlProxyServer.ServeOnListener(kubectlProxyListener)
70+
71+
t.Logf("client to %s, backend at %s", kubectlProxyListener.Addr().String(), s.URL)
72+
testWatchClientTimeouts(t, "http://"+kubectlProxyListener.Addr().String())
73+
})
74+
}
75+
76+
func testWatchClientTimeouts(t *testing.T, url string) {
77+
t.Run("timeout", func(t *testing.T) {
78+
testWatchClientTimeout(t, url, time.Second, 0)
79+
})
80+
t.Run("timeoutSeconds", func(t *testing.T) {
81+
testWatchClientTimeout(t, url, 0, time.Second)
82+
})
83+
t.Run("timeout+timeoutSeconds", func(t *testing.T) {
84+
testWatchClientTimeout(t, url, time.Second, time.Second)
85+
})
86+
}
87+
88+
func testWatchClientTimeout(t *testing.T, serverURL string, timeout, timeoutSeconds time.Duration) {
89+
// client
90+
client, err := kubernetes.NewForConfig(&restclient.Config{Host: serverURL, Timeout: timeout})
91+
if err != nil {
92+
t.Fatal(err)
93+
}
94+
95+
listCount := 0
96+
watchCount := 0
97+
stopCh := make(chan struct{})
98+
listWatch := &cache.ListWatch{
99+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
100+
t.Logf("listing (version=%s continue=%s)", options.ResourceVersion, options.Continue)
101+
listCount++
102+
if listCount > 1 {
103+
t.Errorf("listed more than once")
104+
close(stopCh)
105+
}
106+
return client.CoreV1().ConfigMaps(metav1.NamespaceAll).List(context.TODO(), options)
107+
},
108+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
109+
t.Logf("watching (version=%s)", options.ResourceVersion)
110+
if timeoutSeconds != 0 {
111+
timeout := int64(timeoutSeconds / time.Second)
112+
options.TimeoutSeconds = &timeout
113+
}
114+
watchCount++
115+
if watchCount > 1 {
116+
// success, restarted watch
117+
close(stopCh)
118+
}
119+
return client.CoreV1().ConfigMaps(metav1.NamespaceAll).Watch(context.TODO(), options)
120+
},
121+
}
122+
_, informer := cache.NewIndexerInformer(listWatch, &corev1.ConfigMap{}, 30*time.Minute, cache.ResourceEventHandlerFuncs{}, cache.Indexers{})
123+
informer.Run(stopCh)
124+
select {
125+
case <-stopCh:
126+
case <-time.After(time.Minute):
127+
t.Fatal("timeout")
128+
}
129+
}

0 commit comments

Comments
 (0)