Skip to content

Commit 5a1bcb3

Browse files
committed
avoid spamming notification dispatch
1 parent b82c7de commit 5a1bcb3

File tree

2 files changed

+14
-1
lines changed

2 files changed

+14
-1
lines changed

coderd/notifications/notifications_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -2149,6 +2149,8 @@ func TestNotificationEnqueuePubsubNotify(t *testing.T) {
21492149
}
21502150
}()
21512151
_ = testutil.TryReceive(ctx, t, recvDone)
2152+
// TODO: this sometimes fails with
2153+
// t.go:106: 2025-04-22 16:55:04.153 [warn] manager: content canceled with pending updates in buffer, these messages will be sent again after lease expires success_count=6 failure_count=0
21522154
}
21532155

21542156
type fakeHandler struct {

coderd/notifications/notifier.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"fmt"
77
"sync"
8+
"sync/atomic"
89
"text/template"
910

1011
"github.com/google/uuid"
@@ -134,8 +135,13 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes
134135
}
135136
}()
136137

138+
// Keep track of how many notification_enqueued events seen this loop to avoid
139+
// unnecessary database load.
140+
var enqueueEventsThisLoop atomic.Int64
141+
137142
// Periodically trigger the processing loop.
138143
tick := n.clock.TickerFunc(n.gracefulCtx, n.cfg.FetchInterval.Value(), func() error {
144+
defer enqueueEventsThisLoop.Store(0)
139145
c := make(chan struct{})
140146
loopTick <- c
141147
// Wait for the processing to finish before continuing. The ticker will
@@ -146,7 +152,12 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes
146152

147153
// Also signal the processing loop when a notification is enqueued.
148154
if stopListen, err := n.ps.Subscribe(EventNotificationEnqueued, func(ctx context.Context, _ []byte) {
149-
n.log.Debug(n.outerCtx, "got pubsub event", slog.F("event", EventNotificationEnqueued))
155+
enqueued := enqueueEventsThisLoop.Add(1)
156+
skipEarlyDispatch := enqueued > 1
157+
n.log.Debug(n.outerCtx, "got pubsub event", slog.F("count", enqueued), slog.F("skip_early_dispatch", skipEarlyDispatch), slog.F("event", EventNotificationEnqueued))
158+
if enqueued > 1 {
159+
return
160+
}
150161
c := make(chan struct{})
151162
select {
152163
case <-n.gracefulCtx.Done():

0 commit comments

Comments
 (0)