Skip to content

[WIP] fix: watch client errors #131339

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions staging/src/k8s.io/apimachinery/pkg/util/net/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func IsProbableEOF(err error) bool {
return true
case msg == "http: can't write HTTP request on broken connection":
return true
case msg == "http: read on closed response body":
return true
case strings.Contains(msg, "http2: server sent GOAWAY and closed the connection"):
return true
case strings.Contains(msg, "connection reset by peer"):
Expand Down
187 changes: 146 additions & 41 deletions staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ limitations under the License.
package handlers

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -27,8 +27,9 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -38,6 +39,7 @@ import (
apitesting "k8s.io/apiserver/pkg/endpoints/testing"
"k8s.io/client-go/dynamic"
restclient "k8s.io/client-go/rest"
utiltesting "k8s.io/client-go/util/testing"
)

// Fake API versions, similar to api/latest.go
Expand All @@ -61,6 +63,7 @@ func init() {
}

func TestWatchHTTPErrors(t *testing.T) {
ctx := t.Context()
watcher := watch.NewFake()
timeoutCh := make(chan time.Time)
doneCh := make(chan struct{})
Expand Down Expand Up @@ -88,19 +91,24 @@ func TestWatchHTTPErrors(t *testing.T) {
defer s.Close()

// Setup a client
dest, _ := url.Parse(s.URL)
dest, err := url.Parse(s.URL)
require.NoError(t, err)
dest.Path = "/" + namedGroupPrefix + "/" + testGroupV2.Group + "/" + testGroupV2.Version + "/simple"
dest.RawQuery = "watch=true"

req, _ := http.NewRequest(http.MethodGet, dest.String(), nil)
// Start watch request
req, err := http.NewRequestWithContext(ctx, http.MethodGet, dest.String(), nil)
require.NoError(t, err)
client := http.Client{}
resp, err := client.Do(req)
require.NoError(t, err)
errStatus := errors.NewInternalError(fmt.Errorf("we got an error")).Status()
defer assertClosed(t, resp.Body)

// Send error to server from storage
errStatus := apierrors.NewInternalError(fmt.Errorf("we got an error")).Status()
watcher.Error(&errStatus)
watcher.Stop()

// Make sure we can actually watch an endpoint
// Decode error from the response
decoder := json.NewDecoder(resp.Body)
var got watchJSON
err = decoder.Decode(&got)
Expand All @@ -121,12 +129,27 @@ func TestWatchHTTPErrors(t *testing.T) {
Details: errStatus.Details,
}
require.Equal(t, expectedStatus, status)

// Close the response body to signal the server to stop serving.
require.NoError(t, resp.Body.Close())

// Wait for the server to call the CancelFunc returned by
// TimeoutFactory.TimeoutCh, closing the done channel.
err = utiltesting.WaitForChannelToCloseWithTimeout(ctx, wait.ForeverTestTimeout, doneCh)
require.NoError(t, err)

// Wait for the server to call watcher.Stop, closing the result channel.
err = utiltesting.WaitForChannelToCloseWithTimeout(ctx, wait.ForeverTestTimeout, watcher.ResultChan())
require.NoError(t, err)

// Confirm watcher.Stop was called by the server.
require.Truef(t, watcher.IsStopped(),
"Leaked watcher goroutine after request done")
}

func TestWatchHTTPErrorsBeforeServe(t *testing.T) {
ctx := t.Context()
watcher := watch.NewFake()
timeoutCh := make(chan time.Time)
doneCh := make(chan struct{})

info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), runtime.ContentTypeJSON)
if !ok || info.StreamSerializer == nil {
Expand All @@ -147,24 +170,29 @@ func TestWatchHTTPErrorsBeforeServe(t *testing.T) {
Encoder: testCodecV2,
EmbeddedEncoder: testCodecV2,

TimeoutFactory: &fakeTimeoutFactory{timeoutCh, doneCh},
// TimeoutFactory should not be needed, because the server should error
// before calling TimeoutFactory.TimeoutCh.
}

statusErr := errors.NewInternalError(fmt.Errorf("we got an error"))
statusErr := apierrors.NewInternalError(fmt.Errorf("we got an error"))
errStatus := statusErr.Status()

s := httptest.NewServer(serveWatch(watcher, watchServer, statusErr))
defer s.Close()

// Setup a client
dest, _ := url.Parse(s.URL)
dest, err := url.Parse(s.URL)
require.NoError(t, err)
dest.Path = "/" + namedGroupPrefix + "/" + testGroupV2.Group + "/" + testGroupV2.Version + "/simple"
dest.RawQuery = "watch=true"

req, _ := http.NewRequest(http.MethodGet, dest.String(), nil)
// Start watch request
req, err := http.NewRequestWithContext(ctx, http.MethodGet, dest.String(), nil)
require.NoError(t, err)
client := http.Client{}
resp, err := client.Do(req)
require.NoError(t, err)
defer assertClosed(t, resp.Body)

// We had already got an error before watch serve started
decoder := json.NewDecoder(resp.Body)
Expand All @@ -184,15 +212,25 @@ func TestWatchHTTPErrorsBeforeServe(t *testing.T) {
}
require.Equal(t, expectedStatus, status)

// check for leaks
// Close the response body to signal the server to stop serving.
// This isn't strictly necessary, since the test serveWatch doesn't block,
// but it would be if this were the real watch server.
require.NoError(t, resp.Body.Close())

// Wait for the server to call watcher.Stop, closing the result channel.
err = utiltesting.WaitForChannelToCloseWithTimeout(ctx, wait.ForeverTestTimeout, watcher.ResultChan())
require.NoError(t, err)

// Confirm watcher.Stop was called by the server.
require.Truef(t, watcher.IsStopped(),
"Leaked watcher goruntine after request done")
"Leaked watcher goroutine after request done")
}

func TestWatchHTTPDynamicClientErrors(t *testing.T) {
ctx := t.Context()
watcher := watch.NewFake()
timeoutCh := make(chan time.Time)
done := make(chan struct{})
doneCh := make(chan struct{})

info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), runtime.ContentTypeJSON)
if !ok || info.StreamSerializer == nil {
Expand All @@ -210,7 +248,7 @@ func TestWatchHTTPDynamicClientErrors(t *testing.T) {
Encoder: testCodecV2,
EmbeddedEncoder: testCodecV2,

TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
TimeoutFactory: &fakeTimeoutFactory{timeoutCh: timeoutCh, done: doneCh},
}

s := httptest.NewServer(serveWatch(watcher, watchServer, nil))
Expand All @@ -222,14 +260,30 @@ func TestWatchHTTPDynamicClientErrors(t *testing.T) {
APIPath: "/" + namedGroupPrefix,
}).Resource(testGroupV2.WithResource("simple"))

_, err := client.Watch(context.TODO(), metav1.ListOptions{})
_, err := client.Watch(ctx, metav1.ListOptions{})
require.Equal(t, runtime.NegotiateError{Stream: true, ContentType: "testcase/json"}, err)

// The client should automatically close the connection on error.

// Wait for the server to call the CancelFunc returned by
// TimeoutFactory.TimeoutCh, closing the done channel.
err = utiltesting.WaitForChannelToCloseWithTimeout(ctx, wait.ForeverTestTimeout, doneCh)
require.NoError(t, err)

// Wait for the server to call watcher.Stop, closing the result channel.
err = utiltesting.WaitForChannelToCloseWithTimeout(ctx, wait.ForeverTestTimeout, watcher.ResultChan())
require.NoError(t, err)

// Confirm watcher.Stop was called by the server.
require.Truef(t, watcher.IsStopped(),
"Leaked watcher goroutine after request done")
}

func TestWatchHTTPTimeout(t *testing.T) {
ctx := t.Context()
watcher := watch.NewFake()
timeoutCh := make(chan time.Time)
done := make(chan struct{})
doneCh := make(chan struct{})

info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), runtime.ContentTypeJSON)
if !ok || info.StreamSerializer == nil {
Expand All @@ -247,21 +301,27 @@ func TestWatchHTTPTimeout(t *testing.T) {
Encoder: testCodecV2,
EmbeddedEncoder: testCodecV2,

TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
TimeoutFactory: &fakeTimeoutFactory{timeoutCh: timeoutCh, done: doneCh},
}

s := httptest.NewServer(serveWatch(watcher, watchServer, nil))
defer s.Close()

// Setup a client
dest, _ := url.Parse(s.URL)
dest, err := url.Parse(s.URL)
require.NoError(t, err)
dest.Path = "/" + namedGroupPrefix + "/" + testGroupV2.Group + "/" + testGroupV2.Version + "/simple"
dest.RawQuery = "watch=true"

req, _ := http.NewRequest(http.MethodGet, dest.String(), nil)
// Start watch request
req, err := http.NewRequestWithContext(ctx, http.MethodGet, dest.String(), nil)
require.NoError(t, err)
client := http.Client{}
resp, err := client.Do(req)
require.NoError(t, err)
defer assertClosed(t, resp.Body)

// Send object added event to server from storage
watcher.Add(&apitesting.Simple{TypeMeta: metav1.TypeMeta{APIVersion: testGroupV2.String()}})

// Make sure we can actually watch an endpoint
Expand All @@ -270,29 +330,28 @@ func TestWatchHTTPTimeout(t *testing.T) {
err = decoder.Decode(&got)
require.NoError(t, err)

// Timeout and check for leaks
// Trigger server-side timeout.
close(timeoutCh)
select {
case <-done:
eventCh := watcher.ResultChan()
select {
case _, opened := <-eventCh:
if opened {
t.Errorf("Watcher received unexpected event")
}
if !watcher.IsStopped() {
t.Errorf("Watcher is not stopped")
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Leaked watch on timeout")
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Failed to stop watcher after %s of timeout signal", wait.ForeverTestTimeout.String())
}

// Make sure we can't receive any more events through the timeout watch
// Wait for the server to call the CancelFunc returned by
// TimeoutFactory.TimeoutCh, closing the done channel.
err = utiltesting.WaitForChannelToCloseWithTimeout(ctx, wait.ForeverTestTimeout, doneCh)
require.NoError(t, err)

// Wait for the server to call watcher.Stop, closing the result channel.
err = utiltesting.WaitForChannelToCloseWithTimeout(ctx, wait.ForeverTestTimeout, watcher.ResultChan())
require.NoError(t, err)

// Confirm watcher.Stop was called by the server.
require.Truef(t, watcher.IsStopped(),
"Leaked watcher goroutine after request done")

// Make sure we can't receive any more events after the watch timeout
err = decoder.Decode(&got)
require.Equal(t, io.EOF, err)

// Close the response body to clean up watch client resources.
require.NoError(t, resp.Body.Close())
}

// watchJSON defines the expected JSON wire equivalent of watch.Event.
Expand Down Expand Up @@ -330,3 +389,49 @@ func serveWatch(watcher watch.Interface, watchServer *WatchServer, preServeErr e
watchServer.HandleHTTP(w, req)
}
}

// From https://github.com/golang/go/blob/go1.20/src/net/http/transport.go#L2779
var errReadOnClosedResBody = errors.New("http: read on closed response body")

// assertClosed fails the test if the ReadCloser is NOT already closed.
// If not already closed, the ReadCloser will be drained and closed.
// Defer when your test is expected to close the ReadCloser before ending.
func assertClosed(t *testing.T, rc io.ReadCloser) {
assert.Equal(t, errReadOnClosedResBody, drainAndClose(rc))
}

// drainAndClose reads from the ReadCloser until EOF, discarding the content,
// and closes the ReadCloser when finished or on error.
// Returns an error when either Read or Close error. If both error, the errors
// are joined and returned.
//
// In a defer from a test, use with t.Error or assert.NoError, NOT t.Fatal or
// require.NoError.
func drainAndClose(rc io.ReadCloser) error {
errCh := make(chan error)
go func() {
// Close after done reading
defer func() {
defer close(errCh)
if err := rc.Close(); err != nil {
errCh <- err
}
}()
// Read until EOF and discard
if _, err := io.Copy(io.Discard, rc); err != nil {
errCh <- err
}
}()

// Wait until Read and Close are both done.
// Combine errors, if multiple.
var multiErr error
for err := range errCh {
if multiErr != nil {
multiErr = errors.Join(multiErr, err)
} else {
multiErr = err
}
}
return multiErr
}
Loading