Skip to content

Commit 08b8f53

Browse files
committed
refactor(coderd/notifications): decouple notifier processing loop from ticker
1 parent cbc699b commit 08b8f53

File tree

1 file changed

+37
-15
lines changed

1 file changed

+37
-15
lines changed

coderd/notifications/notifier.go

+37-15
Original file line numberDiff line numberDiff line change
@@ -99,27 +99,49 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes
9999
// TODO: idea from Cian: instead of querying the database on a short interval, we could wait for pubsub notifications.
100100
// if 100 notifications are enqueued, we shouldn't activate this routine for each one; so how to debounce these?
101101
// PLUS we should also have an interval (but a longer one, maybe 1m) to account for retries (those will not get
102-
// triggered by a code path, but rather by a timeout expiring which makes the message retryable)
103-
104-
// run the ticker with the graceful context, so we stop fetching after stop() is called
105-
tick := n.clock.TickerFunc(n.gracefulCtx, n.cfg.FetchInterval.Value(), func() error {
106-
// Check if notifier is not paused.
107-
ok, err := n.ensureRunning(n.outerCtx)
108-
if err != nil {
109-
n.log.Warn(n.outerCtx, "failed to check notifier state", slog.Error(err))
110-
}
111-
112-
if ok {
113-
err = n.process(n.outerCtx, success, failure)
102+
// triggered by a code path, but rather by a timeout expiring which makes the message retryable)
103+
104+
// loopTick is used to synchronize the goroutine that processes messages with the ticker.
105+
loopTick := make(chan chan struct{})
106+
// loopDone is used to signal when the processing loop has exited due to
107+
// graceful stop or otherwise.
108+
loopDone := make(chan struct{})
109+
go func() {
110+
defer close(loopDone)
111+
for c := range loopTick {
112+
n.log.Info(n.outerCtx, "processing messages")
113+
// Check if notifier is not paused.
114+
ok, err := n.ensureRunning(n.outerCtx)
114115
if err != nil {
115-
n.log.Error(n.outerCtx, "failed to process messages", slog.Error(err))
116+
n.log.Warn(n.outerCtx, "failed to check notifier state", slog.Error(err))
116117
}
118+
119+
if ok {
120+
err = n.process(n.outerCtx, success, failure)
121+
if err != nil {
122+
n.log.Error(n.outerCtx, "failed to process messages", slog.Error(err))
123+
}
124+
}
125+
// Signal that we've finished processing one iteration.
126+
close(c)
117127
}
118-
// we don't return any errors because we don't want to kill the loop because of them.
128+
}()
129+
130+
// run the ticker with the graceful context, so we stop fetching after stop() is called
131+
tick := n.clock.TickerFunc(n.gracefulCtx, n.cfg.FetchInterval.Value(), func() error {
132+
c := make(chan struct{})
133+
loopTick <- c
134+
// Wait for the processing to finish before continuing. The ticker will
135+
// compensate for the time it takes to process the messages.
136+
<-c
119137
return nil
120138
}, "notifier", "fetchInterval")
121139

122-
_ = tick.Wait()
140+
// Note the order of operations here.
141+
_ = tick.Wait() // will block until gracefulCtx is done
142+
close(loopTick) // happens immediately
143+
<-loopDone // wait for the current processing loop to finish
144+
123145
// only errors we can return are context errors. Only return an error if the outer context
124146
// was canceled, not if we were gracefully stopped.
125147
if n.outerCtx.Err() != nil {

0 commit comments

Comments
 (0)