Skip to content

Commit 0747bb8

Browse files
committed
fix: fix TestPendingUpdatesMetric flaky assertion
1 parent 4c8a560 commit 0747bb8

File tree

3 files changed

+58
-43
lines changed

3 files changed

+58
-43
lines changed

coderd/notifications/manager.go

+24 -5
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,7 @@ import (
1111
"golang.org/x/xerrors"
1212

1313
"cdr.dev/slog"
14+
"github.com/coder/quartz"
1415

1516
"github.com/coder/coder/v2/coderd/database"
1617
"github.com/coder/coder/v2/coderd/notifications/dispatch"
@@ -54,13 +55,25 @@ type Manager struct {
5455
stopOnce sync.Once
5556
stop chan any
5657
done chan any
58+
59+
// clock is for testing only
60+
clock quartz.Clock
61+
}
62+
63+
type ManagerOption func(*Manager)
64+
65+
// WithTestClock is used in testing to set the quartz clock on the manager
66+
func WithTestClock(clock quartz.Clock) ManagerOption {
67+
return func(m *Manager) {
68+
m.clock = clock
69+
}
5770
}
5871

5972
// NewManager instantiates a new Manager instance which coordinates notification enqueuing and delivery.
6073
//
6174
// helpers is a map of template helpers which are used to customize notification messages to use global settings like
6275
// access URL etc.
63-
func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, metrics *Metrics, log slog.Logger) (*Manager, error) {
76+
func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, metrics *Metrics, log slog.Logger, opts ...ManagerOption) (*Manager, error) {
6477
// TODO(dannyk): add the ability to use multiple notification methods.
6578
var method database.NotificationMethod
6679
if err := method.Scan(cfg.Method.String()); err != nil {
@@ -74,7 +87,7 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template.
7487
return nil, ErrInvalidDispatchTimeout
7588
}
7689

77-
return &Manager{
90+
m := &Manager{
7891
log: log,
7992
cfg: cfg,
8093
store: store,
@@ -95,7 +108,13 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template.
95108
done: make(chan any),
96109

97110
handlers: defaultHandlers(cfg, helpers, log),
98-
}, nil
111+
112+
clock: quartz.NewReal(),
113+
}
114+
for _, o := range opts {
115+
o(m)
116+
}
117+
return m, nil
99118
}
100119

101120
// defaultHandlers builds a set of known handlers; panics if any error occurs as these handlers should be valid at compile time.
@@ -150,15 +169,15 @@ func (m *Manager) loop(ctx context.Context) error {
150169
var eg errgroup.Group
151170

152171
// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
153-
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers, m.metrics)
172+
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers, m.metrics, m.clock)
154173
eg.Go(func() error {
155174
return m.notifier.run(ctx, m.success, m.failure)
156175
})
157176

158177
// Periodically flush notification state changes to the store.
159178
eg.Go(func() error {
160179
// Every interval, collect the messages in the channels and bulk update them in the store.
161-
tick := time.NewTicker(m.cfg.StoreSyncInterval.Value())
180+
tick := m.clock.NewTicker(m.cfg.StoreSyncInterval.Value(), "Manager", "storeSync")
162181
defer tick.Stop()
163182
for {
164183
select {

coderd/notifications/metrics_test.go

+18 -29
Original file line number | Diff line number | Diff line change
@@ -221,13 +221,16 @@ func TestPendingUpdatesMetric(t *testing.T) {
221221

222222
// GIVEN: a notification manager whose store updates are intercepted so we can read the number of pending updates set in the metric
223223
cfg := defaultNotificationsConfig(method)
224-
cfg.FetchInterval = serpent.Duration(time.Millisecond * 50)
225224
cfg.RetryInterval = serpent.Duration(time.Hour) // Delay retries so they don't interfere.
226225
cfg.StoreSyncInterval = serpent.Duration(time.Millisecond * 100)
227226

228227
syncer := &syncInterceptor{Store: api.Database}
229228
interceptor := newUpdateSignallingInterceptor(syncer)
230-
mgr, err := notifications.NewManager(cfg, interceptor, defaultHelpers(), metrics, api.Logger.Named("manager"))
229+
mClock := quartz.NewMock(t)
230+
trap := mClock.Trap().NewTicker("Manager", "storeSync")
231+
defer trap.Close()
232+
mgr, err := notifications.NewManager(cfg, interceptor, defaultHelpers(), metrics, api.Logger.Named("manager"),
233+
notifications.WithTestClock(mClock))
231234
require.NoError(t, err)
232235
t.Cleanup(func() {
233236
assert.NoError(t, mgr.Stop(ctx))
@@ -249,6 +252,7 @@ func TestPendingUpdatesMetric(t *testing.T) {
249252
require.NoError(t, err)
250253

251254
mgr.Run(ctx)
255+
trap.MustWait(ctx).Release() // ensures ticker has been set
252256

253257
// THEN:
254258
// Wait until the handler has dispatched the given notifications.
@@ -259,17 +263,20 @@ func TestPendingUpdatesMetric(t *testing.T) {
259263
return len(handler.succeeded) == 1 && len(handler.failed) == 1
260264
}, testutil.WaitShort, testutil.IntervalFast)
261265

262-
// Wait until we intercept the calls to sync the pending updates to the store.
263-
success := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateSuccess)
264-
failure := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateFailure)
265-
266-
// Wait for the metric to be updated with the expected count of metrics.
266+
// Both handler calls should be pending in the metrics.
267267
require.Eventually(t, func() bool {
268-
return promtest.ToFloat64(metrics.PendingUpdates) == float64(success+failure)
268+
return promtest.ToFloat64(metrics.PendingUpdates) == float64(2)
269269
}, testutil.WaitShort, testutil.IntervalFast)
270270

271-
// Unpause the interceptor so the updates can proceed.
272-
interceptor.unpause()
271+
// THEN:
272+
// Trigger syncing updates
273+
mClock.Advance(cfg.StoreSyncInterval.Value()).MustWait(ctx)
274+
275+
// Wait until we intercept the calls to sync the pending updates to the store.
276+
success := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateSuccess)
277+
require.EqualValues(t, 1, success)
278+
failure := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateFailure)
279+
require.EqualValues(t, 1, failure)
273280

274281
// Validate that the store synced the expected number of updates.
275282
require.Eventually(t, func() bool {
@@ -464,43 +471,25 @@ func fingerprintLabels(lbs ...string) model.Fingerprint {
464471
// signaled by the caller so it can continue.
465472
type updateSignallingInterceptor struct {
466473
notifications.Store
467-
468-
pause chan any
469-
470474
updateSuccess chan int
471475
updateFailure chan int
472476
}
473477

474478
func newUpdateSignallingInterceptor(interceptor notifications.Store) *updateSignallingInterceptor {
475479
return &updateSignallingInterceptor{
476-
Store: interceptor,
477-
478-
pause: make(chan any, 1),
479-
480+
Store: interceptor,
480481
updateSuccess: make(chan int, 1),
481482
updateFailure: make(chan int, 1),
482483
}
483484
}
484485

485-
func (u *updateSignallingInterceptor) unpause() {
486-
close(u.pause)
487-
}
488-
489486
func (u *updateSignallingInterceptor) BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) {
490487
u.updateSuccess <- len(arg.IDs)
491-
492-
// Wait until signaled so we have a chance to read the number of pending updates.
493-
<-u.pause
494-
495488
return u.Store.BulkMarkNotificationMessagesSent(ctx, arg)
496489
}
497490

498491
func (u *updateSignallingInterceptor) BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) {
499492
u.updateFailure <- len(arg.IDs)
500-
501-
// Wait until signaled so we have a chance to read the number of pending updates.
502-
<-u.pause
503-
504493
return u.Store.BulkMarkNotificationMessagesFailed(ctx, arg)
505494
}
506495

coderd/notifications/notifier.go

+16 -9
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"encoding/json"
66
"sync"
7-
"time"
87

98
"github.com/google/uuid"
109
"golang.org/x/sync/errgroup"
@@ -15,6 +14,7 @@ import (
1514
"github.com/coder/coder/v2/coderd/notifications/render"
1615
"github.com/coder/coder/v2/coderd/notifications/types"
1716
"github.com/coder/coder/v2/codersdk"
17+
"github.com/coder/quartz"
1818

1919
"cdr.dev/slog"
2020

@@ -29,26 +29,33 @@ type notifier struct {
2929
log slog.Logger
3030
store Store
3131

32-
tick *time.Ticker
32+
tick *quartz.Ticker
3333
stopOnce sync.Once
3434
quit chan any
3535
done chan any
3636

3737
handlers map[database.NotificationMethod]Handler
3838
metrics *Metrics
39+
40+
// clock is for testing
41+
clock quartz.Clock
3942
}
4043

41-
func newNotifier(cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store, hr map[database.NotificationMethod]Handler, metrics *Metrics) *notifier {
44+
func newNotifier(cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store,
45+
hr map[database.NotificationMethod]Handler, metrics *Metrics, clock quartz.Clock,
46+
) *notifier {
47+
tick := clock.NewTicker(cfg.FetchInterval.Value(), "notifier", "fetchInterval")
4248
return &notifier{
4349
id: id,
4450
cfg: cfg,
4551
log: log.Named("notifier").With(slog.F("notifier_id", id)),
4652
quit: make(chan any),
4753
done: make(chan any),
48-
tick: time.NewTicker(cfg.FetchInterval.Value()),
54+
tick: tick,
4955
store: db,
5056
handlers: hr,
5157
metrics: metrics,
58+
clock: clock,
5259
}
5360
}
5461

@@ -245,10 +252,10 @@ func (n *notifier) deliver(ctx context.Context, msg database.AcquireNotification
245252
n.metrics.InflightDispatches.WithLabelValues(string(msg.Method), msg.TemplateID.String()).Inc()
246253
n.metrics.QueuedSeconds.WithLabelValues(string(msg.Method)).Observe(msg.QueuedSeconds)
247254

248-
start := time.Now()
255+
start := n.clock.Now()
249256
retryable, err := deliver(ctx, msg.ID)
250257

251-
n.metrics.DispatcherSendSeconds.WithLabelValues(string(msg.Method)).Observe(time.Since(start).Seconds())
258+
n.metrics.DispatcherSendSeconds.WithLabelValues(string(msg.Method)).Observe(n.clock.Since(start).Seconds())
252259
n.metrics.InflightDispatches.WithLabelValues(string(msg.Method), msg.TemplateID.String()).Dec()
253260

254261
if err != nil {
@@ -291,7 +298,7 @@ func (n *notifier) newSuccessfulDispatch(msg database.AcquireNotificationMessage
291298
return dispatchResult{
292299
notifier: n.id,
293300
msg: msg.ID,
294-
ts: dbtime.Now(),
301+
ts: dbtime.Time(n.clock.Now()),
295302
}
296303
}
297304

@@ -311,7 +318,7 @@ func (n *notifier) newFailedDispatch(msg database.AcquireNotificationMessagesRow
311318
return dispatchResult{
312319
notifier: n.id,
313320
msg: msg.ID,
314-
ts: dbtime.Now(),
321+
ts: dbtime.Time(n.clock.Now()),
315322
err: err,
316323
retryable: retryable,
317324
}
@@ -321,7 +328,7 @@ func (n *notifier) newInhibitedDispatch(msg database.AcquireNotificationMessages
321328
return dispatchResult{
322329
notifier: n.id,
323330
msg: msg.ID,
324-
ts: dbtime.Now(),
331+
ts: dbtime.Time(n.clock.Now()),
325332
retryable: false,
326333
inhibited: true,
327334
}

0 commit comments

Comments
 (0)