diff --git a/cli/server.go b/cli/server.go index c5532e07e7a81..b7a0d1a811307 100644 --- a/cli/server.go +++ b/cli/server.go @@ -938,7 +938,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd. helpers := templateHelpers(options) // The enqueuer is responsible for enqueueing notifications to the given store. - enqueuer, err := notifications.NewStoreEnqueuer(notificationsCfg, options.Database, helpers, logger.Named("notifications.enqueuer"), quartz.NewReal()) + enqueuer, err := notifications.NewStoreEnqueuer(notificationsCfg, options.Database, options.Pubsub, helpers, logger.Named("notifications.enqueuer"), quartz.NewReal()) if err != nil { return xerrors.Errorf("failed to instantiate notification store enqueuer: %w", err) } diff --git a/coderd/notifications/enqueuer.go b/coderd/notifications/enqueuer.go index ff3af3fc5eaa1..7b1ddfcd7feaf 100644 --- a/coderd/notifications/enqueuer.go +++ b/coderd/notifications/enqueuer.go @@ -16,6 +16,7 @@ import ( "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbtime" + "github.com/coder/coder/v2/coderd/database/pubsub" "github.com/coder/coder/v2/coderd/notifications/render" "github.com/coder/coder/v2/coderd/notifications/types" "github.com/coder/coder/v2/codersdk" @@ -26,6 +27,8 @@ var ( ErrDuplicate = xerrors.New("duplicate notification") ) +const EventNotificationEnqueued = "notification_enqueued" + type InvalidDefaultNotificationMethodError struct { Method string } @@ -36,6 +39,7 @@ func (e InvalidDefaultNotificationMethodError) Error() string { type StoreEnqueuer struct { store Store + ps pubsub.Pubsub log slog.Logger defaultMethod database.NotificationMethod @@ -50,7 +54,7 @@ type StoreEnqueuer struct { } // NewStoreEnqueuer creates an Enqueuer implementation which can persist notification messages in the store. -func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, log slog.Logger, clock quartz.Clock) (*StoreEnqueuer, error) { +func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, ps pubsub.Pubsub, helpers template.FuncMap, log slog.Logger, clock quartz.Clock) (*StoreEnqueuer, error) { var method database.NotificationMethod // TODO(DanielleMaywood): // Currently we do not want to allow setting `inbox` as the default notification method. @@ -63,6 +67,7 @@ func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers tem return &StoreEnqueuer{ store: store, + ps: ps, log: log, defaultMethod: method, defaultEnabled: cfg.Enabled(), @@ -159,6 +164,11 @@ func (s *StoreEnqueuer) EnqueueWithData(ctx context.Context, userID, templateID } s.log.Debug(ctx, "enqueued notification", slog.F("msg_ids", uuids)) + // Publish an event to notify that a notification has been enqueued. + // Failure to publish is acceptable, as the fetcher will still process the + // message on its next run. + // TODO(Cian): debounce this to maybe once per second or so? + _ = s.ps.Publish(EventNotificationEnqueued, nil) return uuids, nil } diff --git a/coderd/notifications/fetcher_internal_test.go b/coderd/notifications/fetcher_internal_test.go index a8d0149c883b8..946f368da0916 100644 --- a/coderd/notifications/fetcher_internal_test.go +++ b/coderd/notifications/fetcher_internal_test.go @@ -11,6 +11,7 @@ import ( "golang.org/x/xerrors" "github.com/coder/coder/v2/coderd/database/dbmock" + "github.com/coder/coder/v2/coderd/database/pubsub/psmock" ) func TestNotifier_FetchHelpers(t *testing.T) { @@ -21,9 +22,11 @@ func TestNotifier_FetchHelpers(t *testing.T) { ctrl := gomock.NewController(t) dbmock := dbmock.NewMockStore(ctrl) + psmock := psmock.NewMockPubsub(ctrl) n := ¬ifier{ store: dbmock, + ps: psmock, helpers: template.FuncMap{}, } @@ -48,9 +51,11 @@ func TestNotifier_FetchHelpers(t *testing.T) { ctrl := gomock.NewController(t) dbmock := dbmock.NewMockStore(ctrl) + psmock := psmock.NewMockPubsub(ctrl) n := ¬ifier{ store: dbmock, + ps: psmock, helpers: template.FuncMap{}, } @@ -67,9 +72,11 @@ func TestNotifier_FetchHelpers(t *testing.T) { ctrl := gomock.NewController(t) dbmock := dbmock.NewMockStore(ctrl) + psmock := psmock.NewMockPubsub(ctrl) n := ¬ifier{ store: dbmock, + ps: psmock, helpers: template.FuncMap{}, } @@ -90,9 +97,11 @@ func TestNotifier_FetchAppName(t *testing.T) { ctrl := gomock.NewController(t) dbmock := dbmock.NewMockStore(ctrl) + psmock := psmock.NewMockPubsub(ctrl) n := ¬ifier{ store: dbmock, + ps: psmock, } dbmock.EXPECT().GetApplicationName(gomock.Any()).Return("ACME Inc.", nil) @@ -107,9 +116,11 @@ func TestNotifier_FetchAppName(t *testing.T) { t.Parallel() ctrl := gomock.NewController(t) dbmock := dbmock.NewMockStore(ctrl) + psmock := psmock.NewMockPubsub(ctrl) n := ¬ifier{ store: dbmock, + ps: psmock, } dbmock.EXPECT().GetApplicationName(gomock.Any()).Return("", sql.ErrNoRows) @@ -125,9 +136,11 @@ func TestNotifier_FetchAppName(t *testing.T) { ctrl := gomock.NewController(t) dbmock := dbmock.NewMockStore(ctrl) + psmock := psmock.NewMockPubsub(ctrl) n := ¬ifier{ store: dbmock, + ps: psmock, } dbmock.EXPECT().GetApplicationName(gomock.Any()).Return("", nil) @@ -143,9 +156,11 @@ func TestNotifier_FetchAppName(t *testing.T) { ctrl := gomock.NewController(t) dbmock := dbmock.NewMockStore(ctrl) + psmock := psmock.NewMockPubsub(ctrl) n := ¬ifier{ store: dbmock, + ps: psmock, } dbmock.EXPECT().GetApplicationName(gomock.Any()).Return("", xerrors.New("internal error")) @@ -164,9 +179,11 @@ func TestNotifier_FetchLogoURL(t *testing.T) { ctrl := gomock.NewController(t) dbmock := dbmock.NewMockStore(ctrl) + psmock := psmock.NewMockPubsub(ctrl) n := ¬ifier{ store: dbmock, + ps: psmock, } dbmock.EXPECT().GetLogoURL(gomock.Any()).Return("https://example.com/logo.png", nil) @@ -181,9 +198,11 @@ func TestNotifier_FetchLogoURL(t *testing.T) { t.Parallel() ctrl := gomock.NewController(t) dbmock := dbmock.NewMockStore(ctrl) + psmock := psmock.NewMockPubsub(ctrl) n := ¬ifier{ store: dbmock, + ps: psmock, } dbmock.EXPECT().GetLogoURL(gomock.Any()).Return("", sql.ErrNoRows) @@ -199,9 +218,11 @@ func TestNotifier_FetchLogoURL(t *testing.T) { ctrl := gomock.NewController(t) dbmock := dbmock.NewMockStore(ctrl) + psmock := psmock.NewMockPubsub(ctrl) n := ¬ifier{ store: dbmock, + ps: psmock, } dbmock.EXPECT().GetLogoURL(gomock.Any()).Return("", nil) @@ -217,9 +238,11 @@ func TestNotifier_FetchLogoURL(t *testing.T) { ctrl := gomock.NewController(t) dbmock := dbmock.NewMockStore(ctrl) + psmock := psmock.NewMockPubsub(ctrl) n := ¬ifier{ store: dbmock, + ps: psmock, } dbmock.EXPECT().GetLogoURL(gomock.Any()).Return("", xerrors.New("internal error")) diff --git a/coderd/notifications/manager.go b/coderd/notifications/manager.go index 1a2c418a014bb..a70b6253b1d17 100644 --- a/coderd/notifications/manager.go +++ b/coderd/notifications/manager.go @@ -42,6 +42,7 @@ type Manager struct { cfg codersdk.NotificationsConfig store Store + ps pubsub.Pubsub log slog.Logger handlers map[database.NotificationMethod]Handler @@ -94,6 +95,7 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, ps pubsub.Pubsub, log: log, cfg: cfg, store: store, + ps: ps, // Buffer successful/failed notification dispatches in memory to reduce load on the store. // @@ -168,7 +170,8 @@ func (m *Manager) loop(ctx context.Context) error { var eg errgroup.Group - m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.handlers, m.helpers, m.metrics, m.clock) + // Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications. + m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.ps, m.handlers, m.helpers, m.metrics, m.clock) eg.Go(func() error { // run the notifier which will handle dequeueing and dispatching notifications. return m.notifier.run(m.success, m.failure) diff --git a/coderd/notifications/manager_test.go b/coderd/notifications/manager_test.go index e9c309f0a09d3..e184fd0fb68f5 100644 --- a/coderd/notifications/manager_test.go +++ b/coderd/notifications/manager_test.go @@ -53,7 +53,7 @@ func TestBufferedUpdates(t *testing.T) { } mgr.WithHandlers(handlers) - enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, defaultHelpers(), logger.Named("notifications-enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, ps, defaultHelpers(), logger.Named("notifications-enqueuer"), quartz.NewReal()) require.NoError(t, err) user := dbgen.User(t, store, database.User{}) @@ -110,7 +110,7 @@ func TestBuildPayload(t *testing.T) { // nolint:gocritic // Unit test. ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong)) - store, _ := dbtestutil.NewDB(t) + store, ps := dbtestutil.NewDB(t) logger := testutil.Logger(t) // GIVEN: a set of helpers to be injected into the templates @@ -145,7 +145,7 @@ func TestBuildPayload(t *testing.T) { } }) - enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, helpers, logger.Named("notifications-enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, ps, helpers, logger.Named("notifications-enqueuer"), quartz.NewReal()) require.NoError(t, err) // WHEN: a notification is enqueued diff --git a/coderd/notifications/metrics_test.go b/coderd/notifications/metrics_test.go index e88282bbc1861..11c72b0b16eb1 100644 --- a/coderd/notifications/metrics_test.go +++ b/coderd/notifications/metrics_test.go @@ -71,7 +71,7 @@ func TestMetrics(t *testing.T) { database.NotificationMethodInbox: &fakeHandler{}, }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -264,7 +264,7 @@ func TestPendingUpdatesMetric(t *testing.T) { database.NotificationMethodInbox: inboxHandler, }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -354,7 +354,7 @@ func TestInflightDispatchesMetric(t *testing.T) { database.NotificationMethodInbox: &fakeHandler{}, }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -441,7 +441,7 @@ func TestCustomMethodMetricCollection(t *testing.T) { database.NotificationMethodInbox: &fakeHandler{}, }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) diff --git a/coderd/notifications/notifications_test.go b/coderd/notifications/notifications_test.go index 8f8a3c82441e0..4d5f90672227b 100644 --- a/coderd/notifications/notifications_test.go +++ b/coderd/notifications/notifications_test.go @@ -90,7 +90,7 @@ func TestBasicNotificationRoundtrip(t *testing.T) { t.Cleanup(func() { assert.NoError(t, mgr.Stop(ctx)) }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -171,7 +171,7 @@ func TestSMTPDispatch(t *testing.T) { t.Cleanup(func() { assert.NoError(t, mgr.Stop(ctx)) }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -236,7 +236,7 @@ func TestWebhookDispatch(t *testing.T) { t.Cleanup(func() { assert.NoError(t, mgr.Stop(ctx)) }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) const ( @@ -327,7 +327,7 @@ func TestBackpressure(t *testing.T) { method: handler, database.NotificationMethodInbox: handler, }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), mClock) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), mClock) require.NoError(t, err) user := createSampleUser(t, store) @@ -478,7 +478,7 @@ func TestRetries(t *testing.T) { method: handler, database.NotificationMethodInbox: &fakeHandler{}, }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -542,7 +542,7 @@ func TestExpiredLeaseIsRequeued(t *testing.T) { mgr, err := notifications.NewManager(cfg, noopInterceptor, pubsub, defaultHelpers(), createMetrics(), logger.Named("manager")) require.NoError(t, err) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -667,7 +667,7 @@ func TestNotifierPaused(t *testing.T) { t.Cleanup(func() { assert.NoError(t, mgr.Stop(ctx)) }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) // Pause the notifier. @@ -1410,6 +1410,7 @@ func TestNotificationTemplates_Golden(t *testing.T) { smtpEnqueuer, err := notifications.NewStoreEnqueuer( notificationCfg, *db, + pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal(), @@ -1535,6 +1536,7 @@ func TestNotificationTemplates_Golden(t *testing.T) { httpEnqueuer, err := notifications.NewStoreEnqueuer( defaultNotificationsConfig(database.NotificationMethodWebhook), *db, + pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal(), @@ -1638,11 +1640,11 @@ func TestDisabledByDefaultBeforeEnqueue(t *testing.T) { // nolint:gocritic // Unit test. ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong)) - store, _ := dbtestutil.NewDB(t) + store, pubsub := dbtestutil.NewDB(t) logger := testutil.Logger(t) cfg := defaultNotificationsConfig(database.NotificationMethodSmtp) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -1664,12 +1666,12 @@ func TestDisabledBeforeEnqueue(t *testing.T) { // nolint:gocritic // Unit test. ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong)) - store, _ := dbtestutil.NewDB(t) + store, pubsub := dbtestutil.NewDB(t) logger := testutil.Logger(t) // GIVEN: an enqueuer & a sample user cfg := defaultNotificationsConfig(database.NotificationMethodSmtp) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -1712,7 +1714,7 @@ func TestDisabledAfterEnqueue(t *testing.T) { assert.NoError(t, mgr.Stop(ctx)) }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -1821,7 +1823,7 @@ func TestCustomNotificationMethod(t *testing.T) { _ = mgr.Stop(ctx) }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) // WHEN: a notification of that template is enqueued, it should be delivered with the configured method - not the default. @@ -1914,7 +1916,7 @@ func TestNotificationDuplicates(t *testing.T) { mClock := quartz.NewMock(t) mClock.Set(time.Date(2024, 1, 15, 9, 0, 0, 0, time.UTC)) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), mClock) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), mClock) require.NoError(t, err) user := createSampleUser(t, store) @@ -1940,12 +1942,12 @@ func TestNotificationDuplicates(t *testing.T) { func TestNotificationMethodCannotDefaultToInbox(t *testing.T) { t.Parallel() - store, _ := dbtestutil.NewDB(t) + store, pubsub := dbtestutil.NewDB(t) logger := testutil.Logger(t) cfg := defaultNotificationsConfig(database.NotificationMethodInbox) - _, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t)) + _, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t)) require.ErrorIs(t, err, notifications.InvalidDefaultNotificationMethodError{Method: string(database.NotificationMethodInbox)}) } @@ -2020,7 +2022,7 @@ func TestNotificationTargetMatrix(t *testing.T) { mClock := quartz.NewMock(t) mClock.Set(time.Date(2024, 1, 15, 9, 0, 0, 0, time.UTC)) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), mClock) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), mClock) require.NoError(t, err) user := createSampleUser(t, store) @@ -2041,7 +2043,7 @@ func TestNotificationOneTimePasswordDeliveryTargets(t *testing.T) { // nolint:gocritic // Unit test. ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong)) - store, _ := dbtestutil.NewDB(t) + store, pubsub := dbtestutil.NewDB(t) logger := testutil.Logger(t) // Given: Coder Inbox is enabled and SMTP/Webhook are disabled. @@ -2050,7 +2052,7 @@ func TestNotificationOneTimePasswordDeliveryTargets(t *testing.T) { cfg.SMTP = codersdk.NotificationsEmailConfig{} cfg.Webhook = codersdk.NotificationsWebhookConfig{} - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t)) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t)) require.NoError(t, err) user := createSampleUser(t, store) @@ -2066,7 +2068,7 @@ func TestNotificationOneTimePasswordDeliveryTargets(t *testing.T) { // nolint:gocritic // Unit test. ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong)) - store, _ := dbtestutil.NewDB(t) + store, pubsub := dbtestutil.NewDB(t) logger := testutil.Logger(t) // Given: Coder Inbox/Webhook are disabled and SMTP is enabled. @@ -2074,7 +2076,7 @@ func TestNotificationOneTimePasswordDeliveryTargets(t *testing.T) { cfg.Inbox.Enabled = false cfg.Webhook = codersdk.NotificationsWebhookConfig{} - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t)) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t)) require.NoError(t, err) user := createSampleUser(t, store) @@ -2090,7 +2092,7 @@ func TestNotificationOneTimePasswordDeliveryTargets(t *testing.T) { // nolint:gocritic // Unit test. ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong)) - store, _ := dbtestutil.NewDB(t) + store, pubsub := dbtestutil.NewDB(t) logger := testutil.Logger(t) // Given: Coder Inbox/SMTP are disabled and Webhook is enabled. @@ -2098,7 +2100,7 @@ func TestNotificationOneTimePasswordDeliveryTargets(t *testing.T) { cfg.Inbox.Enabled = false cfg.SMTP = codersdk.NotificationsEmailConfig{} - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t)) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t)) require.NoError(t, err) user := createSampleUser(t, store) @@ -2110,6 +2112,71 @@ func TestNotificationOneTimePasswordDeliveryTargets(t *testing.T) { }) } +func TestNotificationEnqueuePubsubNotify(t *testing.T) { + t.Parallel() + if !dbtestutil.WillUsePostgres() { + t.Skip("This test requires postgres; it relies on business-logic only implemented in the database") + } + + store, pubsub := dbtestutil.NewDB(t) + logger := testutil.Logger(t) + // nolint:gocritic // Unit test. + ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitShort)) + + const method = database.NotificationMethodWebhook + cfg := defaultNotificationsConfig(method) + + // Tune the queue to fetch infrequently. + const fetchInterval = time.Minute + cfg.FetchInterval = serpent.Duration(fetchInterval) + + mClock := quartz.NewMock(t) + fetchTrap := mClock.Trap().TickerFunc("notifier", "fetchInterval") + defer fetchTrap.Close() + + mgr, err := notifications.NewManager(cfg, store, pubsub, defaultHelpers(), createMetrics(), + logger.Named("manager"), notifications.WithTestClock(mClock)) + require.NoError(t, err) + + handler := &chanHandler{calls: make(chan dispatchCall)} + mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{ + method: handler, + database.NotificationMethodInbox: handler, + }) + enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), mClock) + require.NoError(t, err) + + user := createSampleUser(t, store) + + // Given: the manager is running and the fetch interval is set to 1 minute. + mgr.Run(ctx) + fetchTrap.MustWait(ctx).Release() + + // When: a number of notifications are enqueued + const numEnqueued = 10 + for i := range numEnqueued { + _, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"i": fmt.Sprintf("%d", i)}, fmt.Sprintf("test %d", i)) + require.NoError(t, err) + } + + // Then: we attempt to dispatch the notification immediately. + recvDone := make(chan struct{}) + go func() { + defer close(recvDone) + for range numEnqueued { + call := testutil.TryReceive(ctx, t, handler.calls) + <-time.After(testutil.IntervalFast) // Simulate some processing time. + testutil.RequireSend(ctx, t, call.result, dispatchResult{ + retryable: false, + err: nil, + }) + } + }() + _ = testutil.TryReceive(ctx, t, recvDone) + // TODO: this sometimes fails with + // t.go:106: 2025-04-22 16:55:04.153 [warn] manager: content canceled with pending updates in buffer, these messages will be sent again after lease expires success_count=6 failure_count=0 +} + type fakeHandler struct { mu sync.RWMutex succeeded, failed []string diff --git a/coderd/notifications/notifier.go b/coderd/notifications/notifier.go index b2713533cecb3..f94d33af66750 100644 --- a/coderd/notifications/notifier.go +++ b/coderd/notifications/notifier.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "sync" + "sync/atomic" "text/template" "github.com/google/uuid" @@ -12,6 +13,7 @@ import ( "golang.org/x/xerrors" "github.com/coder/coder/v2/coderd/database/dbtime" + "github.com/coder/coder/v2/coderd/database/pubsub" "github.com/coder/coder/v2/coderd/notifications/dispatch" "github.com/coder/coder/v2/coderd/notifications/render" "github.com/coder/coder/v2/coderd/notifications/types" @@ -52,6 +54,7 @@ type notifier struct { cfg codersdk.NotificationsConfig log slog.Logger store Store + ps pubsub.Pubsub stopOnce sync.Once outerCtx context.Context @@ -67,8 +70,17 @@ type notifier struct { clock quartz.Clock } -func newNotifier(outerCtx context.Context, cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store, - hr map[database.NotificationMethod]Handler, helpers template.FuncMap, metrics *Metrics, clock quartz.Clock, +func newNotifier( + outerCtx context.Context, + cfg codersdk.NotificationsConfig, + id uuid.UUID, + log slog.Logger, + db Store, + ps pubsub.Pubsub, + hr map[database.NotificationMethod]Handler, + helpers template.FuncMap, + metrics *Metrics, + clock quartz.Clock, ) *notifier { gracefulCtx, gracefulCancel := context.WithCancel(outerCtx) return ¬ifier{ @@ -80,6 +92,7 @@ func newNotifier(outerCtx context.Context, cfg codersdk.NotificationsConfig, id gracefulCancel: gracefulCancel, done: make(chan any), store: db, + ps: ps, handlers: hr, helpers: helpers, metrics: metrics, @@ -96,30 +109,83 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes n.log.Info(context.Background(), "gracefully stopped") }() - // TODO: idea from Cian: instead of querying the database on a short interval, we could wait for pubsub notifications. - // if 100 notifications are enqueued, we shouldn't activate this routine for each one; so how to debounce these? - // PLUS we should also have an interval (but a longer one, maybe 1m) to account for retries (those will not get - // triggered by a code path, but rather by a timeout expiring which makes the message retryable) - - // run the ticker with the graceful context, so we stop fetching after stop() is called - tick := n.clock.TickerFunc(n.gracefulCtx, n.cfg.FetchInterval.Value(), func() error { - // Check if notifier is not paused. - ok, err := n.ensureRunning(n.outerCtx) - if err != nil { - n.log.Warn(n.outerCtx, "failed to check notifier state", slog.Error(err)) - } + // loopTick is used to synchronize the goroutine that processes messages with the ticker. + loopTick := make(chan chan struct{}) + // loopDone is used to signal when the processing loop has exited due to + // graceful stop or otherwise. + loopDone := make(chan struct{}) + go func() { + defer close(loopDone) + for c := range loopTick { + n.log.Info(n.outerCtx, "processing messages") + // Check if notifier is not paused. + ok, err := n.ensureRunning(n.outerCtx) + if err != nil { + n.log.Warn(n.outerCtx, "failed to check notifier state", slog.Error(err)) + } + if !ok { // Notifier is paused, skip processing. + close(c) + continue + } - if ok { err = n.process(n.outerCtx, success, failure) if err != nil { n.log.Error(n.outerCtx, "failed to process messages", slog.Error(err)) } + // Signal that we've finished processing one iteration. + close(c) } - // we don't return any errors because we don't want to kill the loop because of them. + }() + + // Keep track of how many notification_enqueued events seen this loop to avoid + // unnecessary database load. + var enqueueEventsThisLoop atomic.Int64 + + // Periodically trigger the processing loop. + tick := n.clock.TickerFunc(n.gracefulCtx, n.cfg.FetchInterval.Value(), func() error { + // Reset the enqueue counter after each tick. + defer enqueueEventsThisLoop.Store(0) + c := make(chan struct{}) + loopTick <- c + // Wait for the processing to finish before continuing. The ticker will + // compensate for the time it takes to process the messages. + <-c return nil }, "notifier", "fetchInterval") - _ = tick.Wait() + // Also signal the processing loop when a notification is enqueued. + if stopListen, err := n.ps.Subscribe(EventNotificationEnqueued, func(ctx context.Context, _ []byte) { + enqueued := enqueueEventsThisLoop.Add(1) + skipEarlyDispatch := enqueued > 1 + n.log.Debug(n.outerCtx, "TODO REMOVE THIS got pubsub event", slog.F("count", enqueued), slog.F("skip_early_dispatch", skipEarlyDispatch), slog.F("event", EventNotificationEnqueued)) + if skipEarlyDispatch { + // Avoid overloading the database. We will get to these in the next tick. + return + } + c := make(chan struct{}) + select { + case <-n.gracefulCtx.Done(): + return + // This is a no-op if the notifier is paused. + case loopTick <- c: + // We do not wait for the processing loop to finish here. + default: + // If the loop is busy, don't send a notification. + n.log.Debug(ctx, "notifier busy, skipping notification") + return + } + }); err != nil { + // Intentionally not making this a fatal error. The notifier will still run, + // albeit without notification events. + n.log.Error(n.outerCtx, "failed to subscribe to notification events", slog.Error(err)) + } else { + defer stopListen() + } + + _ = tick.Wait() // Block until the ticker exits. This will be after gracefulCtx is canceled. + close(loopTick) // Signal the processing goroutine to stop. + <-loopDone // Wait for the processing goroutine to exit. + // only errors we can return are context errors. Only return an error if the outer context // was canceled, not if we were gracefully stopped. if n.outerCtx.Err() != nil {