-
Notifications
You must be signed in to change notification settings - Fork 874
feat(coderd/notifications): notify pubsub on enqueue #17412
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
728981f
to
1d28ccf
Compare
eeedb44
to
401ec20
Compare
b174ea4
to
325471d
Compare
|
||
// Periodically trigger the processing loop. | ||
tick := n.clock.TickerFunc(n.gracefulCtx, n.cfg.FetchInterval.Value(), func() error { | ||
defer enqueueEventsThisLoop.Store(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
defer enqueueEventsThisLoop.Store(0) | |
// Reset the enqueue counter after each tick. | |
defer enqueueEventsThisLoop.Store(0) |
loopTick <- c | ||
// Wait for the processing to finish before continuing. The ticker will | ||
// compensate for the time it takes to process the messages. | ||
<-c |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering if we should select
on this and the context so we can bail early on cancelation
coderd/notifications/notifier.go
Outdated
} | ||
|
||
if ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: happy path should be unindented.
coderd/notifications/notifier.go
Outdated
enqueued := enqueueEventsThisLoop.Add(1) | ||
skipEarlyDispatch := enqueued > 1 | ||
n.log.Debug(n.outerCtx, "got pubsub event", slog.F("count", enqueued), slog.F("skip_early_dispatch", skipEarlyDispatch), slog.F("event", EventNotificationEnqueued)) | ||
if enqueued > 1 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if enqueued > 1 { | |
if skipEarlyDispatch { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a comment here explaining why we bail out in this case.
coderd/notifications/notifier.go
Outdated
if stopListen, err := n.ps.Subscribe(EventNotificationEnqueued, func(ctx context.Context, _ []byte) { | ||
enqueued := enqueueEventsThisLoop.Add(1) | ||
skipEarlyDispatch := enqueued > 1 | ||
n.log.Debug(n.outerCtx, "got pubsub event", slog.F("count", enqueued), slog.F("skip_early_dispatch", skipEarlyDispatch), slog.F("event", EventNotificationEnqueued)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will get pretty noisy; maybe this was just for your testing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, this was mainly for testing. I'll remove it.
loopDone := make(chan struct{}) | ||
go func() { | ||
defer close(loopDone) | ||
for c := range loopTick { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might need to check if the ctx is done at the top of each iteration, or at least before the blocking call to process()
|
||
// Keep track of how many notification_enqueued events seen this loop to avoid | ||
// unnecessary database load. | ||
var enqueueEventsThisLoop atomic.Int64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider a better name for this; it's unclear what "this loop" means. I think a better name might indicate "since last tick" or something.
coderd/notifications/enqueuer.go
Outdated
// Failure to publish is acceptable, as the fetcher will still process the | ||
// message on its next run. | ||
// TODO(Cian): debounce this to maybe once per second or so? | ||
_ = s.ps.Publish(EventNotificationEnqueued, nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This publishes unconditionally; i.e. if an error was returned we still signal the notifier.
Relates to coder/internal#573