
Commit 29099d4

chore: refactor notifier to use quartz.TickerFunc (coder#15134)
While investigating coder/internal#109 I noticed that many of the notification tests still use `time.Sleep` and `require.Eventually`. This is an initial effort to start converting them to Quartz. One product change is to switch the `notifier` to use a `TickerFunc` instead of a normal Ticker, since `TickerFunc` lets a test assert, via the Quartz `Mock` clock, that a batch has been processed. This introduces one slight behavioral change: the notifier now waits a full fetch interval before processing its first batch. In practice this is inconsequential: no one will notice whether we send notifications immediately on startup or just a little later. It does, however, matter to some tests, which are fixed up here.
1 parent 8c8bd31 commit 29099d4
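The heart of the change is easiest to see side by side. Below is a minimal sketch, not code from the commit itself, contrasting the channel-based Ticker loop the notifier used before with the callback-based TickerFunc it uses now; doWork is a hypothetical stand-in for the notifier's fetch-and-dispatch step, and the Quartz calls are the same ones exercised in the diffs below.

package example

import (
	"context"
	"time"

	"github.com/coder/quartz"
)

// doWork is a hypothetical stand-in for the notifier's fetch-and-dispatch step.
func doWork() {}

// Before: a channel-based quartz.Ticker. The caller owns the loop, doWork runs
// once before any tick, and a test has no way to know when an iteration has
// finished short of polling.
func runWithTicker(ctx context.Context, clock quartz.Clock, interval time.Duration) error {
	tick := clock.NewTicker(interval, "notifier", "fetchInterval")
	defer tick.Stop()
	for {
		doWork()
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-tick.C:
			// sleep until the next invocation
		}
	}
}

// After: quartz.TickerFunc owns the loop and waits a full interval before the
// first callback (the behavioral change noted above). With a quartz.Mock
// clock, Advance(interval).MustWait(ctx) blocks until the callback returns,
// so a test can assert deterministically that a batch has been processed.
func runWithTickerFunc(ctx context.Context, clock quartz.Clock, interval time.Duration) error {
	tick := clock.TickerFunc(ctx, interval, func() error {
		doWork()
		return nil // a non-nil error would stop the ticker; don't kill the loop
	}, "notifier", "fetchInterval")
	return tick.Wait() // returns once ctx is canceled
}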

7 files changed: +167 -93

coderd/notifications/manager.go

+2 -2
@@ -174,9 +174,9 @@ func (m *Manager) loop(ctx context.Context) error {
 	var eg errgroup.Group
 
 	// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
-	m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers, m.helpers, m.metrics, m.clock)
+	m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.handlers, m.helpers, m.metrics, m.clock)
 	eg.Go(func() error {
-		return m.notifier.run(ctx, m.success, m.failure)
+		return m.notifier.run(m.success, m.failure)
 	})
 
 	// Periodically flush notification state changes to the store.

coderd/notifications/metrics_test.go

+14 -8
@@ -225,13 +225,16 @@ func TestPendingUpdatesMetric(t *testing.T) {
 	// GIVEN: a notification manager whose store updates are intercepted so we can read the number of pending updates set in the metric
 	cfg := defaultNotificationsConfig(method)
 	cfg.RetryInterval = serpent.Duration(time.Hour) // Delay retries so they don't interfere.
+	cfg.FetchInterval = serpent.Duration(time.Millisecond * 50)
 	cfg.StoreSyncInterval = serpent.Duration(time.Millisecond * 100)
 
 	syncer := &syncInterceptor{Store: store}
 	interceptor := newUpdateSignallingInterceptor(syncer)
 	mClock := quartz.NewMock(t)
 	trap := mClock.Trap().NewTicker("Manager", "storeSync")
 	defer trap.Close()
+	fetchTrap := mClock.Trap().TickerFunc("notifier", "fetchInterval")
+	defer fetchTrap.Close()
 	mgr, err := notifications.NewManager(cfg, interceptor, defaultHelpers(), metrics, logger.Named("manager"),
 		notifications.WithTestClock(mClock))
 	require.NoError(t, err)
@@ -256,24 +259,27 @@ func TestPendingUpdatesMetric(t *testing.T) {
 
 	mgr.Run(ctx)
 	trap.MustWait(ctx).Release() // ensures ticker has been set
+	fetchTrap.MustWait(ctx).Release()
+
+	// Advance to the first fetch
+	mClock.Advance(cfg.FetchInterval.Value()).MustWait(ctx)
 
 	// THEN:
-	// Wait until the handler has dispatched the given notifications.
-	require.Eventually(t, func() bool {
+	// handler has dispatched the given notifications.
+	func() {
 		handler.mu.RLock()
 		defer handler.mu.RUnlock()
 
-		return len(handler.succeeded) == 1 && len(handler.failed) == 1
-	}, testutil.WaitShort, testutil.IntervalFast)
+		require.Len(t, handler.succeeded, 1)
+		require.Len(t, handler.failed, 1)
+	}()
 
 	// Both handler calls should be pending in the metrics.
-	require.Eventually(t, func() bool {
-		return promtest.ToFloat64(metrics.PendingUpdates) == float64(2)
-	}, testutil.WaitShort, testutil.IntervalFast)
+	require.EqualValues(t, 2, promtest.ToFloat64(metrics.PendingUpdates))
 
 	// THEN:
 	// Trigger syncing updates
-	mClock.Advance(cfg.StoreSyncInterval.Value()).MustWait(ctx)
+	mClock.Advance(cfg.StoreSyncInterval.Value() - cfg.FetchInterval.Value()).MustWait(ctx)
 
 	// Wait until we intercept the calls to sync the pending updates to the store.
 	success := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateSuccess)
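For readers new to Quartz, the trap-release-advance choreography above is what replaces require.Eventually. Here is a condensed sketch of just that pattern, reusing only the Quartz calls from this diff; startSystemUnderTest is a hypothetical stand-in for mgr.Run:

mClock := quartz.NewMock(t)
// Trap creation of the TickerFunc tagged "notifier"/"fetchInterval" so the
// test knows the ticker exists before it manipulates time.
fetchTrap := mClock.Trap().TickerFunc("notifier", "fetchInterval")
defer fetchTrap.Close()

startSystemUnderTest(ctx, mClock) // hypothetical; e.g. mgr.Run(ctx)
fetchTrap.MustWait(ctx).Release() // blocks until the ticker has been created

// Advance fires the tick; MustWait blocks until the tick's callback has
// returned, so the batch's effects can then be asserted with plain require
// calls instead of require.Eventually.
mClock.Advance(cfg.FetchInterval.Value()).MustWait(ctx)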

coderd/notifications/notifications_test.go

+71 -36
@@ -21,7 +21,6 @@ import (
 	"sort"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"testing"
 	"time"

@@ -278,86 +277,122 @@ func TestBackpressure(t *testing.T) {
 		t.Skip("This test requires postgres; it relies on business-logic only implemented in the database")
 	}
 
-	// nolint:gocritic // Unit test.
-	ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong))
 	store, _ := dbtestutil.NewDB(t)
 	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	// nolint:gocritic // Unit test.
+	ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitShort))
 
-	// Mock server to simulate webhook endpoint.
-	var received atomic.Int32
-	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		var payload dispatch.WebhookPayload
-		err := json.NewDecoder(r.Body).Decode(&payload)
-		assert.NoError(t, err)
-
-		w.WriteHeader(http.StatusOK)
-		_, err = w.Write([]byte("noted."))
-		assert.NoError(t, err)
-
-		received.Add(1)
-	}))
-	defer server.Close()
-
-	endpoint, err := url.Parse(server.URL)
-	require.NoError(t, err)
-
-	method := database.NotificationMethodWebhook
+	const method = database.NotificationMethodWebhook
 	cfg := defaultNotificationsConfig(method)
-	cfg.Webhook = codersdk.NotificationsWebhookConfig{
-		Endpoint: *serpent.URLOf(endpoint),
-	}
 
 	// Tune the queue to fetch often.
 	const fetchInterval = time.Millisecond * 200
 	const batchSize = 10
 	cfg.FetchInterval = serpent.Duration(fetchInterval)
 	cfg.LeaseCount = serpent.Int64(batchSize)
+	// never time out for this test
+	cfg.LeasePeriod = serpent.Duration(time.Hour)
+	cfg.DispatchTimeout = serpent.Duration(time.Hour - time.Millisecond)
 
 	// Shrink buffers down and increase flush interval to provoke backpressure.
 	// Flush buffers every 5 fetch intervals.
 	const syncInterval = time.Second
 	cfg.StoreSyncInterval = serpent.Duration(syncInterval)
 	cfg.StoreSyncBufferSize = serpent.Int64(2)
 
-	handler := newDispatchInterceptor(dispatch.NewWebhookHandler(cfg.Webhook, logger.Named("webhook")))
+	handler := &chanHandler{calls: make(chan dispatchCall)}
 
 	// Intercept calls to submit the buffered updates to the store.
 	storeInterceptor := &syncInterceptor{Store: store}
 
+	mClock := quartz.NewMock(t)
+	syncTrap := mClock.Trap().NewTicker("Manager", "storeSync")
+	defer syncTrap.Close()
+	fetchTrap := mClock.Trap().TickerFunc("notifier", "fetchInterval")
+	defer fetchTrap.Close()
+
 	// GIVEN: a notification manager whose updates will be intercepted
-	mgr, err := notifications.NewManager(cfg, storeInterceptor, defaultHelpers(), createMetrics(), logger.Named("manager"))
+	mgr, err := notifications.NewManager(cfg, storeInterceptor, defaultHelpers(), createMetrics(),
+		logger.Named("manager"), notifications.WithTestClock(mClock))
 	require.NoError(t, err)
 	mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler})
-	enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
+	enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), mClock)
 	require.NoError(t, err)
 
 	user := createSampleUser(t, store)
 
 	// WHEN: a set of notifications are enqueued, which causes backpressure due to the batchSize which can be processed per fetch
 	const totalMessages = 30
-	for i := 0; i < totalMessages; i++ {
+	for i := range totalMessages {
 		_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"i": fmt.Sprintf("%d", i)}, "test")
 		require.NoError(t, err)
 	}
 
 	// Start the notifier.
 	mgr.Run(ctx)
+	syncTrap.MustWait(ctx).Release()
+	fetchTrap.MustWait(ctx).Release()
 
 	// THEN:
 
-	// Wait for 3 fetch intervals, then check progress.
-	time.Sleep(fetchInterval * 3)
+	// Trigger a fetch
+	w := mClock.Advance(fetchInterval)
+
+	// one batch of dispatches is sent
+	for range batchSize {
+		call := testutil.RequireRecvCtx(ctx, t, handler.calls)
+		testutil.RequireSendCtx(ctx, t, call.result, dispatchResult{
+			retryable: false,
+			err:       nil,
+		})
+	}
+
+	// The first fetch will not complete, because of the short sync buffer of 2. This is the
+	// backpressure.
+	select {
+	case <-time.After(testutil.IntervalMedium):
+		// success
+	case <-w.Done():
+		t.Fatal("fetch completed despite backpressure")
+	}
 
-	// We expect the notifier will have dispatched ONLY the initial batch of messages.
-	// In other words, the notifier should have dispatched 3 batches by now, but because the buffered updates have not
-	// been processed: there is backpressure.
-	require.EqualValues(t, batchSize, handler.sent.Load()+handler.err.Load())
 	// We expect that the store will have received NO updates.
 	require.EqualValues(t, 0, storeInterceptor.sent.Load()+storeInterceptor.failed.Load())
 
 	// However, when we Stop() the manager the backpressure will be relieved and the buffered updates will ALL be flushed,
 	// since all the goroutines that were blocked (on writing updates to the buffer) will be unblocked and will complete.
-	require.NoError(t, mgr.Stop(ctx))
+	// Stop() waits for the in-progress flush to complete, meaning we have to advance the time such that sync triggers
+	// a total of (batchSize/StoreSyncBufferSize)-1 times. The -1 is because once we run the penultimate sync, it
+	// clears space in the buffer for the last dispatches of the batch, which allows graceful shutdown to continue
+	// immediately, without waiting for the last trigger.
+	stopErr := make(chan error, 1)
+	go func() {
+		stopErr <- mgr.Stop(ctx)
+	}()
+	elapsed := fetchInterval
+	syncEnd := time.Duration(batchSize/cfg.StoreSyncBufferSize.Value()-1) * cfg.StoreSyncInterval.Value()
+	t.Logf("will advance until %dms have elapsed", syncEnd.Milliseconds())
+	for elapsed < syncEnd {
+		d, wt := mClock.AdvanceNext()
+		elapsed += d
+		t.Logf("elapsed: %dms", elapsed.Milliseconds())
+		// fetches complete immediately, since TickerFunc only allows one call to the callback in flight at a time.
+		wt.MustWait(ctx)
+		if elapsed%cfg.StoreSyncInterval.Value() == 0 {
+			numSent := cfg.StoreSyncBufferSize.Value() * int64(elapsed/cfg.StoreSyncInterval.Value())
+			t.Logf("waiting for %d messages", numSent)
+			require.Eventually(t, func() bool {
+				// need greater or equal because the last set of messages can come immediately due
+				// to graceful shutdown
+				return int64(storeInterceptor.sent.Load()) >= numSent
+			}, testutil.WaitShort, testutil.IntervalFast)
+		}
+	}
+	t.Logf("done advancing")
+	// The batch completes
+	w.MustWait(ctx)
+
+	require.NoError(t, testutil.RequireRecvCtx(ctx, t, stopErr))
 	require.EqualValues(t, batchSize, storeInterceptor.sent.Load()+storeInterceptor.failed.Load())
 }
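To make the shutdown arithmetic in the comment above concrete: with batchSize = 10 and StoreSyncBufferSize = 2, Stop() needs 10/2 - 1 = 4 sync triggers, so the loop advances mock time until 4 x StoreSyncInterval = 4s have elapsed. The final two updates then flush immediately during graceful shutdown rather than waiting for a fifth tick.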

coderd/notifications/notifier.go

+37 -44
@@ -30,10 +30,11 @@ type notifier struct {
 	log   slog.Logger
 	store Store
 
-	tick     *quartz.Ticker
-	stopOnce sync.Once
-	quit     chan any
-	done     chan any
+	stopOnce       sync.Once
+	outerCtx       context.Context
+	gracefulCtx    context.Context
+	gracefulCancel context.CancelFunc
+	done           chan any
 
 	handlers map[database.NotificationMethod]Handler
 	metrics  *Metrics
@@ -43,28 +44,29 @@ type notifier struct {
 	clock quartz.Clock
 }
 
-func newNotifier(cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store,
+func newNotifier(outerCtx context.Context, cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store,
 	hr map[database.NotificationMethod]Handler, helpers template.FuncMap, metrics *Metrics, clock quartz.Clock,
 ) *notifier {
-	tick := clock.NewTicker(cfg.FetchInterval.Value(), "notifier", "fetchInterval")
+	gracefulCtx, gracefulCancel := context.WithCancel(outerCtx)
 	return &notifier{
-		id:       id,
-		cfg:      cfg,
-		log:      log.Named("notifier").With(slog.F("notifier_id", id)),
-		quit:     make(chan any),
-		done:     make(chan any),
-		tick:     tick,
-		store:    db,
-		handlers: hr,
-		helpers:  helpers,
-		metrics:  metrics,
-		clock:    clock,
+		id:             id,
+		cfg:            cfg,
+		log:            log.Named("notifier").With(slog.F("notifier_id", id)),
+		outerCtx:       outerCtx,
+		gracefulCtx:    gracefulCtx,
+		gracefulCancel: gracefulCancel,
+		done:           make(chan any),
+		store:          db,
+		handlers:       hr,
+		helpers:        helpers,
+		metrics:        metrics,
+		clock:          clock,
 	}
 }
 
 // run is the main loop of the notifier.
-func (n *notifier) run(ctx context.Context, success chan<- dispatchResult, failure chan<- dispatchResult) error {
-	n.log.Info(ctx, "started")
+func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchResult) error {
+	n.log.Info(n.outerCtx, "started")
 
 	defer func() {
 		close(n.done)
@@ -75,39 +77,32 @@ func (n *notifier) run(ctx context.Context, success chan<- dispatchResult, failu
 	// if 100 notifications are enqueued, we shouldn't activate this routine for each one; so how to debounce these?
 	// PLUS we should also have an interval (but a longer one, maybe 1m) to account for retries (those will not get
 	// triggered by a code path, but rather by a timeout expiring which makes the message retryable)
-	for {
-		select {
-		case <-ctx.Done():
-			return xerrors.Errorf("notifier %q context canceled: %w", n.id, ctx.Err())
-		case <-n.quit:
-			return nil
-		default:
-		}
 
+	// run the ticker with the graceful context, so we stop fetching after stop() is called
+	tick := n.clock.TickerFunc(n.gracefulCtx, n.cfg.FetchInterval.Value(), func() error {
 		// Check if notifier is not paused.
-		ok, err := n.ensureRunning(ctx)
+		ok, err := n.ensureRunning(n.outerCtx)
 		if err != nil {
-			n.log.Warn(ctx, "failed to check notifier state", slog.Error(err))
+			n.log.Warn(n.outerCtx, "failed to check notifier state", slog.Error(err))
 		}
 
 		if ok {
-			// Call process() immediately (i.e. don't wait an initial tick).
-			err = n.process(ctx, success, failure)
+			err = n.process(n.outerCtx, success, failure)
 			if err != nil {
-				n.log.Error(ctx, "failed to process messages", slog.Error(err))
+				n.log.Error(n.outerCtx, "failed to process messages", slog.Error(err))
 			}
 		}
+		// we don't return any errors because we don't want to kill the loop because of them.
+		return nil
+	}, "notifier", "fetchInterval")
 
-		// Shortcut to bail out quickly if stop() has been called or the context canceled.
-		select {
-		case <-ctx.Done():
-			return xerrors.Errorf("notifier %q context canceled: %w", n.id, ctx.Err())
-		case <-n.quit:
-			return nil
-		case <-n.tick.C:
-			// sleep until next invocation
-		}
+	_ = tick.Wait()
+	// only errors we can return are context errors. Only return an error if the outer context
+	// was canceled, not if we were gracefully stopped.
+	if n.outerCtx.Err() != nil {
+		return xerrors.Errorf("notifier %q context canceled: %w", n.id, n.outerCtx.Err())
 	}
+	return nil
 }
 
 // ensureRunning checks if notifier is not paused.
@@ -343,9 +338,7 @@ func (n *notifier) newInhibitedDispatch(msg database.AcquireNotificationMessages
 func (n *notifier) stop() {
 	n.stopOnce.Do(func() {
 		n.log.Info(context.Background(), "graceful stop requested")
-
-		n.tick.Stop()
-		close(n.quit)
+		n.gracefulCancel()
 		<-n.done
 	})
 }
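The stop() rewrite leans on a two-context idiom worth spelling out: the ticker runs on a graceful context derived from the outer one, so canceling it stops new fetches, while in-flight work, which holds the outer context, drains before done is closed. Below is a generic sketch of the idiom; the names mirror the diff, but this is not the full notifier.

package example

import (
	"context"
	"sync"
)

type worker struct {
	stopOnce       sync.Once
	outerCtx       context.Context // canceled only on hard shutdown
	gracefulCtx    context.Context // derived from outerCtx; canceled by stop()
	gracefulCancel context.CancelFunc
	done           chan any // closed when the run loop exits
}

func newWorker(outerCtx context.Context) *worker {
	gracefulCtx, gracefulCancel := context.WithCancel(outerCtx)
	return &worker{
		outerCtx:       outerCtx,
		gracefulCtx:    gracefulCtx,
		gracefulCancel: gracefulCancel,
		done:           make(chan any),
	}
}

// stop requests a graceful halt: no new work is scheduled, but work already
// in flight finishes because it uses outerCtx, not gracefulCtx.
func (w *worker) stop() {
	w.stopOnce.Do(func() {
		w.gracefulCancel()
		<-w.done
	})
}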
