diff --git a/coderd/notifications/manager.go b/coderd/notifications/manager.go index 8b765bbe88c33..33d7c0b96571d 100644 --- a/coderd/notifications/manager.go +++ b/coderd/notifications/manager.go @@ -174,9 +174,9 @@ func (m *Manager) loop(ctx context.Context) error { var eg errgroup.Group // Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications. - m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers, m.helpers, m.metrics, m.clock) + m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.handlers, m.helpers, m.metrics, m.clock) eg.Go(func() error { - return m.notifier.run(ctx, m.success, m.failure) + return m.notifier.run(m.success, m.failure) }) // Periodically flush notification state changes to the store. diff --git a/coderd/notifications/metrics_test.go b/coderd/notifications/metrics_test.go index 31cce65026643..6dec66f4bc981 100644 --- a/coderd/notifications/metrics_test.go +++ b/coderd/notifications/metrics_test.go @@ -225,6 +225,7 @@ func TestPendingUpdatesMetric(t *testing.T) { // GIVEN: a notification manager whose store updates are intercepted so we can read the number of pending updates set in the metric cfg := defaultNotificationsConfig(method) cfg.RetryInterval = serpent.Duration(time.Hour) // Delay retries so they don't interfere. + cfg.FetchInterval = serpent.Duration(time.Millisecond * 50) cfg.StoreSyncInterval = serpent.Duration(time.Millisecond * 100) syncer := &syncInterceptor{Store: store} @@ -232,6 +233,8 @@ func TestPendingUpdatesMetric(t *testing.T) { mClock := quartz.NewMock(t) trap := mClock.Trap().NewTicker("Manager", "storeSync") defer trap.Close() + fetchTrap := mClock.Trap().TickerFunc("notifier", "fetchInterval") + defer fetchTrap.Close() mgr, err := notifications.NewManager(cfg, interceptor, defaultHelpers(), metrics, logger.Named("manager"), notifications.WithTestClock(mClock)) require.NoError(t, err) @@ -256,24 +259,27 @@ func TestPendingUpdatesMetric(t *testing.T) { mgr.Run(ctx) trap.MustWait(ctx).Release() // ensures ticker has been set + fetchTrap.MustWait(ctx).Release() + + // Advance to the first fetch + mClock.Advance(cfg.FetchInterval.Value()).MustWait(ctx) // THEN: - // Wait until the handler has dispatched the given notifications. - require.Eventually(t, func() bool { + // handler has dispatched the given notifications. + func() { handler.mu.RLock() defer handler.mu.RUnlock() - return len(handler.succeeded) == 1 && len(handler.failed) == 1 - }, testutil.WaitShort, testutil.IntervalFast) + require.Len(t, handler.succeeded, 1) + require.Len(t, handler.failed, 1) + }() // Both handler calls should be pending in the metrics. - require.Eventually(t, func() bool { - return promtest.ToFloat64(metrics.PendingUpdates) == float64(2) - }, testutil.WaitShort, testutil.IntervalFast) + require.EqualValues(t, 2, promtest.ToFloat64(metrics.PendingUpdates)) // THEN: // Trigger syncing updates - mClock.Advance(cfg.StoreSyncInterval.Value()).MustWait(ctx) + mClock.Advance(cfg.StoreSyncInterval.Value() - cfg.FetchInterval.Value()).MustWait(ctx) // Wait until we intercept the calls to sync the pending updates to the store. success := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateSuccess) diff --git a/coderd/notifications/notifications_test.go b/coderd/notifications/notifications_test.go index fea0b0f2f49f4..2280f4680ae89 100644 --- a/coderd/notifications/notifications_test.go +++ b/coderd/notifications/notifications_test.go @@ -21,7 +21,6 @@ import ( "sort" "strings" "sync" - "sync/atomic" "testing" "time" @@ -278,40 +277,22 @@ func TestBackpressure(t *testing.T) { t.Skip("This test requires postgres; it relies on business-logic only implemented in the database") } - // nolint:gocritic // Unit test. - ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong)) store, _ := dbtestutil.NewDB(t) logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + // nolint:gocritic // Unit test. + ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitShort)) - // Mock server to simulate webhook endpoint. - var received atomic.Int32 - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - var payload dispatch.WebhookPayload - err := json.NewDecoder(r.Body).Decode(&payload) - assert.NoError(t, err) - - w.WriteHeader(http.StatusOK) - _, err = w.Write([]byte("noted.")) - assert.NoError(t, err) - - received.Add(1) - })) - defer server.Close() - - endpoint, err := url.Parse(server.URL) - require.NoError(t, err) - - method := database.NotificationMethodWebhook + const method = database.NotificationMethodWebhook cfg := defaultNotificationsConfig(method) - cfg.Webhook = codersdk.NotificationsWebhookConfig{ - Endpoint: *serpent.URLOf(endpoint), - } // Tune the queue to fetch often. const fetchInterval = time.Millisecond * 200 const batchSize = 10 cfg.FetchInterval = serpent.Duration(fetchInterval) cfg.LeaseCount = serpent.Int64(batchSize) + // never time out for this test + cfg.LeasePeriod = serpent.Duration(time.Hour) + cfg.DispatchTimeout = serpent.Duration(time.Hour - time.Millisecond) // Shrink buffers down and increase flush interval to provoke backpressure. // Flush buffers every 5 fetch intervals. @@ -319,45 +300,99 @@ func TestBackpressure(t *testing.T) { cfg.StoreSyncInterval = serpent.Duration(syncInterval) cfg.StoreSyncBufferSize = serpent.Int64(2) - handler := newDispatchInterceptor(dispatch.NewWebhookHandler(cfg.Webhook, logger.Named("webhook"))) + handler := &chanHandler{calls: make(chan dispatchCall)} // Intercept calls to submit the buffered updates to the store. storeInterceptor := &syncInterceptor{Store: store} + mClock := quartz.NewMock(t) + syncTrap := mClock.Trap().NewTicker("Manager", "storeSync") + defer syncTrap.Close() + fetchTrap := mClock.Trap().TickerFunc("notifier", "fetchInterval") + defer fetchTrap.Close() + // GIVEN: a notification manager whose updates will be intercepted - mgr, err := notifications.NewManager(cfg, storeInterceptor, defaultHelpers(), createMetrics(), logger.Named("manager")) + mgr, err := notifications.NewManager(cfg, storeInterceptor, defaultHelpers(), createMetrics(), + logger.Named("manager"), notifications.WithTestClock(mClock)) require.NoError(t, err) mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler}) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), mClock) require.NoError(t, err) user := createSampleUser(t, store) // WHEN: a set of notifications are enqueued, which causes backpressure due to the batchSize which can be processed per fetch const totalMessages = 30 - for i := 0; i < totalMessages; i++ { + for i := range totalMessages { _, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"i": fmt.Sprintf("%d", i)}, "test") require.NoError(t, err) } // Start the notifier. mgr.Run(ctx) + syncTrap.MustWait(ctx).Release() + fetchTrap.MustWait(ctx).Release() // THEN: - // Wait for 3 fetch intervals, then check progress. - time.Sleep(fetchInterval * 3) + // Trigger a fetch + w := mClock.Advance(fetchInterval) + + // one batch of dispatches is sent + for range batchSize { + call := testutil.RequireRecvCtx(ctx, t, handler.calls) + testutil.RequireSendCtx(ctx, t, call.result, dispatchResult{ + retryable: false, + err: nil, + }) + } + + // The first fetch will not complete, because of the short sync buffer of 2. This is the + // backpressure. + select { + case <-time.After(testutil.IntervalMedium): + // success + case <-w.Done(): + t.Fatal("fetch completed despite backpressure") + } - // We expect the notifier will have dispatched ONLY the initial batch of messages. - // In other words, the notifier should have dispatched 3 batches by now, but because the buffered updates have not - // been processed: there is backpressure. - require.EqualValues(t, batchSize, handler.sent.Load()+handler.err.Load()) // We expect that the store will have received NO updates. require.EqualValues(t, 0, storeInterceptor.sent.Load()+storeInterceptor.failed.Load()) // However, when we Stop() the manager the backpressure will be relieved and the buffered updates will ALL be flushed, // since all the goroutines that were blocked (on writing updates to the buffer) will be unblocked and will complete. - require.NoError(t, mgr.Stop(ctx)) + // Stop() waits for the in-progress flush to complete, meaning we have to advance the time such that sync triggers + // a total of (batchSize/StoreSyncBufferSize)-1 times. The -1 is because once we run the penultimate sync, it + // clears space in the buffer for the last dispatches of the batch, which allows graceful shutdown to continue + // immediately, without waiting for the last trigger. + stopErr := make(chan error, 1) + go func() { + stopErr <- mgr.Stop(ctx) + }() + elapsed := fetchInterval + syncEnd := time.Duration(batchSize/cfg.StoreSyncBufferSize.Value()-1) * cfg.StoreSyncInterval.Value() + t.Logf("will advance until %dms have elapsed", syncEnd.Milliseconds()) + for elapsed < syncEnd { + d, wt := mClock.AdvanceNext() + elapsed += d + t.Logf("elapsed: %dms", elapsed.Milliseconds()) + // fetches complete immediately, since TickerFunc only allows one call to the callback in flight at at time. + wt.MustWait(ctx) + if elapsed%cfg.StoreSyncInterval.Value() == 0 { + numSent := cfg.StoreSyncBufferSize.Value() * int64(elapsed/cfg.StoreSyncInterval.Value()) + t.Logf("waiting for %d messages", numSent) + require.Eventually(t, func() bool { + // need greater or equal because the last set of messages can come immediately due + // to graceful shut down + return int64(storeInterceptor.sent.Load()) >= numSent + }, testutil.WaitShort, testutil.IntervalFast) + } + } + t.Logf("done advancing") + // The batch completes + w.MustWait(ctx) + + require.NoError(t, testutil.RequireRecvCtx(ctx, t, stopErr)) require.EqualValues(t, batchSize, storeInterceptor.sent.Load()+storeInterceptor.failed.Load()) } diff --git a/coderd/notifications/notifier.go b/coderd/notifications/notifier.go index a3ca9fc931aa1..8a8c92b3e81d1 100644 --- a/coderd/notifications/notifier.go +++ b/coderd/notifications/notifier.go @@ -30,10 +30,11 @@ type notifier struct { log slog.Logger store Store - tick *quartz.Ticker - stopOnce sync.Once - quit chan any - done chan any + stopOnce sync.Once + outerCtx context.Context + gracefulCtx context.Context + gracefulCancel context.CancelFunc + done chan any handlers map[database.NotificationMethod]Handler metrics *Metrics @@ -43,28 +44,29 @@ type notifier struct { clock quartz.Clock } -func newNotifier(cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store, +func newNotifier(outerCtx context.Context, cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store, hr map[database.NotificationMethod]Handler, helpers template.FuncMap, metrics *Metrics, clock quartz.Clock, ) *notifier { - tick := clock.NewTicker(cfg.FetchInterval.Value(), "notifier", "fetchInterval") + gracefulCtx, gracefulCancel := context.WithCancel(outerCtx) return ¬ifier{ - id: id, - cfg: cfg, - log: log.Named("notifier").With(slog.F("notifier_id", id)), - quit: make(chan any), - done: make(chan any), - tick: tick, - store: db, - handlers: hr, - helpers: helpers, - metrics: metrics, - clock: clock, + id: id, + cfg: cfg, + log: log.Named("notifier").With(slog.F("notifier_id", id)), + outerCtx: outerCtx, + gracefulCtx: gracefulCtx, + gracefulCancel: gracefulCancel, + done: make(chan any), + store: db, + handlers: hr, + helpers: helpers, + metrics: metrics, + clock: clock, } } // run is the main loop of the notifier. -func (n *notifier) run(ctx context.Context, success chan<- dispatchResult, failure chan<- dispatchResult) error { - n.log.Info(ctx, "started") +func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchResult) error { + n.log.Info(n.outerCtx, "started") defer func() { close(n.done) @@ -75,39 +77,32 @@ func (n *notifier) run(ctx context.Context, success chan<- dispatchResult, failu // if 100 notifications are enqueued, we shouldn't activate this routine for each one; so how to debounce these? // PLUS we should also have an interval (but a longer one, maybe 1m) to account for retries (those will not get // triggered by a code path, but rather by a timeout expiring which makes the message retryable) - for { - select { - case <-ctx.Done(): - return xerrors.Errorf("notifier %q context canceled: %w", n.id, ctx.Err()) - case <-n.quit: - return nil - default: - } + // run the ticker with the graceful context, so we stop fetching after stop() is called + tick := n.clock.TickerFunc(n.gracefulCtx, n.cfg.FetchInterval.Value(), func() error { // Check if notifier is not paused. - ok, err := n.ensureRunning(ctx) + ok, err := n.ensureRunning(n.outerCtx) if err != nil { - n.log.Warn(ctx, "failed to check notifier state", slog.Error(err)) + n.log.Warn(n.outerCtx, "failed to check notifier state", slog.Error(err)) } if ok { - // Call process() immediately (i.e. don't wait an initial tick). - err = n.process(ctx, success, failure) + err = n.process(n.outerCtx, success, failure) if err != nil { - n.log.Error(ctx, "failed to process messages", slog.Error(err)) + n.log.Error(n.outerCtx, "failed to process messages", slog.Error(err)) } } + // we don't return any errors because we don't want to kill the loop because of them. + return nil + }, "notifier", "fetchInterval") - // Shortcut to bail out quickly if stop() has been called or the context canceled. - select { - case <-ctx.Done(): - return xerrors.Errorf("notifier %q context canceled: %w", n.id, ctx.Err()) - case <-n.quit: - return nil - case <-n.tick.C: - // sleep until next invocation - } + _ = tick.Wait() + // only errors we can return are context errors. Only return an error if the outer context + // was canceled, not if we were gracefully stopped. + if n.outerCtx.Err() != nil { + return xerrors.Errorf("notifier %q context canceled: %w", n.id, n.outerCtx.Err()) } + return nil } // ensureRunning checks if notifier is not paused. @@ -343,9 +338,7 @@ func (n *notifier) newInhibitedDispatch(msg database.AcquireNotificationMessages func (n *notifier) stop() { n.stopOnce.Do(func() { n.log.Info(context.Background(), "graceful stop requested") - - n.tick.Stop() - close(n.quit) + n.gracefulCancel() <-n.done }) } diff --git a/coderd/notifications/utils_test.go b/coderd/notifications/utils_test.go index 124b8554c51fb..9799d52e7bc17 100644 --- a/coderd/notifications/utils_test.go +++ b/coderd/notifications/utils_test.go @@ -92,3 +92,43 @@ func (i *dispatchInterceptor) Dispatcher(payload types.MessagePayload, title, bo return retryable, err }, nil } + +type dispatchCall struct { + payload types.MessagePayload + title, body string + result chan<- dispatchResult +} + +type dispatchResult struct { + retryable bool + err error +} + +type chanHandler struct { + calls chan dispatchCall +} + +func (c chanHandler) Dispatcher(payload types.MessagePayload, title, body string) (dispatch.DeliveryFunc, error) { + result := make(chan dispatchResult) + call := dispatchCall{ + payload: payload, + title: title, + body: body, + result: result, + } + return func(ctx context.Context, _ uuid.UUID) (bool, error) { + select { + case c.calls <- call: + select { + case r := <-result: + return r.retryable, r.err + case <-ctx.Done(): + return false, ctx.Err() + } + case <-ctx.Done(): + return false, ctx.Err() + } + }, nil +} + +var _ notifications.Handler = &chanHandler{} diff --git a/go.mod b/go.mod index 0999f40f8a903..aa458fae8ea46 100644 --- a/go.mod +++ b/go.mod @@ -88,7 +88,7 @@ require ( github.com/cli/safeexec v1.0.1 github.com/coder/flog v1.1.0 github.com/coder/pretty v0.0.0-20230908205945-e89ba86370e0 - github.com/coder/quartz v0.1.0 + github.com/coder/quartz v0.1.2 github.com/coder/retry v1.5.1 github.com/coder/terraform-provider-coder v1.0.2 github.com/coder/wgtunnel v0.1.13-0.20240522110300-ade90dfb2da0 diff --git a/go.sum b/go.sum index c301db9d821ff..0a99551661385 100644 --- a/go.sum +++ b/go.sum @@ -222,8 +222,8 @@ github.com/coder/pq v1.10.5-0.20240813183442-0c420cb5a048 h1:3jzYUlGH7ZELIH4XggX github.com/coder/pq v1.10.5-0.20240813183442-0c420cb5a048/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/coder/pretty v0.0.0-20230908205945-e89ba86370e0 h1:3A0ES21Ke+FxEM8CXx9n47SZOKOpgSE1bbJzlE4qPVs= github.com/coder/pretty v0.0.0-20230908205945-e89ba86370e0/go.mod h1:5UuS2Ts+nTToAMeOjNlnHFkPahrtDkmpydBen/3wgZc= -github.com/coder/quartz v0.1.0 h1:cLL+0g5l7xTf6ordRnUMMiZtRE8Sq5LxpghS63vEXrQ= -github.com/coder/quartz v0.1.0/go.mod h1:vsiCc+AHViMKH2CQpGIpFgdHIEQsxwm8yCscqKmzbRA= +github.com/coder/quartz v0.1.2 h1:PVhc9sJimTdKd3VbygXtS4826EOCpB1fXoRlLnCrE+s= +github.com/coder/quartz v0.1.2/go.mod h1:vsiCc+AHViMKH2CQpGIpFgdHIEQsxwm8yCscqKmzbRA= github.com/coder/retry v1.5.1 h1:iWu8YnD8YqHs3XwqrqsjoBTAVqT9ml6z9ViJ2wlMiqc= github.com/coder/retry v1.5.1/go.mod h1:blHMk9vs6LkoRT9ZHyuZo360cufXEhrxqvEzeMtRGoY= github.com/coder/serpent v0.8.0 h1:6OR+k6fekhSeEDmwwzBgnSjaa7FfGGrMlc3GoAEH9dg=