Skip to content

Commit 729e02e

Browse files
committed
chore(coderd/notifications): plumb through pubsub into enqueuer and dispatcher
1 parent 08b8f53 commit 729e02e

File tree

7 files changed

+53
-35
lines changed

7 files changed

+53
-35
lines changed

cli/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -958,7 +958,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
958958
helpers := templateHelpers(options)
959959

960960
// The enqueuer is responsible for enqueueing notifications to the given store.
961-
enqueuer, err := notifications.NewStoreEnqueuer(notificationsCfg, options.Database, helpers, logger.Named("notifications.enqueuer"), quartz.NewReal())
961+
enqueuer, err := notifications.NewStoreEnqueuer(notificationsCfg, options.Database, options.Pubsub, helpers, logger.Named("notifications.enqueuer"), quartz.NewReal())
962962
if err != nil {
963963
return xerrors.Errorf("failed to instantiate notification store enqueuer: %w", err)
964964
}

coderd/notifications/enqueuer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
"github.com/coder/coder/v2/coderd/database"
1818
"github.com/coder/coder/v2/coderd/database/dbtime"
19+
"github.com/coder/coder/v2/coderd/database/pubsub"
1920
"github.com/coder/coder/v2/coderd/notifications/render"
2021
"github.com/coder/coder/v2/coderd/notifications/types"
2122
"github.com/coder/coder/v2/codersdk"
@@ -36,6 +37,7 @@ func (e InvalidDefaultNotificationMethodError) Error() string {
3637

3738
type StoreEnqueuer struct {
3839
store Store
40+
ps pubsub.Pubsub
3941
log slog.Logger
4042

4143
defaultMethod database.NotificationMethod
@@ -50,7 +52,7 @@ type StoreEnqueuer struct {
5052
}
5153

5254
// NewStoreEnqueuer creates an Enqueuer implementation which can persist notification messages in the store.
53-
func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, log slog.Logger, clock quartz.Clock) (*StoreEnqueuer, error) {
55+
func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, ps pubsub.Pubsub, helpers template.FuncMap, log slog.Logger, clock quartz.Clock) (*StoreEnqueuer, error) {
5456
var method database.NotificationMethod
5557
// TODO(DanielleMaywood):
5658
// Currently we do not want to allow setting `inbox` as the default notification method.
@@ -63,6 +65,7 @@ func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers tem
6365

6466
return &StoreEnqueuer{
6567
store: store,
68+
ps: ps,
6669
log: log,
6770
defaultMethod: method,
6871
defaultEnabled: cfg.Enabled(),

coderd/notifications/manager.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ type Manager struct {
4242
cfg codersdk.NotificationsConfig
4343

4444
store Store
45+
ps pubsub.Pubsub
4546
log slog.Logger
4647

4748
notifier *notifier
@@ -175,7 +176,7 @@ func (m *Manager) loop(ctx context.Context) error {
175176
var eg errgroup.Group
176177

177178
// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
178-
m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.handlers, m.helpers, m.metrics, m.clock)
179+
m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.ps, m.handlers, m.helpers, m.metrics, m.clock)
179180
eg.Go(func() error {
180181
return m.notifier.run(m.success, m.failure)
181182
})

coderd/notifications/manager_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func TestBufferedUpdates(t *testing.T) {
5353
}
5454

5555
mgr.WithHandlers(handlers)
56-
enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, defaultHelpers(), logger.Named("notifications-enqueuer"), quartz.NewReal())
56+
enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, ps, defaultHelpers(), logger.Named("notifications-enqueuer"), quartz.NewReal())
5757
require.NoError(t, err)
5858

5959
user := dbgen.User(t, store, database.User{})
@@ -110,7 +110,7 @@ func TestBuildPayload(t *testing.T) {
110110

111111
// nolint:gocritic // Unit test.
112112
ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong))
113-
store, _ := dbtestutil.NewDB(t)
113+
store, ps := dbtestutil.NewDB(t)
114114
logger := testutil.Logger(t)
115115

116116
// GIVEN: a set of helpers to be injected into the templates
@@ -145,7 +145,7 @@ func TestBuildPayload(t *testing.T) {
145145
}
146146
})
147147

148-
enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, helpers, logger.Named("notifications-enqueuer"), quartz.NewReal())
148+
enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, ps, helpers, logger.Named("notifications-enqueuer"), quartz.NewReal())
149149
require.NoError(t, err)
150150

151151
// WHEN: a notification is enqueued

coderd/notifications/metrics_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func TestMetrics(t *testing.T) {
7171
database.NotificationMethodInbox: &fakeHandler{},
7272
})
7373

74-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
74+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
7575
require.NoError(t, err)
7676

7777
user := createSampleUser(t, store)
@@ -264,7 +264,7 @@ func TestPendingUpdatesMetric(t *testing.T) {
264264
database.NotificationMethodInbox: inboxHandler,
265265
})
266266

267-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
267+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
268268
require.NoError(t, err)
269269

270270
user := createSampleUser(t, store)
@@ -354,7 +354,7 @@ func TestInflightDispatchesMetric(t *testing.T) {
354354
database.NotificationMethodInbox: &fakeHandler{},
355355
})
356356

357-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
357+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
358358
require.NoError(t, err)
359359

360360
user := createSampleUser(t, store)
@@ -441,7 +441,7 @@ func TestCustomMethodMetricCollection(t *testing.T) {
441441
database.NotificationMethodInbox: &fakeHandler{},
442442
})
443443

444-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
444+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
445445
require.NoError(t, err)
446446

447447
user := createSampleUser(t, store)

coderd/notifications/notifications_test.go

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func TestBasicNotificationRoundtrip(t *testing.T) {
8989
t.Cleanup(func() {
9090
assert.NoError(t, mgr.Stop(ctx))
9191
})
92-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
92+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
9393
require.NoError(t, err)
9494

9595
user := createSampleUser(t, store)
@@ -170,7 +170,7 @@ func TestSMTPDispatch(t *testing.T) {
170170
t.Cleanup(func() {
171171
assert.NoError(t, mgr.Stop(ctx))
172172
})
173-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
173+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
174174
require.NoError(t, err)
175175

176176
user := createSampleUser(t, store)
@@ -235,7 +235,7 @@ func TestWebhookDispatch(t *testing.T) {
235235
t.Cleanup(func() {
236236
assert.NoError(t, mgr.Stop(ctx))
237237
})
238-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
238+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
239239
require.NoError(t, err)
240240

241241
const (
@@ -326,7 +326,7 @@ func TestBackpressure(t *testing.T) {
326326
method: handler,
327327
database.NotificationMethodInbox: handler,
328328
})
329-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), mClock)
329+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), mClock)
330330
require.NoError(t, err)
331331

332332
user := createSampleUser(t, store)
@@ -477,7 +477,7 @@ func TestRetries(t *testing.T) {
477477
method: handler,
478478
database.NotificationMethodInbox: &fakeHandler{},
479479
})
480-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
480+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
481481
require.NoError(t, err)
482482

483483
user := createSampleUser(t, store)
@@ -541,7 +541,7 @@ func TestExpiredLeaseIsRequeued(t *testing.T) {
541541

542542
mgr, err := notifications.NewManager(cfg, noopInterceptor, pubsub, defaultHelpers(), createMetrics(), logger.Named("manager"))
543543
require.NoError(t, err)
544-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
544+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
545545
require.NoError(t, err)
546546

547547
user := createSampleUser(t, store)
@@ -666,7 +666,7 @@ func TestNotifierPaused(t *testing.T) {
666666
t.Cleanup(func() {
667667
assert.NoError(t, mgr.Stop(ctx))
668668
})
669-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
669+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
670670
require.NoError(t, err)
671671

672672
// Pause the notifier.
@@ -1386,6 +1386,7 @@ func TestNotificationTemplates_Golden(t *testing.T) {
13861386
smtpEnqueuer, err := notifications.NewStoreEnqueuer(
13871387
notificationCfg,
13881388
*db,
1389+
pubsub,
13891390
defaultHelpers(),
13901391
logger.Named("enqueuer"),
13911392
quartz.NewReal(),
@@ -1511,6 +1512,7 @@ func TestNotificationTemplates_Golden(t *testing.T) {
15111512
httpEnqueuer, err := notifications.NewStoreEnqueuer(
15121513
defaultNotificationsConfig(database.NotificationMethodWebhook),
15131514
*db,
1515+
pubsub,
15141516
defaultHelpers(),
15151517
logger.Named("enqueuer"),
15161518
quartz.NewReal(),
@@ -1614,11 +1616,11 @@ func TestDisabledByDefaultBeforeEnqueue(t *testing.T) {
16141616

16151617
// nolint:gocritic // Unit test.
16161618
ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong))
1617-
store, _ := dbtestutil.NewDB(t)
1619+
store, pubsub := dbtestutil.NewDB(t)
16181620
logger := testutil.Logger(t)
16191621

16201622
cfg := defaultNotificationsConfig(database.NotificationMethodSmtp)
1621-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
1623+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
16221624
require.NoError(t, err)
16231625
user := createSampleUser(t, store)
16241626

@@ -1640,12 +1642,12 @@ func TestDisabledBeforeEnqueue(t *testing.T) {
16401642

16411643
// nolint:gocritic // Unit test.
16421644
ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong))
1643-
store, _ := dbtestutil.NewDB(t)
1645+
store, pubsub := dbtestutil.NewDB(t)
16441646
logger := testutil.Logger(t)
16451647

16461648
// GIVEN: an enqueuer & a sample user
16471649
cfg := defaultNotificationsConfig(database.NotificationMethodSmtp)
1648-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
1650+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
16491651
require.NoError(t, err)
16501652
user := createSampleUser(t, store)
16511653

@@ -1688,7 +1690,7 @@ func TestDisabledAfterEnqueue(t *testing.T) {
16881690
assert.NoError(t, mgr.Stop(ctx))
16891691
})
16901692

1691-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
1693+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
16921694
require.NoError(t, err)
16931695
user := createSampleUser(t, store)
16941696

@@ -1797,7 +1799,7 @@ func TestCustomNotificationMethod(t *testing.T) {
17971799
_ = mgr.Stop(ctx)
17981800
})
17991801

1800-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
1802+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
18011803
require.NoError(t, err)
18021804

18031805
// WHEN: a notification of that template is enqueued, it should be delivered with the configured method - not the default.
@@ -1890,7 +1892,7 @@ func TestNotificationDuplicates(t *testing.T) {
18901892
mClock := quartz.NewMock(t)
18911893
mClock.Set(time.Date(2024, 1, 15, 9, 0, 0, 0, time.UTC))
18921894

1893-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), mClock)
1895+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), mClock)
18941896
require.NoError(t, err)
18951897
user := createSampleUser(t, store)
18961898

@@ -1916,12 +1918,12 @@ func TestNotificationDuplicates(t *testing.T) {
19161918
func TestNotificationMethodCannotDefaultToInbox(t *testing.T) {
19171919
t.Parallel()
19181920

1919-
store, _ := dbtestutil.NewDB(t)
1921+
store, pubsub := dbtestutil.NewDB(t)
19201922
logger := testutil.Logger(t)
19211923

19221924
cfg := defaultNotificationsConfig(database.NotificationMethodInbox)
19231925

1924-
_, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t))
1926+
_, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t))
19251927
require.ErrorIs(t, err, notifications.InvalidDefaultNotificationMethodError{Method: string(database.NotificationMethodInbox)})
19261928
}
19271929

@@ -1996,7 +1998,7 @@ func TestNotificationTargetMatrix(t *testing.T) {
19961998
mClock := quartz.NewMock(t)
19971999
mClock.Set(time.Date(2024, 1, 15, 9, 0, 0, 0, time.UTC))
19982000

1999-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), mClock)
2001+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), mClock)
20002002
require.NoError(t, err)
20012003
user := createSampleUser(t, store)
20022004

@@ -2017,7 +2019,7 @@ func TestNotificationOneTimePasswordDeliveryTargets(t *testing.T) {
20172019

20182020
// nolint:gocritic // Unit test.
20192021
ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong))
2020-
store, _ := dbtestutil.NewDB(t)
2022+
store, pubsub := dbtestutil.NewDB(t)
20212023
logger := testutil.Logger(t)
20222024

20232025
// Given: Coder Inbox is enabled and SMTP/Webhook are disabled.
@@ -2026,7 +2028,7 @@ func TestNotificationOneTimePasswordDeliveryTargets(t *testing.T) {
20262028
cfg.SMTP = codersdk.NotificationsEmailConfig{}
20272029
cfg.Webhook = codersdk.NotificationsWebhookConfig{}
20282030

2029-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t))
2031+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t))
20302032
require.NoError(t, err)
20312033
user := createSampleUser(t, store)
20322034

@@ -2042,15 +2044,15 @@ func TestNotificationOneTimePasswordDeliveryTargets(t *testing.T) {
20422044

20432045
// nolint:gocritic // Unit test.
20442046
ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong))
2045-
store, _ := dbtestutil.NewDB(t)
2047+
store, pubsub := dbtestutil.NewDB(t)
20462048
logger := testutil.Logger(t)
20472049

20482050
// Given: Coder Inbox/Webhook are disabled and SMTP is enabled.
20492051
cfg := defaultNotificationsConfig(database.NotificationMethodSmtp)
20502052
cfg.Inbox.Enabled = false
20512053
cfg.Webhook = codersdk.NotificationsWebhookConfig{}
20522054

2053-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t))
2055+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t))
20542056
require.NoError(t, err)
20552057
user := createSampleUser(t, store)
20562058

@@ -2066,15 +2068,15 @@ func TestNotificationOneTimePasswordDeliveryTargets(t *testing.T) {
20662068

20672069
// nolint:gocritic // Unit test.
20682070
ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong))
2069-
store, _ := dbtestutil.NewDB(t)
2071+
store, pubsub := dbtestutil.NewDB(t)
20702072
logger := testutil.Logger(t)
20712073

20722074
// Given: Coder Inbox/SMTP are disabled and Webhook is enabled.
20732075
cfg := defaultNotificationsConfig(database.NotificationMethodWebhook)
20742076
cfg.Inbox.Enabled = false
20752077
cfg.SMTP = codersdk.NotificationsEmailConfig{}
20762078

2077-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t))
2079+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewMock(t))
20782080
require.NoError(t, err)
20792081
user := createSampleUser(t, store)
20802082

coderd/notifications/notifier.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"golang.org/x/xerrors"
1313

1414
"github.com/coder/coder/v2/coderd/database/dbtime"
15+
"github.com/coder/coder/v2/coderd/database/pubsub"
1516
"github.com/coder/coder/v2/coderd/notifications/dispatch"
1617
"github.com/coder/coder/v2/coderd/notifications/render"
1718
"github.com/coder/coder/v2/coderd/notifications/types"
@@ -52,6 +53,7 @@ type notifier struct {
5253
cfg codersdk.NotificationsConfig
5354
log slog.Logger
5455
store Store
56+
ps pubsub.Pubsub
5557

5658
stopOnce sync.Once
5759
outerCtx context.Context
@@ -67,8 +69,17 @@ type notifier struct {
6769
clock quartz.Clock
6870
}
6971

70-
func newNotifier(outerCtx context.Context, cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store,
71-
hr map[database.NotificationMethod]Handler, helpers template.FuncMap, metrics *Metrics, clock quartz.Clock,
72+
func newNotifier(
73+
outerCtx context.Context,
74+
cfg codersdk.NotificationsConfig,
75+
id uuid.UUID,
76+
log slog.Logger,
77+
db Store,
78+
ps pubsub.Pubsub,
79+
hr map[database.NotificationMethod]Handler,
80+
helpers template.FuncMap,
81+
metrics *Metrics,
82+
clock quartz.Clock,
7283
) *notifier {
7384
gracefulCtx, gracefulCancel := context.WithCancel(outerCtx)
7485
return &notifier{
@@ -80,6 +91,7 @@ func newNotifier(outerCtx context.Context, cfg codersdk.NotificationsConfig, id
8091
gracefulCancel: gracefulCancel,
8192
done: make(chan any),
8293
store: db,
94+
ps: ps,
8395
handlers: hr,
8496
helpers: helpers,
8597
metrics: metrics,

0 commit comments

Comments
 (0)