@@ -123,12 +123,14 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes
123
123
if err != nil {
124
124
n .log .Warn (n .outerCtx , "failed to check notifier state" , slog .Error (err ))
125
125
}
126
+ if ! ok { // Notifier is paused, skip processing.
127
+ close (c )
128
+ continue
129
+ }
126
130
127
- if ok {
128
- err = n .process (n .outerCtx , success , failure )
129
- if err != nil {
130
- n .log .Error (n .outerCtx , "failed to process messages" , slog .Error (err ))
131
- }
131
+ err = n .process (n .outerCtx , success , failure )
132
+ if err != nil {
133
+ n .log .Error (n .outerCtx , "failed to process messages" , slog .Error (err ))
132
134
}
133
135
// Signal that we've finished processing one iteration.
134
136
close (c )
@@ -141,6 +143,7 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes
141
143
142
144
// Periodically trigger the processing loop.
143
145
tick := n .clock .TickerFunc (n .gracefulCtx , n .cfg .FetchInterval .Value (), func () error {
146
+ // Reset the enqueue counter after each tick.
144
147
defer enqueueEventsThisLoop .Store (0 )
145
148
c := make (chan struct {})
146
149
loopTick <- c
@@ -154,8 +157,9 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes
154
157
if stopListen , err := n .ps .Subscribe (EventNotificationEnqueued , func (ctx context.Context , _ []byte ) {
155
158
enqueued := enqueueEventsThisLoop .Add (1 )
156
159
skipEarlyDispatch := enqueued > 1
157
- n .log .Debug (n .outerCtx , "got pubsub event" , slog .F ("count" , enqueued ), slog .F ("skip_early_dispatch" , skipEarlyDispatch ), slog .F ("event" , EventNotificationEnqueued ))
158
- if enqueued > 1 {
160
+ n .log .Debug (n .outerCtx , "TODO REMOVE THIS got pubsub event" , slog .F ("count" , enqueued ), slog .F ("skip_early_dispatch" , skipEarlyDispatch ), slog .F ("event" , EventNotificationEnqueued ))
161
+ if skipEarlyDispatch {
162
+ // Avoid overloading the database. We will get to these in the next tick.
159
163
return
160
164
}
161
165
c := make (chan struct {})
0 commit comments