client-go watch: context support #129341
Conversation
default:
	close(rw.stopChan)
}
rw.cancel(errors.New("asked to stop"))
nice
The Lister and Watcher interfaces only supported methods without context, but were typically implemented with client-go API calls which need a context. New interfaces get added using the same approach as in kubernetes#129109.
For compatibility reasons, the old functions without the ctx parameter still get generated, now with context.Background instead of context.TODO. In practice that code won't be used by the client-go reflector code because it prefers the *WithContext functions, but it cannot be ruled out that some other code only supports the old fields.
Force-pushed from ba2d90b to 8cc74e8.
}

// NewStreamWatcherWithLogger creates a StreamWatcher from the given decoder and logger.
func NewStreamWatcherWithLogger(logger klog.Logger, d Decoder, r Reporter) *StreamWatcher {
I'd like to avoid a situation where we have *WithLogger and then *WithContext, like we have in:
// HandleCrash simply catches a crash and logs an error. Meant to be called via
// defer. Additional context-specific handlers can be provided, and will be
// called in case of panic. HandleCrash actually crashes, after calling the
// handlers and logging the panic message.
//
// E.g., you can provide one or more additional handlers for something like shutting down go routines gracefully.
//
// Contextual logging: HandleCrashWithContext or HandleCrashWithLogger should be used instead of HandleCrash in code which supports contextual logging.
func HandleCrash(additionalHandlers ...func(interface{})) {
	if r := recover(); r != nil {
		additionalHandlersWithContext := make([]func(context.Context, interface{}), len(additionalHandlers))
		for i, handler := range additionalHandlers {
			handler := handler // capture loop variable
			additionalHandlersWithContext[i] = func(_ context.Context, r interface{}) {
				handler(r)
			}
		}
		handleCrash(context.Background(), r, additionalHandlersWithContext...)
	}
}

// HandleCrashWithContext simply catches a crash and logs an error. Meant to be called via
// defer. Additional context-specific handlers can be provided, and will be
// called in case of panic. HandleCrash actually crashes, after calling the
// handlers and logging the panic message.
//
// E.g., you can provide one or more additional handlers for something like shutting down go routines gracefully.
//
// The context is used to determine how to log.
func HandleCrashWithContext(ctx context.Context, additionalHandlers ...func(context.Context, interface{})) {
	if r := recover(); r != nil {
		handleCrash(ctx, r, additionalHandlers...)
	}
}

// HandleCrashWithLogger simply catches a crash and logs an error. Meant to be called via
// defer. Additional context-specific handlers can be provided, and will be
// called in case of panic. HandleCrash actually crashes, after calling the
// handlers and logging the panic message.
//
// E.g., you can provide one or more additional handlers for something like shutting down go routines gracefully.
func HandleCrashWithLogger(logger klog.Logger, additionalHandlers ...func(context.Context, interface{})) {
	if r := recover(); r != nil {
		ctx := klog.NewContext(context.Background(), logger)
		handleCrash(ctx, r, additionalHandlers...)
	}
}
StreamWatcher has a done channel, which we allow users to invoke through the Stop method. So the question is: do we want to just pass a context and wire it such that both Stop and ctx.Done are equivalent? We've done something similar in kubernetes/staging/src/k8s.io/client-go/tools/cache/controller.go, lines 143 to 148 in 07cc230:
func (c *controller) Run(stopCh <-chan struct{}) {
	c.RunWithContext(wait.ContextForChannel(stopCh))
}

// RunWithContext implements [Controller.RunWithContext].
func (c *controller) RunWithContext(ctx context.Context) {
A NewStreamWatcherWithContext could use a similar construct as NewIndexerInformerWatcherWithContext in this PR to map context cancellation to Stop. It definitely would be more consistent and IMHO useful.
I don't remember any particular reason for deciding not to do that also here. I'll change it.
However, such a change then begs the question: should the context that is getting passed to this NewStreamWatcherWithContext be the same context that is passed to Request.Watch?
kubernetes/staging/src/k8s.io/client-go/rest/request.go, lines 756 to 761 in 07cc230:
// Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error.
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
	w, _, e := r.watchInternal(ctx)
	return w, e
}
The call chain is Watch -> watchInternal -> newStreamWatcher -> NewStreamWatcherWithLogger. That would be a functional change of Watch with regard to how it uses the context that it is being passed: someone might currently pass a short-lived context that gets canceled after Watch returns, which would then break the returned StreamWatcher.
So newStreamWatcher would have to call NewStreamWatcherWithContext with klog.NewContext(context.Background(), klog.FromContext(ctx)). Doable, but more complicated.
This problem does not exist for NewIndexerInformerWatcherWithContext in our code base because, where the PR passes the parent's context, the watcher is expected to stop before the caller returns:
kubernetes/staging/src/k8s.io/client-go/tools/watch/until.go, lines 128 to 135 in 07cc230:
func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) {
	indexer, informer, watcher, done := NewIndexerInformerWatcher(lw, objType)
	// We need to wait for the internal informers to fully stop so it's easier to reason about
	// and it works with non-thread safe clients.
	defer func() { <-done }()
	// Proxy watcher can be stopped multiple times so it's fine to use defer here to cover alternative branches and
	// let UntilWithoutRetry to stop it
	defer watcher.Stop()
Should NewIndexerInformerWatcherWithContext perhaps get replaced by a simpler NewIndexerInformerWatcherWithLogger?
Watch is and always will be a long-running function; even the doc comment on the Watch method says:
// Watch attempts to begin watching the requested location.
So it's a begin, not a short-lived request. I'm inclined to say it's not a functional change, but maybe that's a question for sig-apimachinery?
It doesn't say anything about the context, though. Usually, contexts are only passed through function calls, so my expectation as a Go developer is that the context is only used while the Watch call runs.
For example, code might set a timeout like this to stop the attempt to set up the watch:
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
return req.Watch(ctx)
Should NewIndexerInformerWatcherWithContext perhaps get replaced by a simpler NewIndexerInformerWatcherWithLogger?
I convinced myself that this is the right way to go and therefore I included it in this PR as an additional commit. I can squash or remove it, depending on how we decide to do this.
We haven't heard from sig-apimachinery, but given that we're early in the 1.33 cycle, I don't want to block the work here. So we'll continue the discussion on Slack, but this can move forward.
Can you LGTM again? My last commit merely updated the unit test for the no-longer-supported context cancellation instead of removing it completely - fixed. Sorry, I should have checked test results before pinging you!
return &FakeWatcher{
	result: make(chan Event, size),
	logger: ptr.Deref(options.Logger, klog.Background()),
Identical comment as on the other watcher: consider passing a context rather than a logger here, which you can then wire to stopping the watcher.
/lgtm
LGTM label has been added. Git tree hash: 5e2caa3eef455cd5a9a7031a900043e772997756
Force-pushed from 91f8403 to 1a8d8c9.
@pohly: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:
Full PR test history. Your PR dashboard. Please help us cut down on flakes by linking to an open issue when you hit one in your PR. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
The ability to automatically stop on context cancellation was new functionality that adds complexity and wasn't really used in Kubernetes. If someone wants this, they can add it outside of the function. A *WithLogger variant avoids the complexity and is consistent with NewStreamWatcherWithLogger over in apimachinery.
/lgtm
LGTM label has been added. Git tree hash: c2fed5f35b3b68e3612ea556f7dec32119303cf9
[APPROVALNOTIFIER] This PR is APPROVED. This pull-request has been approved by: pohly, soltysh, sttts. The full list of commands accepted by this bot can be found here. The pull request process is described here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing /approve in a comment.
kubernetes/staging/src/k8s.io/client-go/tools/cache/listwatch.go, lines 134 to 148 in 5be7941
I noticed a slight inconsistency in naming conventions while reviewing the context-aware interfaces.
I sent a PR to fix this: #131529
What type of PR is this?
/kind cleanup
What this PR does / why we need it:
The Lister and Watcher interfaces only supported methods without context, but
were typically implemented with client-go API calls which need a context. New
interfaces get added using the same approach as in
#129109.
For StreamWatcher and RaceFreeFakeWatcher it was easier to add WithLogger configuration methods which override the default global logger. The downside of this approach is that it cannot be enforced by logcheck. This is okay for Kubernetes in this case because all existing usages of those two types are local to client-go and get updated immediately.
Which issue(s) this PR fixes:
Part of #129125
Special notes for your reviewer:
Generated informers: for compatibility reasons, the old functions without the ctx parameter still
get generated, now with context.Background instead of context.TODO. In practice
that code won't be used by the client-go reflector code because it prefers
the *WithContext functions, but it cannot be ruled out that some other code
only supports the old fields.
Does this PR introduce a user-facing change?
/wg structured-logging