Skip to content

Commit c9c634a

Browse files
committed
chore(coderd/notifications): plumb through pubsub into enqueuer and dispatcher
1 parent d8a9714 commit c9c634a

File tree

7 files changed

+84
-65
lines changed

7 files changed

+84
-65
lines changed

cli/server.go

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -928,37 +928,6 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
928928
options.StatsBatcher = batcher
929929
defer closeBatcher()
930930

931-
// Manage notifications.
932-
var (
933-
notificationsCfg = options.DeploymentValues.Notifications
934-
notificationsManager *notifications.Manager
935-
)
936-
937-
metrics := notifications.NewMetrics(options.PrometheusRegistry)
938-
helpers := templateHelpers(options)
939-
940-
// The enqueuer is responsible for enqueueing notifications to the given store.
941-
enqueuer, err := notifications.NewStoreEnqueuer(notificationsCfg, options.Database, helpers, logger.Named("notifications.enqueuer"), quartz.NewReal())
942-
if err != nil {
943-
return xerrors.Errorf("failed to instantiate notification store enqueuer: %w", err)
944-
}
945-
options.NotificationsEnqueuer = enqueuer
946-
947-
// The notification manager is responsible for:
948-
// - creating notifiers and managing their lifecycles (notifiers are responsible for dequeueing/sending notifications)
949-
// - keeping the store updated with status updates
950-
notificationsManager, err = notifications.NewManager(notificationsCfg, options.Database, options.Pubsub, helpers, metrics, logger.Named("notifications.manager"))
951-
if err != nil {
952-
return xerrors.Errorf("failed to instantiate notification manager: %w", err)
953-
}
954-
955-
// nolint:gocritic // We need to run the manager in a notifier context.
956-
notificationsManager.Run(dbauthz.AsNotifier(ctx))
957-
958-
// Run report generator to distribute periodic reports.
959-
notificationReportGenerator := reports.NewReportGenerator(ctx, logger.Named("notifications.report_generator"), options.Database, options.NotificationsEnqueuer, quartz.NewReal())
960-
defer notificationReportGenerator.Close()
961-
962931
// We use a separate coderAPICloser so the Enterprise API
963932
// can have its own close functions. This is cleaner
964933
// than abstracting the Coder API itself.
@@ -1006,6 +975,37 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
1006975
return xerrors.Errorf("write config url: %w", err)
1007976
}
1008977

978+
// Manage notifications.
979+
var (
980+
notificationsCfg = options.DeploymentValues.Notifications
981+
notificationsManager *notifications.Manager
982+
)
983+
984+
metrics := notifications.NewMetrics(options.PrometheusRegistry)
985+
helpers := templateHelpers(options)
986+
987+
// The enqueuer is responsible for enqueueing notifications to the given store.
988+
enqueuer, err := notifications.NewStoreEnqueuer(notificationsCfg, options.Database, options.Pubsub, helpers, logger.Named("notifications.enqueuer"), quartz.NewReal())
989+
if err != nil {
990+
return xerrors.Errorf("failed to instantiate notification store enqueuer: %w", err)
991+
}
992+
options.NotificationsEnqueuer = enqueuer
993+
994+
// The notification manager is responsible for:
995+
// - creating notifiers and managing their lifecycles (notifiers are responsible for dequeueing/sending notifications)
996+
// - keeping the store updated with status updates
997+
notificationsManager, err = notifications.NewManager(notificationsCfg, options.Database, options.Pubsub, helpers, metrics, logger.Named("notifications.manager"))
998+
if err != nil {
999+
return xerrors.Errorf("failed to instantiate notification manager: %w", err)
1000+
}
1001+
1002+
// nolint:gocritic // We need to run the manager in a notifier context.
1003+
notificationsManager.Run(dbauthz.AsNotifier(ctx))
1004+
1005+
// Run report generator to distribute periodic reports.
1006+
notificationReportGenerator := reports.NewReportGenerator(ctx, logger.Named("notifications.report_generator"), options.Database, options.NotificationsEnqueuer, quartz.NewReal())
1007+
defer notificationReportGenerator.Close()
1008+
10091009
// Since errCh only has one buffered slot, all routines
10101010
// sending on it must be wrapped in a select/default to
10111011
// avoid leaving dangling goroutines waiting for the

coderd/notifications/enqueuer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
"github.com/coder/coder/v2/coderd/database"
1818
"github.com/coder/coder/v2/coderd/database/dbtime"
19+
"github.com/coder/coder/v2/coderd/database/pubsub"
1920
"github.com/coder/coder/v2/coderd/notifications/render"
2021
"github.com/coder/coder/v2/coderd/notifications/types"
2122
"github.com/coder/coder/v2/codersdk"
@@ -36,6 +37,7 @@ func (e InvalidDefaultNotificationMethodError) Error() string {
3637

3738
type StoreEnqueuer struct {
3839
store Store
40+
ps pubsub.Pubsub
3941
log slog.Logger
4042

4143
defaultMethod database.NotificationMethod
@@ -50,7 +52,7 @@ type StoreEnqueuer struct {
5052
}
5153

5254
// NewStoreEnqueuer creates an Enqueuer implementation which can persist notification messages in the store.
53-
func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, log slog.Logger, clock quartz.Clock) (*StoreEnqueuer, error) {
55+
func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, ps pubsub.Pubsub, helpers template.FuncMap, log slog.Logger, clock quartz.Clock) (*StoreEnqueuer, error) {
5456
var method database.NotificationMethod
5557
// TODO(DanielleMaywood):
5658
// Currently we do not want to allow setting `inbox` as the default notification method.
@@ -63,6 +65,7 @@ func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers tem
6365

6466
return &StoreEnqueuer{
6567
store: store,
68+
ps: ps,
6669
log: log,
6770
defaultMethod: method,
6871
defaultEnabled: cfg.Enabled(),

coderd/notifications/manager.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ type Manager struct {
4242
cfg codersdk.NotificationsConfig
4343

4444
store Store
45+
ps pubsub.Pubsub
4546
log slog.Logger
4647

4748
handlers map[database.NotificationMethod]Handler
@@ -168,7 +169,8 @@ func (m *Manager) loop(ctx context.Context) error {
168169

169170
var eg errgroup.Group
170171

171-
m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.handlers, m.helpers, m.metrics, m.clock)
172+
// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
173+
m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.ps, m.handlers, m.helpers, m.metrics, m.clock)
172174
eg.Go(func() error {
173175
// run the notifier which will handle dequeueing and dispatching notifications.
174176
return m.notifier.run(m.success, m.failure)

coderd/notifications/manager_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func TestBufferedUpdates(t *testing.T) {
5353
}
5454

5555
mgr.WithHandlers(handlers)
56-
enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, defaultHelpers(), logger.Named("notifications-enqueuer"), quartz.NewReal())
56+
enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, ps, defaultHelpers(), logger.Named("notifications-enqueuer"), quartz.NewReal())
5757
require.NoError(t, err)
5858

5959
user := dbgen.User(t, store, database.User{})
@@ -110,7 +110,7 @@ func TestBuildPayload(t *testing.T) {
110110

111111
// nolint:gocritic // Unit test.
112112
ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong))
113-
store, _ := dbtestutil.NewDB(t)
113+
store, ps := dbtestutil.NewDB(t)
114114
logger := testutil.Logger(t)
115115

116116
// GIVEN: a set of helpers to be injected into the templates
@@ -145,7 +145,7 @@ func TestBuildPayload(t *testing.T) {
145145
}
146146
})
147147

148-
enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, helpers, logger.Named("notifications-enqueuer"), quartz.NewReal())
148+
enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, ps, helpers, logger.Named("notifications-enqueuer"), quartz.NewReal())
149149
require.NoError(t, err)
150150

151151
// WHEN: a notification is enqueued

coderd/notifications/metrics_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func TestMetrics(t *testing.T) {
7171
database.NotificationMethodInbox: &fakeHandler{},
7272
})
7373

74-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
74+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
7575
require.NoError(t, err)
7676

7777
user := createSampleUser(t, store)
@@ -264,7 +264,7 @@ func TestPendingUpdatesMetric(t *testing.T) {
264264
database.NotificationMethodInbox: inboxHandler,
265265
})
266266

267-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
267+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
268268
require.NoError(t, err)
269269

270270
user := createSampleUser(t, store)
@@ -354,7 +354,7 @@ func TestInflightDispatchesMetric(t *testing.T) {
354354
database.NotificationMethodInbox: &fakeHandler{},
355355
})
356356

357-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
357+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
358358
require.NoError(t, err)
359359

360360
user := createSampleUser(t, store)
@@ -441,7 +441,7 @@ func TestCustomMethodMetricCollection(t *testing.T) {
441441
database.NotificationMethodInbox: &fakeHandler{},
442442
})
443443

444-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
444+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
445445
require.NoError(t, err)
446446

447447
user := createSampleUser(t, store)

0 commit comments

Comments
 (0)