@@ -99,27 +99,49 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes
99
99
// TODO: idea from Cian: instead of querying the database on a short interval, we could wait for pubsub notifications.
100
100
// if 100 notifications are enqueued, we shouldn't activate this routine for each one; so how to debounce these?
101
101
// PLUS we should also have an interval (but a longer one, maybe 1m) to account for retries (those will not get
102
- // triggered by a code path, but rather by a timeout expiring which makes the message retryable)
103
-
104
- // run the ticker with the graceful context, so we stop fetching after stop() is called
105
- tick := n .clock .TickerFunc (n .gracefulCtx , n .cfg .FetchInterval .Value (), func () error {
106
- // Check if notifier is not paused.
107
- ok , err := n .ensureRunning (n .outerCtx )
108
- if err != nil {
109
- n .log .Warn (n .outerCtx , "failed to check notifier state" , slog .Error (err ))
110
- }
111
-
112
- if ok {
113
- err = n .process (n .outerCtx , success , failure )
102
+ // triggered by a code path, but rather by a timeout expiring which makes the message retryable)
103
+
104
+ // loopTick is used to synchronize the goroutine that processes messages with the ticker.
105
+ loopTick := make (chan chan struct {})
106
+ // loopDone is used to signal when the processing loop has exited due to
107
+ // graceful stop or otherwise.
108
+ loopDone := make (chan struct {})
109
+ go func () {
110
+ defer close (loopDone )
111
+ for c := range loopTick {
112
+ n .log .Info (n .outerCtx , "processing messages" )
113
+ // Check if notifier is not paused.
114
+ ok , err := n .ensureRunning (n .outerCtx )
114
115
if err != nil {
115
- n .log .Error (n .outerCtx , "failed to process messages " , slog .Error (err ))
116
+ n .log .Warn (n .outerCtx , "failed to check notifier state " , slog .Error (err ))
116
117
}
118
+
119
+ if ok {
120
+ err = n .process (n .outerCtx , success , failure )
121
+ if err != nil {
122
+ n .log .Error (n .outerCtx , "failed to process messages" , slog .Error (err ))
123
+ }
124
+ }
125
+ // Signal that we've finished processing one iteration.
126
+ close (c )
117
127
}
118
- // we don't return any errors because we don't want to kill the loop because of them.
128
+ }()
129
+
130
+ // run the ticker with the graceful context, so we stop fetching after stop() is called
131
+ tick := n .clock .TickerFunc (n .gracefulCtx , n .cfg .FetchInterval .Value (), func () error {
132
+ c := make (chan struct {})
133
+ loopTick <- c
134
+ // Wait for the processing to finish before continuing. The ticker will
135
+ // compensate for the time it takes to process the messages.
136
+ <- c
119
137
return nil
120
138
}, "notifier" , "fetchInterval" )
121
139
122
- _ = tick .Wait ()
140
+ // Note the order of operations here.
141
+ _ = tick .Wait () // will block until gracefulCtx is done
142
+ close (loopTick ) // happens immediately
143
+ <- loopDone // wait for the current processing loop to finish
144
+
123
145
// only errors we can return are context errors. Only return an error if the outer context
124
146
// was canceled, not if we were gracefully stopped.
125
147
if n .outerCtx .Err () != nil {
0 commit comments