Skip to content

feat(coderd/notifications): notify pubsub on enqueue #17412

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

johnstcn
Copy link
Member

@johnstcn johnstcn commented Apr 16, 2025

Relates to coder/internal#573

  • Decouples notifier processing loop from ticker
  • Plumbs through pubsub into enqueuer and dispatcher
  • Adds pubsub NOTIFY for message enqueuing
  • Modifies notifier to LISTEN for message enqueue events

@johnstcn johnstcn force-pushed the cj/notifications-pubsub branch from eeedb44 to 401ec20 Compare April 22, 2025 10:42
@johnstcn johnstcn force-pushed the cj/notifications-pubsub branch from b174ea4 to 325471d Compare April 22, 2025 11:46
@johnstcn johnstcn changed the title refactor(coderd/notifications): decouple notifier processing loop from ticker feat(coderd/notifications): notify pubsub on enqueue Apr 22, 2025

// Periodically trigger the processing loop.
tick := n.clock.TickerFunc(n.gracefulCtx, n.cfg.FetchInterval.Value(), func() error {
defer enqueueEventsThisLoop.Store(0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
defer enqueueEventsThisLoop.Store(0)
// Reset the enqueue counter after each tick.
defer enqueueEventsThisLoop.Store(0)

loopTick <- c
// Wait for the processing to finish before continuing. The ticker will
// compensate for the time it takes to process the messages.
<-c
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if we should select on this and the context so we can bail early on cancelation

}

if ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: happy path should be unindented.

enqueued := enqueueEventsThisLoop.Add(1)
skipEarlyDispatch := enqueued > 1
n.log.Debug(n.outerCtx, "got pubsub event", slog.F("count", enqueued), slog.F("skip_early_dispatch", skipEarlyDispatch), slog.F("event", EventNotificationEnqueued))
if enqueued > 1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if enqueued > 1 {
if skipEarlyDispatch {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment here explaining why we bail out in this case.

if stopListen, err := n.ps.Subscribe(EventNotificationEnqueued, func(ctx context.Context, _ []byte) {
enqueued := enqueueEventsThisLoop.Add(1)
skipEarlyDispatch := enqueued > 1
n.log.Debug(n.outerCtx, "got pubsub event", slog.F("count", enqueued), slog.F("skip_early_dispatch", skipEarlyDispatch), slog.F("event", EventNotificationEnqueued))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will get pretty noisy; maybe this was just for your testing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, this was mainly for testing. I'll remove it.

loopDone := make(chan struct{})
go func() {
defer close(loopDone)
for c := range loopTick {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might need to check if the ctx is done at the top of each iteration, or at least before the blocking call to process()


// Keep track of how many notification_enqueued events seen this loop to avoid
// unnecessary database load.
var enqueueEventsThisLoop atomic.Int64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider a better name for this; it's unclear what "this loop" means. I think a better name might indicate "since last tick" or something.

// Failure to publish is acceptable, as the fetcher will still process the
// message on its next run.
// TODO(Cian): debounce this to maybe once per second or so?
_ = s.ps.Publish(EventNotificationEnqueued, nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This publishes unconditionally; i.e. if an error was returned we still signal the notifier.

@github-actions github-actions bot added the stale This issue is like stale bread. label May 1, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
stale This issue is like stale bread.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants