From d8a971491d647fef12d4ae42dc024af3a2d58a83 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 16 Apr 2025 12:06:42 +0100 Subject: [PATCH 1/7] refactor(coderd/notifications): decouple notifier processing loop from ticker --- coderd/notifications/notifier.go | 52 +++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 15 deletions(-) diff --git a/coderd/notifications/notifier.go b/coderd/notifications/notifier.go index b2713533cecb3..1e0c3f4b271f8 100644 --- a/coderd/notifications/notifier.go +++ b/coderd/notifications/notifier.go @@ -99,27 +99,49 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes // TODO: idea from Cian: instead of querying the database on a short interval, we could wait for pubsub notifications. // if 100 notifications are enqueued, we shouldn't activate this routine for each one; so how to debounce these? // PLUS we should also have an interval (but a longer one, maybe 1m) to account for retries (those will not get - // triggered by a code path, but rather by a timeout expiring which makes the message retryable) - - // run the ticker with the graceful context, so we stop fetching after stop() is called - tick := n.clock.TickerFunc(n.gracefulCtx, n.cfg.FetchInterval.Value(), func() error { - // Check if notifier is not paused. - ok, err := n.ensureRunning(n.outerCtx) - if err != nil { - n.log.Warn(n.outerCtx, "failed to check notifier state", slog.Error(err)) - } - - if ok { - err = n.process(n.outerCtx, success, failure) + // triggered by a code path, but rather by a timeout expiring which makes the message retryable) + + // loopTick is used to synchronize the goroutine that processes messages with the ticker. + loopTick := make(chan chan struct{}) + // loopDone is used to signal when the processing loop has exited due to + // graceful stop or otherwise. + loopDone := make(chan struct{}) + go func() { + defer close(loopDone) + for c := range loopTick { + n.log.Info(n.outerCtx, "processing messages") + // Check if notifier is not paused. + ok, err := n.ensureRunning(n.outerCtx) if err != nil { - n.log.Error(n.outerCtx, "failed to process messages", slog.Error(err)) + n.log.Warn(n.outerCtx, "failed to check notifier state", slog.Error(err)) } + + if ok { + err = n.process(n.outerCtx, success, failure) + if err != nil { + n.log.Error(n.outerCtx, "failed to process messages", slog.Error(err)) + } + } + // Signal that we've finished processing one iteration. + close(c) } - // we don't return any errors because we don't want to kill the loop because of them. + }() + + // run the ticker with the graceful context, so we stop fetching after stop() is called + tick := n.clock.TickerFunc(n.gracefulCtx, n.cfg.FetchInterval.Value(), func() error { + c := make(chan struct{}) + loopTick <- c + // Wait for the processing to finish before continuing. The ticker will + // compensate for the time it takes to process the messages. + <-c return nil }, "notifier", "fetchInterval") - _ = tick.Wait() + // Note the order of operations here. + _ = tick.Wait() // will block until gracefulCtx is done + close(loopTick) // happens immediately + <-loopDone // wait for the current processing loop to finish + // only errors we can return are context errors. Only return an error if the outer context // was canceled, not if we were gracefully stopped. if n.outerCtx.Err() != nil { From c9c634ac2c47c68254e2a18877b96c57183da8d3 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 16 Apr 2025 14:03:01 +0100 Subject: [PATCH 2/7] chore(coderd/notifications): plumb through pubsub into enqueuer and dispatcher --- cli/server.go | 62 +++++++++++----------- coderd/notifications/enqueuer.go | 5 +- coderd/notifications/manager.go | 4 +- coderd/notifications/manager_test.go | 6 +-- coderd/notifications/metrics_test.go | 8 +-- coderd/notifications/notifications_test.go | 48 +++++++++-------- coderd/notifications/notifier.go | 16 +++++- 7 files changed, 84 insertions(+), 65 deletions(-) diff --git a/cli/server.go b/cli/server.go index c5532e07e7a81..bfd69643c9889 100644 --- a/cli/server.go +++ b/cli/server.go @@ -928,37 +928,6 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd. options.StatsBatcher = batcher defer closeBatcher() - // Manage notifications. - var ( - notificationsCfg = options.DeploymentValues.Notifications - notificationsManager *notifications.Manager - ) - - metrics := notifications.NewMetrics(options.PrometheusRegistry) - helpers := templateHelpers(options) - - // The enqueuer is responsible for enqueueing notifications to the given store. - enqueuer, err := notifications.NewStoreEnqueuer(notificationsCfg, options.Database, helpers, logger.Named("notifications.enqueuer"), quartz.NewReal()) - if err != nil { - return xerrors.Errorf("failed to instantiate notification store enqueuer: %w", err) - } - options.NotificationsEnqueuer = enqueuer - - // The notification manager is responsible for: - // - creating notifiers and managing their lifecycles (notifiers are responsible for dequeueing/sending notifications) - // - keeping the store updated with status updates - notificationsManager, err = notifications.NewManager(notificationsCfg, options.Database, options.Pubsub, helpers, metrics, logger.Named("notifications.manager")) - if err != nil { - return xerrors.Errorf("failed to instantiate notification manager: %w", err) - } - - // nolint:gocritic // We need to run the manager in a notifier context. - notificationsManager.Run(dbauthz.AsNotifier(ctx)) - - // Run report generator to distribute periodic reports. - notificationReportGenerator := reports.NewReportGenerator(ctx, logger.Named("notifications.report_generator"), options.Database, options.NotificationsEnqueuer, quartz.NewReal()) - defer notificationReportGenerator.Close() - // We use a separate coderAPICloser so the Enterprise API // can have its own close functions. This is cleaner // than abstracting the Coder API itself. @@ -1006,6 +975,37 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd. return xerrors.Errorf("write config url: %w", err) } + // Manage notifications. + var ( + notificationsCfg = options.DeploymentValues.Notifications + notificationsManager *notifications.Manager + ) + + metrics := notifications.NewMetrics(options.PrometheusRegistry) + helpers := templateHelpers(options) + + // The enqueuer is responsible for enqueueing notifications to the given store. + enqueuer, err := notifications.NewStoreEnqueuer(notificationsCfg, options.Database, options.Pubsub, helpers, logger.Named("notifications.enqueuer"), quartz.NewReal()) + if err != nil { + return xerrors.Errorf("failed to instantiate notification store enqueuer: %w", err) + } + options.NotificationsEnqueuer = enqueuer + + // The notification manager is responsible for: + // - creating notifiers and managing their lifecycles (notifiers are responsible for dequeueing/sending notifications) + // - keeping the store updated with status updates + notificationsManager, err = notifications.NewManager(notificationsCfg, options.Database, options.Pubsub, helpers, metrics, logger.Named("notifications.manager")) + if err != nil { + return xerrors.Errorf("failed to instantiate notification manager: %w", err) + } + + // nolint:gocritic // We need to run the manager in a notifier context. + notificationsManager.Run(dbauthz.AsNotifier(ctx)) + + // Run report generator to distribute periodic reports. + notificationReportGenerator := reports.NewReportGenerator(ctx, logger.Named("notifications.report_generator"), options.Database, options.NotificationsEnqueuer, quartz.NewReal()) + defer notificationReportGenerator.Close() + // Since errCh only has one buffered slot, all routines // sending on it must be wrapped in a select/default to // avoid leaving dangling goroutines waiting for the diff --git a/coderd/notifications/enqueuer.go b/coderd/notifications/enqueuer.go index ff3af3fc5eaa1..58bbd66b67070 100644 --- a/coderd/notifications/enqueuer.go +++ b/coderd/notifications/enqueuer.go @@ -16,6 +16,7 @@ import ( "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbtime" + "github.com/coder/coder/v2/coderd/database/pubsub" "github.com/coder/coder/v2/coderd/notifications/render" "github.com/coder/coder/v2/coderd/notifications/types" "github.com/coder/coder/v2/codersdk" @@ -36,6 +37,7 @@ func (e InvalidDefaultNotificationMethodError) Error() string { type StoreEnqueuer struct { store Store + ps pubsub.Pubsub log slog.Logger defaultMethod database.NotificationMethod @@ -50,7 +52,7 @@ type StoreEnqueuer struct { } // NewStoreEnqueuer creates an Enqueuer implementation which can persist notification messages in the store. -func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, log slog.Logger, clock quartz.Clock) (*StoreEnqueuer, error) { +func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, ps pubsub.Pubsub, helpers template.FuncMap, log slog.Logger, clock quartz.Clock) (*StoreEnqueuer, error) { var method database.NotificationMethod // TODO(DanielleMaywood): // Currently we do not want to allow setting `inbox` as the default notification method. @@ -63,6 +65,7 @@ func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers tem return &StoreEnqueuer{ store: store, + ps: ps, log: log, defaultMethod: method, defaultEnabled: cfg.Enabled(), diff --git a/coderd/notifications/manager.go b/coderd/notifications/manager.go index 1a2c418a014bb..2b0b25c3f59f6 100644 --- a/coderd/notifications/manager.go +++ b/coderd/notifications/manager.go @@ -42,6 +42,7 @@ type Manager struct { cfg codersdk.NotificationsConfig store Store + ps pubsub.Pubsub log slog.Logger handlers map[database.NotificationMethod]Handler @@ -168,7 +169,8 @@ func (m *Manager) loop(ctx context.Context) error { var eg errgroup.Group - m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.handlers, m.helpers, m.metrics, m.clock) + // Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications. + m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.ps, m.handlers, m.helpers, m.metrics, m.clock) eg.Go(func() error { // run the notifier which will handle dequeueing and dispatching notifications. return m.notifier.run(m.success, m.failure) diff --git a/coderd/notifications/manager_test.go b/coderd/notifications/manager_test.go index e9c309f0a09d3..e184fd0fb68f5 100644 --- a/coderd/notifications/manager_test.go +++ b/coderd/notifications/manager_test.go @@ -53,7 +53,7 @@ func TestBufferedUpdates(t *testing.T) { } mgr.WithHandlers(handlers) - enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, defaultHelpers(), logger.Named("notifications-enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, ps, defaultHelpers(), logger.Named("notifications-enqueuer"), quartz.NewReal()) require.NoError(t, err) user := dbgen.User(t, store, database.User{}) @@ -110,7 +110,7 @@ func TestBuildPayload(t *testing.T) { // nolint:gocritic // Unit test. ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong)) - store, _ := dbtestutil.NewDB(t) + store, ps := dbtestutil.NewDB(t) logger := testutil.Logger(t) // GIVEN: a set of helpers to be injected into the templates @@ -145,7 +145,7 @@ func TestBuildPayload(t *testing.T) { } }) - enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, helpers, logger.Named("notifications-enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, ps, helpers, logger.Named("notifications-enqueuer"), quartz.NewReal()) require.NoError(t, err) // WHEN: a notification is enqueued diff --git a/coderd/notifications/metrics_test.go b/coderd/notifications/metrics_test.go index e88282bbc1861..11c72b0b16eb1 100644 --- a/coderd/notifications/metrics_test.go +++ b/coderd/notifications/metrics_test.go @@ -71,7 +71,7 @@ func TestMetrics(t *testing.T) { database.NotificationMethodInbox: &fakeHandler{}, }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -264,7 +264,7 @@ func TestPendingUpdatesMetric(t *testing.T) { database.NotificationMethodInbox: inboxHandler, }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -354,7 +354,7 @@ func TestInflightDispatchesMetric(t *testing.T) { database.NotificationMethodInbox: &fakeHandler{}, }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -441,7 +441,7 @@ func TestCustomMethodMetricCollection(t *testing.T) { database.NotificationMethodInbox: &fakeHandler{}, }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) diff --git a/coderd/notifications/notifications_test.go b/coderd/notifications/notifications_test.go index 8f8a3c82441e0..f5195a6ab7bd6 100644 --- a/coderd/notifications/notifications_test.go +++ b/coderd/notifications/notifications_test.go @@ -90,7 +90,7 @@ func TestBasicNotificationRoundtrip(t *testing.T) { t.Cleanup(func() { assert.NoError(t, mgr.Stop(ctx)) }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -171,7 +171,7 @@ func TestSMTPDispatch(t *testing.T) { t.Cleanup(func() { assert.NoError(t, mgr.Stop(ctx)) }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -236,7 +236,7 @@ func TestWebhookDispatch(t *testing.T) { t.Cleanup(func() { assert.NoError(t, mgr.Stop(ctx)) }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) const ( @@ -327,7 +327,7 @@ func TestBackpressure(t *testing.T) { method: handler, database.NotificationMethodInbox: handler, }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), mClock) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), mClock) require.NoError(t, err) user := createSampleUser(t, store) @@ -478,7 +478,7 @@ func TestRetries(t *testing.T) { method: handler, database.NotificationMethodInbox: &fakeHandler{}, }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -542,7 +542,7 @@ func TestExpiredLeaseIsRequeued(t *testing.T) { mgr, err := notifications.NewManager(cfg, noopInterceptor, pubsub, defaultHelpers(), createMetrics(), logger.Named("manager")) require.NoError(t, err) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -667,7 +667,7 @@ func TestNotifierPaused(t *testing.T) { t.Cleanup(func() { assert.NoError(t, mgr.Stop(ctx)) }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) // Pause the notifier. @@ -1410,6 +1410,7 @@ func TestNotificationTemplates_Golden(t *testing.T) { smtpEnqueuer, err := notifications.NewStoreEnqueuer( notificationCfg, *db, + pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal(), @@ -1535,6 +1536,7 @@ func TestNotificationTemplates_Golden(t *testing.T) { httpEnqueuer, err := notifications.NewStoreEnqueuer( defaultNotificationsConfig(database.NotificationMethodWebhook), *db, + pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal(), @@ -1638,11 +1640,11 @@ func TestDisabledByDefaultBeforeEnqueue(t *testing.T) { // nolint:gocritic // Unit test. ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong)) - store, _ := dbtestutil.NewDB(t) + store, pubsub := dbtestutil.NewDB(t) logger := testutil.Logger(t) cfg := defaultNotificationsConfig(database.NotificationMethodSmtp) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -1664,12 +1666,12 @@ func TestDisabledBeforeEnqueue(t *testing.T) { // nolint:gocritic // Unit test. ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong)) - store, _ := dbtestutil.NewDB(t) + store, pubsub := dbtestutil.NewDB(t) logger := testutil.Logger(t) // GIVEN: an enqueuer & a sample user cfg := defaultNotificationsConfig(database.NotificationMethodSmtp) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -1712,7 +1714,7 @@ func TestDisabledAfterEnqueue(t *testing.T) { assert.NoError(t, mgr.Stop(ctx)) }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -1821,7 +1823,7 @@ func TestCustomNotificationMethod(t *testing.T) { _ = mgr.Stop(ctx) }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) // WHEN: a notification of that template is enqueued, it should be delivered with the configured method - not the default. @@ -1914,7 +1916,7 @@ func TestNotificationDuplicates(t *testing.T) { mClock := quartz.NewMock(t) mClock.Set(time.Date(2024, 1, 15, 9, 0, 0, 0, time.UTC)) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), mClock) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), mClock) require.NoError(t, err) user := createSampleUser(t, store) @@ -1940,12 +1942,12 @@ func TestNotificationDuplicates(t *testing.T) { func TestNotificationMethodCannotDefaultToInbox(t *testing.T) { t.Parallel() - store, _ := dbtestutil.NewDB(t) + store, pubsub := dbtestutil.NewDB(t) logger := testutil.Logger(t) cfg := defaultNotificationsConfig(database.NotificationMethodInbox) - _, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t)) + _, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t)) require.ErrorIs(t, err, notifications.InvalidDefaultNotificationMethodError{Method: string(database.NotificationMethodInbox)}) } @@ -2020,7 +2022,7 @@ func TestNotificationTargetMatrix(t *testing.T) { mClock := quartz.NewMock(t) mClock.Set(time.Date(2024, 1, 15, 9, 0, 0, 0, time.UTC)) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), mClock) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), mClock) require.NoError(t, err) user := createSampleUser(t, store) @@ -2041,7 +2043,7 @@ func TestNotificationOneTimePasswordDeliveryTargets(t *testing.T) { // nolint:gocritic // Unit test. ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong)) - store, _ := dbtestutil.NewDB(t) + store, pubsub := dbtestutil.NewDB(t) logger := testutil.Logger(t) // Given: Coder Inbox is enabled and SMTP/Webhook are disabled. @@ -2050,7 +2052,7 @@ func TestNotificationOneTimePasswordDeliveryTargets(t *testing.T) { cfg.SMTP = codersdk.NotificationsEmailConfig{} cfg.Webhook = codersdk.NotificationsWebhookConfig{} - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t)) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t)) require.NoError(t, err) user := createSampleUser(t, store) @@ -2066,7 +2068,7 @@ func TestNotificationOneTimePasswordDeliveryTargets(t *testing.T) { // nolint:gocritic // Unit test. ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong)) - store, _ := dbtestutil.NewDB(t) + store, pubsub := dbtestutil.NewDB(t) logger := testutil.Logger(t) // Given: Coder Inbox/Webhook are disabled and SMTP is enabled. @@ -2074,7 +2076,7 @@ func TestNotificationOneTimePasswordDeliveryTargets(t *testing.T) { cfg.Inbox.Enabled = false cfg.Webhook = codersdk.NotificationsWebhookConfig{} - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t)) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t)) require.NoError(t, err) user := createSampleUser(t, store) @@ -2090,7 +2092,7 @@ func TestNotificationOneTimePasswordDeliveryTargets(t *testing.T) { // nolint:gocritic // Unit test. ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong)) - store, _ := dbtestutil.NewDB(t) + store, pubsub := dbtestutil.NewDB(t) logger := testutil.Logger(t) // Given: Coder Inbox/SMTP are disabled and Webhook is enabled. @@ -2098,7 +2100,7 @@ func TestNotificationOneTimePasswordDeliveryTargets(t *testing.T) { cfg.Inbox.Enabled = false cfg.SMTP = codersdk.NotificationsEmailConfig{} - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t)) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t)) require.NoError(t, err) user := createSampleUser(t, store) diff --git a/coderd/notifications/notifier.go b/coderd/notifications/notifier.go index 1e0c3f4b271f8..44fef904461c0 100644 --- a/coderd/notifications/notifier.go +++ b/coderd/notifications/notifier.go @@ -12,6 +12,7 @@ import ( "golang.org/x/xerrors" "github.com/coder/coder/v2/coderd/database/dbtime" + "github.com/coder/coder/v2/coderd/database/pubsub" "github.com/coder/coder/v2/coderd/notifications/dispatch" "github.com/coder/coder/v2/coderd/notifications/render" "github.com/coder/coder/v2/coderd/notifications/types" @@ -52,6 +53,7 @@ type notifier struct { cfg codersdk.NotificationsConfig log slog.Logger store Store + ps pubsub.Pubsub stopOnce sync.Once outerCtx context.Context @@ -67,8 +69,17 @@ type notifier struct { clock quartz.Clock } -func newNotifier(outerCtx context.Context, cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store, - hr map[database.NotificationMethod]Handler, helpers template.FuncMap, metrics *Metrics, clock quartz.Clock, +func newNotifier( + outerCtx context.Context, + cfg codersdk.NotificationsConfig, + id uuid.UUID, + log slog.Logger, + db Store, + ps pubsub.Pubsub, + hr map[database.NotificationMethod]Handler, + helpers template.FuncMap, + metrics *Metrics, + clock quartz.Clock, ) *notifier { gracefulCtx, gracefulCancel := context.WithCancel(outerCtx) return ¬ifier{ @@ -80,6 +91,7 @@ func newNotifier(outerCtx context.Context, cfg codersdk.NotificationsConfig, id gracefulCancel: gracefulCancel, done: make(chan any), store: db, + ps: ps, handlers: hr, helpers: helpers, metrics: metrics, From 98f3b170f063d4d3fcf313157bc732bbd7fec532 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 16 Apr 2025 14:48:25 +0100 Subject: [PATCH 3/7] feat(coderd/notifications): notify pubsub on enqueue --- coderd/notifications/enqueuer.go | 9 ++++ coderd/notifications/fetcher_internal_test.go | 23 ++++++++ coderd/notifications/manager.go | 1 + coderd/notifications/notifications_test.go | 52 +++++++++++++++++++ coderd/notifications/notifier.go | 37 +++++++++---- 5 files changed, 112 insertions(+), 10 deletions(-) diff --git a/coderd/notifications/enqueuer.go b/coderd/notifications/enqueuer.go index 58bbd66b67070..24689f7f59e8d 100644 --- a/coderd/notifications/enqueuer.go +++ b/coderd/notifications/enqueuer.go @@ -27,6 +27,8 @@ var ( ErrDuplicate = xerrors.New("duplicate notification") ) +const EventNotificationEnqueued = "notification_enqueued" + type InvalidDefaultNotificationMethodError struct { Method string } @@ -83,6 +85,13 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI // Enqueue queues a notification message for later delivery. // Messages will be dequeued by a notifier later and dispatched. func (s *StoreEnqueuer) EnqueueWithData(ctx context.Context, userID, templateID uuid.UUID, labels map[string]string, data map[string]any, createdBy string, targets ...uuid.UUID) ([]uuid.UUID, error) { + defer func() { + // Publish an event to notify that a notification has been enqueued. + // Failure to publish is acceptable, as the fetcher will still process the + // message on its next run. + // TODO(Cian): debounce this to maybe once per second or so? + _ = s.ps.Publish(EventNotificationEnqueued, nil) + }() metadata, err := s.store.FetchNewMessageMetadata(ctx, database.FetchNewMessageMetadataParams{ UserID: userID, NotificationTemplateID: templateID, diff --git a/coderd/notifications/fetcher_internal_test.go b/coderd/notifications/fetcher_internal_test.go index a8d0149c883b8..946f368da0916 100644 --- a/coderd/notifications/fetcher_internal_test.go +++ b/coderd/notifications/fetcher_internal_test.go @@ -11,6 +11,7 @@ import ( "golang.org/x/xerrors" "github.com/coder/coder/v2/coderd/database/dbmock" + "github.com/coder/coder/v2/coderd/database/pubsub/psmock" ) func TestNotifier_FetchHelpers(t *testing.T) { @@ -21,9 +22,11 @@ func TestNotifier_FetchHelpers(t *testing.T) { ctrl := gomock.NewController(t) dbmock := dbmock.NewMockStore(ctrl) + psmock := psmock.NewMockPubsub(ctrl) n := ¬ifier{ store: dbmock, + ps: psmock, helpers: template.FuncMap{}, } @@ -48,9 +51,11 @@ func TestNotifier_FetchHelpers(t *testing.T) { ctrl := gomock.NewController(t) dbmock := dbmock.NewMockStore(ctrl) + psmock := psmock.NewMockPubsub(ctrl) n := ¬ifier{ store: dbmock, + ps: psmock, helpers: template.FuncMap{}, } @@ -67,9 +72,11 @@ func TestNotifier_FetchHelpers(t *testing.T) { ctrl := gomock.NewController(t) dbmock := dbmock.NewMockStore(ctrl) + psmock := psmock.NewMockPubsub(ctrl) n := ¬ifier{ store: dbmock, + ps: psmock, helpers: template.FuncMap{}, } @@ -90,9 +97,11 @@ func TestNotifier_FetchAppName(t *testing.T) { ctrl := gomock.NewController(t) dbmock := dbmock.NewMockStore(ctrl) + psmock := psmock.NewMockPubsub(ctrl) n := ¬ifier{ store: dbmock, + ps: psmock, } dbmock.EXPECT().GetApplicationName(gomock.Any()).Return("ACME Inc.", nil) @@ -107,9 +116,11 @@ func TestNotifier_FetchAppName(t *testing.T) { t.Parallel() ctrl := gomock.NewController(t) dbmock := dbmock.NewMockStore(ctrl) + psmock := psmock.NewMockPubsub(ctrl) n := ¬ifier{ store: dbmock, + ps: psmock, } dbmock.EXPECT().GetApplicationName(gomock.Any()).Return("", sql.ErrNoRows) @@ -125,9 +136,11 @@ func TestNotifier_FetchAppName(t *testing.T) { ctrl := gomock.NewController(t) dbmock := dbmock.NewMockStore(ctrl) + psmock := psmock.NewMockPubsub(ctrl) n := ¬ifier{ store: dbmock, + ps: psmock, } dbmock.EXPECT().GetApplicationName(gomock.Any()).Return("", nil) @@ -143,9 +156,11 @@ func TestNotifier_FetchAppName(t *testing.T) { ctrl := gomock.NewController(t) dbmock := dbmock.NewMockStore(ctrl) + psmock := psmock.NewMockPubsub(ctrl) n := ¬ifier{ store: dbmock, + ps: psmock, } dbmock.EXPECT().GetApplicationName(gomock.Any()).Return("", xerrors.New("internal error")) @@ -164,9 +179,11 @@ func TestNotifier_FetchLogoURL(t *testing.T) { ctrl := gomock.NewController(t) dbmock := dbmock.NewMockStore(ctrl) + psmock := psmock.NewMockPubsub(ctrl) n := ¬ifier{ store: dbmock, + ps: psmock, } dbmock.EXPECT().GetLogoURL(gomock.Any()).Return("https://example.com/logo.png", nil) @@ -181,9 +198,11 @@ func TestNotifier_FetchLogoURL(t *testing.T) { t.Parallel() ctrl := gomock.NewController(t) dbmock := dbmock.NewMockStore(ctrl) + psmock := psmock.NewMockPubsub(ctrl) n := ¬ifier{ store: dbmock, + ps: psmock, } dbmock.EXPECT().GetLogoURL(gomock.Any()).Return("", sql.ErrNoRows) @@ -199,9 +218,11 @@ func TestNotifier_FetchLogoURL(t *testing.T) { ctrl := gomock.NewController(t) dbmock := dbmock.NewMockStore(ctrl) + psmock := psmock.NewMockPubsub(ctrl) n := ¬ifier{ store: dbmock, + ps: psmock, } dbmock.EXPECT().GetLogoURL(gomock.Any()).Return("", nil) @@ -217,9 +238,11 @@ func TestNotifier_FetchLogoURL(t *testing.T) { ctrl := gomock.NewController(t) dbmock := dbmock.NewMockStore(ctrl) + psmock := psmock.NewMockPubsub(ctrl) n := ¬ifier{ store: dbmock, + ps: psmock, } dbmock.EXPECT().GetLogoURL(gomock.Any()).Return("", xerrors.New("internal error")) diff --git a/coderd/notifications/manager.go b/coderd/notifications/manager.go index 2b0b25c3f59f6..a70b6253b1d17 100644 --- a/coderd/notifications/manager.go +++ b/coderd/notifications/manager.go @@ -95,6 +95,7 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, ps pubsub.Pubsub, log: log, cfg: cfg, store: store, + ps: ps, // Buffer successful/failed notification dispatches in memory to reduce load on the store. // diff --git a/coderd/notifications/notifications_test.go b/coderd/notifications/notifications_test.go index f5195a6ab7bd6..351e603a24fed 100644 --- a/coderd/notifications/notifications_test.go +++ b/coderd/notifications/notifications_test.go @@ -2112,6 +2112,58 @@ func TestNotificationOneTimePasswordDeliveryTargets(t *testing.T) { }) } +func TestNotificationEnqueuePubsubNotify(t *testing.T) { + t.Parallel() + if !dbtestutil.WillUsePostgres() { + t.Skip("This test requires postgres; it relies on business-logic only implemented in the database") + } + + store, pubsub := dbtestutil.NewDB(t) + logger := testutil.Logger(t) + // nolint:gocritic // Unit test. + ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitShort)) + + const method = database.NotificationMethodWebhook + cfg := defaultNotificationsConfig(method) + + // Tune the queue to fetch infrequently. + const fetchInterval = time.Minute + cfg.FetchInterval = serpent.Duration(fetchInterval) + + mClock := quartz.NewMock(t) + fetchTrap := mClock.Trap().TickerFunc("notifier", "fetchInterval") + defer fetchTrap.Close() + + mgr, err := notifications.NewManager(cfg, store, pubsub, defaultHelpers(), createMetrics(), + logger.Named("manager"), notifications.WithTestClock(mClock)) + require.NoError(t, err) + + handler := &chanHandler{calls: make(chan dispatchCall)} + mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{ + method: handler, + database.NotificationMethodInbox: handler, + }) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), mClock) + require.NoError(t, err) + + user := createSampleUser(t, store) + + // Given: the manager is running and the fetch interval is set to 1 minute. + mgr.Run(ctx) + fetchTrap.MustWait(ctx).Release() + + // When: a notification is enqueued + _, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{}, "test") + require.NoError(t, err) + + // Then: we attempt to dispatch the notification immediately. + call := testutil.TryReceive(ctx, t, handler.calls) + testutil.RequireSend(ctx, t, call.result, dispatchResult{ + retryable: false, + err: nil, + }) +} + type fakeHandler struct { mu sync.RWMutex succeeded, failed []string diff --git a/coderd/notifications/notifier.go b/coderd/notifications/notifier.go index 44fef904461c0..f6fad958f6b23 100644 --- a/coderd/notifications/notifier.go +++ b/coderd/notifications/notifier.go @@ -108,11 +108,6 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes n.log.Info(context.Background(), "gracefully stopped") }() - // TODO: idea from Cian: instead of querying the database on a short interval, we could wait for pubsub notifications. - // if 100 notifications are enqueued, we shouldn't activate this routine for each one; so how to debounce these? - // PLUS we should also have an interval (but a longer one, maybe 1m) to account for retries (those will not get - // triggered by a code path, but rather by a timeout expiring which makes the message retryable) - // loopTick is used to synchronize the goroutine that processes messages with the ticker. loopTick := make(chan chan struct{}) // loopDone is used to signal when the processing loop has exited due to @@ -139,7 +134,7 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes } }() - // run the ticker with the graceful context, so we stop fetching after stop() is called + // Periodically trigger the processing loop. tick := n.clock.TickerFunc(n.gracefulCtx, n.cfg.FetchInterval.Value(), func() error { c := make(chan struct{}) loopTick <- c @@ -149,10 +144,32 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes return nil }, "notifier", "fetchInterval") - // Note the order of operations here. - _ = tick.Wait() // will block until gracefulCtx is done - close(loopTick) // happens immediately - <-loopDone // wait for the current processing loop to finish + // Also signal the processing loop when a notification is enqueued. + if stopListen, err := n.ps.Subscribe(EventNotificationEnqueued, func(ctx context.Context, _ []byte) { + n.log.Debug(n.outerCtx, "got pubsub event", slog.F("event", EventNotificationEnqueued)) + c := make(chan struct{}) + select { + case <-n.gracefulCtx.Done(): + return + // This is a no-op if the notifier is paused. + case loopTick <- c: + <-c // Wait for the processing loop to finish. + default: + // If the loop is busy, don't send a notification. + n.log.Debug(ctx, "notifier busy, skipping notification") + return + } + }); err != nil { + // Intentionally not making this a fatal error. The notifier will still run, + // albeit without notification events. + n.log.Error(n.outerCtx, "failed to subscribe to notification events", slog.Error(err)) + } else { + defer stopListen() + } + + _ = tick.Wait() // Block until the ticker exits. This will be after gracefulCtx is canceled. + close(loopTick) // Signal the processing goroutine to stop. + <-loopDone // Wait for the processing goroutine to exit. // only errors we can return are context errors. Only return an error if the outer context // was canceled, not if we were gracefully stopped. From 5abde6d66cdf6c1b92596979c570f4adc4fff8d0 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Tue, 22 Apr 2025 17:45:04 +0100 Subject: [PATCH 4/7] test enqeueue multiple --- coderd/notifications/notifications_test.go | 27 +++++++++++++++------- coderd/notifications/notifier.go | 2 +- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/coderd/notifications/notifications_test.go b/coderd/notifications/notifications_test.go index 351e603a24fed..45732373b6905 100644 --- a/coderd/notifications/notifications_test.go +++ b/coderd/notifications/notifications_test.go @@ -2152,16 +2152,27 @@ func TestNotificationEnqueuePubsubNotify(t *testing.T) { mgr.Run(ctx) fetchTrap.MustWait(ctx).Release() - // When: a notification is enqueued - _, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{}, "test") - require.NoError(t, err) + // When: a number of notifications are enqueued + const numEnqueued = 10 + for i := range numEnqueued { + _, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"i": fmt.Sprintf("%d", i)}, fmt.Sprintf("test %d", i)) + require.NoError(t, err) + } // Then: we attempt to dispatch the notification immediately. - call := testutil.TryReceive(ctx, t, handler.calls) - testutil.RequireSend(ctx, t, call.result, dispatchResult{ - retryable: false, - err: nil, - }) + recvDone := make(chan struct{}) + go func() { + defer close(recvDone) + for range numEnqueued { + call := testutil.TryReceive(ctx, t, handler.calls) + <-time.After(testutil.IntervalFast) // Simulate some processing time. + testutil.RequireSend(ctx, t, call.result, dispatchResult{ + retryable: false, + err: nil, + }) + } + }() + _ = testutil.TryReceive(ctx, t, recvDone) } type fakeHandler struct { diff --git a/coderd/notifications/notifier.go b/coderd/notifications/notifier.go index f6fad958f6b23..13244e1523abf 100644 --- a/coderd/notifications/notifier.go +++ b/coderd/notifications/notifier.go @@ -153,7 +153,7 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes return // This is a no-op if the notifier is paused. case loopTick <- c: - <-c // Wait for the processing loop to finish. + // We do not wait for the processing loop to finish here. default: // If the loop is busy, don't send a notification. n.log.Debug(ctx, "notifier busy, skipping notification") From 9a812acb06fe48101da7463f1b3a2263056b3cee Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Tue, 22 Apr 2025 17:56:00 +0100 Subject: [PATCH 5/7] avoid spamming notification dispatch --- coderd/notifications/notifications_test.go | 2 ++ coderd/notifications/notifier.go | 13 ++++++++++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/coderd/notifications/notifications_test.go b/coderd/notifications/notifications_test.go index 45732373b6905..4d5f90672227b 100644 --- a/coderd/notifications/notifications_test.go +++ b/coderd/notifications/notifications_test.go @@ -2173,6 +2173,8 @@ func TestNotificationEnqueuePubsubNotify(t *testing.T) { } }() _ = testutil.TryReceive(ctx, t, recvDone) + // TODO: this sometimes fails with + // t.go:106: 2025-04-22 16:55:04.153 [warn] manager: content canceled with pending updates in buffer, these messages will be sent again after lease expires success_count=6 failure_count=0 } type fakeHandler struct { diff --git a/coderd/notifications/notifier.go b/coderd/notifications/notifier.go index 13244e1523abf..e285542ab8a9d 100644 --- a/coderd/notifications/notifier.go +++ b/coderd/notifications/notifier.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "sync" + "sync/atomic" "text/template" "github.com/google/uuid" @@ -134,8 +135,13 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes } }() + // Keep track of how many notification_enqueued events seen this loop to avoid + // unnecessary database load. + var enqueueEventsThisLoop atomic.Int64 + // Periodically trigger the processing loop. tick := n.clock.TickerFunc(n.gracefulCtx, n.cfg.FetchInterval.Value(), func() error { + defer enqueueEventsThisLoop.Store(0) c := make(chan struct{}) loopTick <- c // Wait for the processing to finish before continuing. The ticker will @@ -146,7 +152,12 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes // Also signal the processing loop when a notification is enqueued. if stopListen, err := n.ps.Subscribe(EventNotificationEnqueued, func(ctx context.Context, _ []byte) { - n.log.Debug(n.outerCtx, "got pubsub event", slog.F("event", EventNotificationEnqueued)) + enqueued := enqueueEventsThisLoop.Add(1) + skipEarlyDispatch := enqueued > 1 + n.log.Debug(n.outerCtx, "got pubsub event", slog.F("count", enqueued), slog.F("skip_early_dispatch", skipEarlyDispatch), slog.F("event", EventNotificationEnqueued)) + if enqueued > 1 { + return + } c := make(chan struct{}) select { case <-n.gracefulCtx.Done(): From 21cad84ebe776db6b843d1cadb015c88a4d4ee89 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 23 Apr 2025 17:52:40 +0100 Subject: [PATCH 6/7] address some but not all PR comments --- coderd/notifications/enqueuer.go | 12 +++++------- coderd/notifications/notifier.go | 18 +++++++++++------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/coderd/notifications/enqueuer.go b/coderd/notifications/enqueuer.go index 24689f7f59e8d..7b1ddfcd7feaf 100644 --- a/coderd/notifications/enqueuer.go +++ b/coderd/notifications/enqueuer.go @@ -85,13 +85,6 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI // Enqueue queues a notification message for later delivery. // Messages will be dequeued by a notifier later and dispatched. func (s *StoreEnqueuer) EnqueueWithData(ctx context.Context, userID, templateID uuid.UUID, labels map[string]string, data map[string]any, createdBy string, targets ...uuid.UUID) ([]uuid.UUID, error) { - defer func() { - // Publish an event to notify that a notification has been enqueued. - // Failure to publish is acceptable, as the fetcher will still process the - // message on its next run. - // TODO(Cian): debounce this to maybe once per second or so? - _ = s.ps.Publish(EventNotificationEnqueued, nil) - }() metadata, err := s.store.FetchNewMessageMetadata(ctx, database.FetchNewMessageMetadataParams{ UserID: userID, NotificationTemplateID: templateID, @@ -171,6 +164,11 @@ func (s *StoreEnqueuer) EnqueueWithData(ctx context.Context, userID, templateID } s.log.Debug(ctx, "enqueued notification", slog.F("msg_ids", uuids)) + // Publish an event to notify that a notification has been enqueued. + // Failure to publish is acceptable, as the fetcher will still process the + // message on its next run. + // TODO(Cian): debounce this to maybe once per second or so? + _ = s.ps.Publish(EventNotificationEnqueued, nil) return uuids, nil } diff --git a/coderd/notifications/notifier.go b/coderd/notifications/notifier.go index e285542ab8a9d..f94d33af66750 100644 --- a/coderd/notifications/notifier.go +++ b/coderd/notifications/notifier.go @@ -123,12 +123,14 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes if err != nil { n.log.Warn(n.outerCtx, "failed to check notifier state", slog.Error(err)) } + if !ok { // Notifier is paused, skip processing. + close(c) + continue + } - if ok { - err = n.process(n.outerCtx, success, failure) - if err != nil { - n.log.Error(n.outerCtx, "failed to process messages", slog.Error(err)) - } + err = n.process(n.outerCtx, success, failure) + if err != nil { + n.log.Error(n.outerCtx, "failed to process messages", slog.Error(err)) } // Signal that we've finished processing one iteration. close(c) @@ -141,6 +143,7 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes // Periodically trigger the processing loop. tick := n.clock.TickerFunc(n.gracefulCtx, n.cfg.FetchInterval.Value(), func() error { + // Reset the enqueue counter after each tick. defer enqueueEventsThisLoop.Store(0) c := make(chan struct{}) loopTick <- c @@ -154,8 +157,9 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes if stopListen, err := n.ps.Subscribe(EventNotificationEnqueued, func(ctx context.Context, _ []byte) { enqueued := enqueueEventsThisLoop.Add(1) skipEarlyDispatch := enqueued > 1 - n.log.Debug(n.outerCtx, "got pubsub event", slog.F("count", enqueued), slog.F("skip_early_dispatch", skipEarlyDispatch), slog.F("event", EventNotificationEnqueued)) - if enqueued > 1 { + n.log.Debug(n.outerCtx, "TODO REMOVE THIS got pubsub event", slog.F("count", enqueued), slog.F("skip_early_dispatch", skipEarlyDispatch), slog.F("event", EventNotificationEnqueued)) + if skipEarlyDispatch { + // Avoid overloading the database. We will get to these in the next tick. return } c := make(chan struct{}) From d60b9cb5b1293ebe049f625f4500dfd28c303865 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Mon, 19 May 2025 17:14:01 +0100 Subject: [PATCH 7/7] reduce diff size --- cli/server.go | 62 +++++++++++++++++++++++++-------------------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/cli/server.go b/cli/server.go index bfd69643c9889..b7a0d1a811307 100644 --- a/cli/server.go +++ b/cli/server.go @@ -928,6 +928,37 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd. options.StatsBatcher = batcher defer closeBatcher() + // Manage notifications. + var ( + notificationsCfg = options.DeploymentValues.Notifications + notificationsManager *notifications.Manager + ) + + metrics := notifications.NewMetrics(options.PrometheusRegistry) + helpers := templateHelpers(options) + + // The enqueuer is responsible for enqueueing notifications to the given store. + enqueuer, err := notifications.NewStoreEnqueuer(notificationsCfg, options.Database, options.Pubsub, helpers, logger.Named("notifications.enqueuer"), quartz.NewReal()) + if err != nil { + return xerrors.Errorf("failed to instantiate notification store enqueuer: %w", err) + } + options.NotificationsEnqueuer = enqueuer + + // The notification manager is responsible for: + // - creating notifiers and managing their lifecycles (notifiers are responsible for dequeueing/sending notifications) + // - keeping the store updated with status updates + notificationsManager, err = notifications.NewManager(notificationsCfg, options.Database, options.Pubsub, helpers, metrics, logger.Named("notifications.manager")) + if err != nil { + return xerrors.Errorf("failed to instantiate notification manager: %w", err) + } + + // nolint:gocritic // We need to run the manager in a notifier context. + notificationsManager.Run(dbauthz.AsNotifier(ctx)) + + // Run report generator to distribute periodic reports. + notificationReportGenerator := reports.NewReportGenerator(ctx, logger.Named("notifications.report_generator"), options.Database, options.NotificationsEnqueuer, quartz.NewReal()) + defer notificationReportGenerator.Close() + // We use a separate coderAPICloser so the Enterprise API // can have its own close functions. This is cleaner // than abstracting the Coder API itself. @@ -975,37 +1006,6 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd. return xerrors.Errorf("write config url: %w", err) } - // Manage notifications. - var ( - notificationsCfg = options.DeploymentValues.Notifications - notificationsManager *notifications.Manager - ) - - metrics := notifications.NewMetrics(options.PrometheusRegistry) - helpers := templateHelpers(options) - - // The enqueuer is responsible for enqueueing notifications to the given store. - enqueuer, err := notifications.NewStoreEnqueuer(notificationsCfg, options.Database, options.Pubsub, helpers, logger.Named("notifications.enqueuer"), quartz.NewReal()) - if err != nil { - return xerrors.Errorf("failed to instantiate notification store enqueuer: %w", err) - } - options.NotificationsEnqueuer = enqueuer - - // The notification manager is responsible for: - // - creating notifiers and managing their lifecycles (notifiers are responsible for dequeueing/sending notifications) - // - keeping the store updated with status updates - notificationsManager, err = notifications.NewManager(notificationsCfg, options.Database, options.Pubsub, helpers, metrics, logger.Named("notifications.manager")) - if err != nil { - return xerrors.Errorf("failed to instantiate notification manager: %w", err) - } - - // nolint:gocritic // We need to run the manager in a notifier context. - notificationsManager.Run(dbauthz.AsNotifier(ctx)) - - // Run report generator to distribute periodic reports. - notificationReportGenerator := reports.NewReportGenerator(ctx, logger.Named("notifications.report_generator"), options.Database, options.NotificationsEnqueuer, quartz.NewReal()) - defer notificationReportGenerator.Close() - // Since errCh only has one buffered slot, all routines // sending on it must be wrapped in a select/default to // avoid leaving dangling goroutines waiting for the