From 3ae3fbb0315b4b23d23b3b83f83aba354ef60d3b Mon Sep 17 00:00:00 2001 From: Spike Curtis Date: Tue, 3 Sep 2024 12:47:02 +0400 Subject: [PATCH] fix: fix TestPendingUpdatesMetric flaky assertion --- coderd/notifications/manager.go | 29 ++++++++++++++--- coderd/notifications/metrics_test.go | 47 +++++++++++----------------- coderd/notifications/notifier.go | 25 +++++++++------ 3 files changed, 58 insertions(+), 43 deletions(-) diff --git a/coderd/notifications/manager.go b/coderd/notifications/manager.go index 7ce26ffbd40c2..6d8d200939880 100644 --- a/coderd/notifications/manager.go +++ b/coderd/notifications/manager.go @@ -11,6 +11,7 @@ import ( "golang.org/x/xerrors" "cdr.dev/slog" + "github.com/coder/quartz" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/notifications/dispatch" @@ -54,13 +55,25 @@ type Manager struct { stopOnce sync.Once stop chan any done chan any + + // clock is for testing only + clock quartz.Clock +} + +type ManagerOption func(*Manager) + +// WithTestClock is used in testing to set the quartz clock on the manager +func WithTestClock(clock quartz.Clock) ManagerOption { + return func(m *Manager) { + m.clock = clock + } } // NewManager instantiates a new Manager instance which coordinates notification enqueuing and delivery. // // helpers is a map of template helpers which are used to customize notification messages to use global settings like // access URL etc. -func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, metrics *Metrics, log slog.Logger) (*Manager, error) { +func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, metrics *Metrics, log slog.Logger, opts ...ManagerOption) (*Manager, error) { // TODO(dannyk): add the ability to use multiple notification methods. 
var method database.NotificationMethod if err := method.Scan(cfg.Method.String()); err != nil { @@ -74,7 +87,7 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template. return nil, ErrInvalidDispatchTimeout } - return &Manager{ + m := &Manager{ log: log, cfg: cfg, store: store, @@ -95,7 +108,13 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template. done: make(chan any), handlers: defaultHandlers(cfg, helpers, log), - }, nil + + clock: quartz.NewReal(), + } + for _, o := range opts { + o(m) + } + return m, nil } // defaultHandlers builds a set of known handlers; panics if any error occurs as these handlers should be valid at compile time. @@ -150,7 +169,7 @@ func (m *Manager) loop(ctx context.Context) error { var eg errgroup.Group // Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications. - m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers, m.metrics) + m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers, m.metrics, m.clock) eg.Go(func() error { return m.notifier.run(ctx, m.success, m.failure) }) @@ -158,7 +177,7 @@ func (m *Manager) loop(ctx context.Context) error { // Periodically flush notification state changes to the store. eg.Go(func() error { // Every interval, collect the messages in the channels and bulk update them in the store. 
- tick := time.NewTicker(m.cfg.StoreSyncInterval.Value()) + tick := m.clock.NewTicker(m.cfg.StoreSyncInterval.Value(), "Manager", "storeSync") defer tick.Stop() for { select { diff --git a/coderd/notifications/metrics_test.go b/coderd/notifications/metrics_test.go index 9989a8a7fda4d..49367cbe79777 100644 --- a/coderd/notifications/metrics_test.go +++ b/coderd/notifications/metrics_test.go @@ -221,13 +221,16 @@ func TestPendingUpdatesMetric(t *testing.T) { // GIVEN: a notification manager whose store updates are intercepted so we can read the number of pending updates set in the metric cfg := defaultNotificationsConfig(method) - cfg.FetchInterval = serpent.Duration(time.Millisecond * 50) cfg.RetryInterval = serpent.Duration(time.Hour) // Delay retries so they don't interfere. cfg.StoreSyncInterval = serpent.Duration(time.Millisecond * 100) syncer := &syncInterceptor{Store: api.Database} interceptor := newUpdateSignallingInterceptor(syncer) - mgr, err := notifications.NewManager(cfg, interceptor, defaultHelpers(), metrics, api.Logger.Named("manager")) + mClock := quartz.NewMock(t) + trap := mClock.Trap().NewTicker("Manager", "storeSync") + defer trap.Close() + mgr, err := notifications.NewManager(cfg, interceptor, defaultHelpers(), metrics, api.Logger.Named("manager"), + notifications.WithTestClock(mClock)) require.NoError(t, err) t.Cleanup(func() { assert.NoError(t, mgr.Stop(ctx)) @@ -249,6 +252,7 @@ func TestPendingUpdatesMetric(t *testing.T) { require.NoError(t, err) mgr.Run(ctx) + trap.MustWait(ctx).Release() // ensures ticker has been set // THEN: // Wait until the handler has dispatched the given notifications. @@ -259,17 +263,20 @@ func TestPendingUpdatesMetric(t *testing.T) { return len(handler.succeeded) == 1 && len(handler.failed) == 1 }, testutil.WaitShort, testutil.IntervalFast) - // Wait until we intercept the calls to sync the pending updates to the store. 
- success := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateSuccess) - failure := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateFailure) - - // Wait for the metric to be updated with the expected count of metrics. + // Both handler calls should be pending in the metrics. require.Eventually(t, func() bool { - return promtest.ToFloat64(metrics.PendingUpdates) == float64(success+failure) + return promtest.ToFloat64(metrics.PendingUpdates) == float64(2) }, testutil.WaitShort, testutil.IntervalFast) - // Unpause the interceptor so the updates can proceed. - interceptor.unpause() + // THEN: + // Trigger syncing updates + mClock.Advance(cfg.StoreSyncInterval.Value()).MustWait(ctx) + + // Wait until we intercept the calls to sync the pending updates to the store. + success := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateSuccess) + require.EqualValues(t, 1, success) + failure := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateFailure) + require.EqualValues(t, 1, failure) // Validate that the store synced the expected number of updates. require.Eventually(t, func() bool { @@ -464,43 +471,25 @@ func fingerprintLabels(lbs ...string) model.Fingerprint { // signaled by the caller so it can continue. 
type updateSignallingInterceptor struct { notifications.Store - - pause chan any - updateSuccess chan int updateFailure chan int } func newUpdateSignallingInterceptor(interceptor notifications.Store) *updateSignallingInterceptor { return &updateSignallingInterceptor{ - Store: interceptor, - - pause: make(chan any, 1), - + Store: interceptor, updateSuccess: make(chan int, 1), updateFailure: make(chan int, 1), } } -func (u *updateSignallingInterceptor) unpause() { - close(u.pause) -} - func (u *updateSignallingInterceptor) BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) { u.updateSuccess <- len(arg.IDs) - - // Wait until signaled so we have a chance to read the number of pending updates. - <-u.pause - return u.Store.BulkMarkNotificationMessagesSent(ctx, arg) } func (u *updateSignallingInterceptor) BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) { u.updateFailure <- len(arg.IDs) - - // Wait until signaled so we have a chance to read the number of pending updates. 
- <-u.pause - return u.Store.BulkMarkNotificationMessagesFailed(ctx, arg) } diff --git a/coderd/notifications/notifier.go b/coderd/notifications/notifier.go index ac7ed8b2d5f4a..0bfaa04324327 100644 --- a/coderd/notifications/notifier.go +++ b/coderd/notifications/notifier.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "sync" - "time" "github.com/google/uuid" "golang.org/x/sync/errgroup" @@ -15,6 +14,7 @@ import ( "github.com/coder/coder/v2/coderd/notifications/render" "github.com/coder/coder/v2/coderd/notifications/types" "github.com/coder/coder/v2/codersdk" + "github.com/coder/quartz" "cdr.dev/slog" @@ -29,26 +29,33 @@ type notifier struct { log slog.Logger store Store - tick *time.Ticker + tick *quartz.Ticker stopOnce sync.Once quit chan any done chan any handlers map[database.NotificationMethod]Handler metrics *Metrics + + // clock is for testing + clock quartz.Clock } -func newNotifier(cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store, hr map[database.NotificationMethod]Handler, metrics *Metrics) *notifier { +func newNotifier(cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store, + hr map[database.NotificationMethod]Handler, metrics *Metrics, clock quartz.Clock, +) *notifier { + tick := clock.NewTicker(cfg.FetchInterval.Value(), "notifier", "fetchInterval") return &notifier{ id: id, cfg: cfg, log: log.Named("notifier").With(slog.F("notifier_id", id)), quit: make(chan any), done: make(chan any), - tick: time.NewTicker(cfg.FetchInterval.Value()), + tick: tick, store: db, handlers: hr, metrics: metrics, + clock: clock, } } @@ -245,10 +252,10 @@ func (n *notifier) deliver(ctx context.Context, msg database.AcquireNotification n.metrics.InflightDispatches.WithLabelValues(string(msg.Method), msg.TemplateID.String()).Inc() n.metrics.QueuedSeconds.WithLabelValues(string(msg.Method)).Observe(msg.QueuedSeconds) - start := time.Now() + start := n.clock.Now() retryable, err := deliver(ctx, msg.ID) - 
n.metrics.DispatcherSendSeconds.WithLabelValues(string(msg.Method)).Observe(time.Since(start).Seconds()) + n.metrics.DispatcherSendSeconds.WithLabelValues(string(msg.Method)).Observe(n.clock.Since(start).Seconds()) n.metrics.InflightDispatches.WithLabelValues(string(msg.Method), msg.TemplateID.String()).Dec() if err != nil { @@ -291,7 +298,7 @@ func (n *notifier) newSuccessfulDispatch(msg database.AcquireNotificationMessage return dispatchResult{ notifier: n.id, msg: msg.ID, - ts: dbtime.Now(), + ts: dbtime.Time(n.clock.Now().UTC()), } } @@ -311,7 +318,7 @@ func (n *notifier) newFailedDispatch(msg database.AcquireNotificationMessagesRow return dispatchResult{ notifier: n.id, msg: msg.ID, - ts: dbtime.Now(), + ts: dbtime.Time(n.clock.Now().UTC()), err: err, retryable: retryable, } @@ -321,7 +328,7 @@ func (n *notifier) newInhibitedDispatch(msg database.AcquireNotificationMessages return dispatchResult{ notifier: n.id, msg: msg.ID, - ts: dbtime.Now(), + ts: dbtime.Time(n.clock.Now().UTC()), retryable: false, inhibited: true, }