Skip to content

Commit f5ec419

Browse files
committed
address some but not all PR comments
1 parent 5a1bcb3 commit f5ec419

File tree

2 files changed

+16
-14
lines changed

2 files changed

+16
-14
lines changed

coderd/notifications/enqueuer.go

+5-7
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,6 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI
8585
// Enqueue queues a notification message for later delivery.
8686
// Messages will be dequeued by a notifier later and dispatched.
8787
func (s *StoreEnqueuer) EnqueueWithData(ctx context.Context, userID, templateID uuid.UUID, labels map[string]string, data map[string]any, createdBy string, targets ...uuid.UUID) ([]uuid.UUID, error) {
88-
defer func() {
89-
// Publish an event to notify that a notification has been enqueued.
90-
// Failure to publish is acceptable, as the fetcher will still process the
91-
// message on its next run.
92-
// TODO(Cian): debounce this to maybe once per second or so?
93-
_ = s.ps.Publish(EventNotificationEnqueued, nil)
94-
}()
9588
metadata, err := s.store.FetchNewMessageMetadata(ctx, database.FetchNewMessageMetadataParams{
9689
UserID: userID,
9790
NotificationTemplateID: templateID,
@@ -171,6 +164,11 @@ func (s *StoreEnqueuer) EnqueueWithData(ctx context.Context, userID, templateID
171164
}
172165

173166
s.log.Debug(ctx, "enqueued notification", slog.F("msg_ids", uuids))
167+
// Publish an event to notify that a notification has been enqueued.
168+
// Failure to publish is acceptable, as the fetcher will still process the
169+
// message on its next run.
170+
// TODO(Cian): debounce this to maybe once per second or so?
171+
_ = s.ps.Publish(EventNotificationEnqueued, nil)
174172
return uuids, nil
175173
}
176174

coderd/notifications/notifier.go

+11-7
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,14 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes
123123
if err != nil {
124124
n.log.Warn(n.outerCtx, "failed to check notifier state", slog.Error(err))
125125
}
126+
if !ok { // Notifier is paused, skip processing.
127+
close(c)
128+
continue
129+
}
126130

127-
if ok {
128-
err = n.process(n.outerCtx, success, failure)
129-
if err != nil {
130-
n.log.Error(n.outerCtx, "failed to process messages", slog.Error(err))
131-
}
131+
err = n.process(n.outerCtx, success, failure)
132+
if err != nil {
133+
n.log.Error(n.outerCtx, "failed to process messages", slog.Error(err))
132134
}
133135
// Signal that we've finished processing one iteration.
134136
close(c)
@@ -141,6 +143,7 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes
141143

142144
// Periodically trigger the processing loop.
143145
tick := n.clock.TickerFunc(n.gracefulCtx, n.cfg.FetchInterval.Value(), func() error {
146+
// Reset the enqueue counter after each tick.
144147
defer enqueueEventsThisLoop.Store(0)
145148
c := make(chan struct{})
146149
loopTick <- c
@@ -154,8 +157,9 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes
154157
if stopListen, err := n.ps.Subscribe(EventNotificationEnqueued, func(ctx context.Context, _ []byte) {
155158
enqueued := enqueueEventsThisLoop.Add(1)
156159
skipEarlyDispatch := enqueued > 1
157-
n.log.Debug(n.outerCtx, "got pubsub event", slog.F("count", enqueued), slog.F("skip_early_dispatch", skipEarlyDispatch), slog.F("event", EventNotificationEnqueued))
158-
if enqueued > 1 {
160+
n.log.Debug(n.outerCtx, "TODO REMOVE THIS got pubsub event", slog.F("count", enqueued), slog.F("skip_early_dispatch", skipEarlyDispatch), slog.F("event", EventNotificationEnqueued))
161+
if skipEarlyDispatch {
162+
// Avoid overloading the database. We will get to these in the next tick.
159163
return
160164
}
161165
c := make(chan struct{})

0 commit comments

Comments
 (0)