Skip to content

feat(coderd/notifications): notify pubsub on enqueue #17412

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
helpers := templateHelpers(options)

// The enqueuer is responsible for enqueueing notifications to the given store.
enqueuer, err := notifications.NewStoreEnqueuer(notificationsCfg, options.Database, helpers, logger.Named("notifications.enqueuer"), quartz.NewReal())
enqueuer, err := notifications.NewStoreEnqueuer(notificationsCfg, options.Database, options.Pubsub, helpers, logger.Named("notifications.enqueuer"), quartz.NewReal())
if err != nil {
return xerrors.Errorf("failed to instantiate notification store enqueuer: %w", err)
}
Expand Down
12 changes: 11 additions & 1 deletion coderd/notifications/enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/notifications/render"
"github.com/coder/coder/v2/coderd/notifications/types"
"github.com/coder/coder/v2/codersdk"
Expand All @@ -26,6 +27,8 @@ var (
ErrDuplicate = xerrors.New("duplicate notification")
)

const EventNotificationEnqueued = "notification_enqueued"

type InvalidDefaultNotificationMethodError struct {
Method string
}
Expand All @@ -36,6 +39,7 @@ func (e InvalidDefaultNotificationMethodError) Error() string {

type StoreEnqueuer struct {
store Store
ps pubsub.Pubsub
log slog.Logger

defaultMethod database.NotificationMethod
Expand All @@ -50,7 +54,7 @@ type StoreEnqueuer struct {
}

// NewStoreEnqueuer creates an Enqueuer implementation which can persist notification messages in the store.
func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, log slog.Logger, clock quartz.Clock) (*StoreEnqueuer, error) {
func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, ps pubsub.Pubsub, helpers template.FuncMap, log slog.Logger, clock quartz.Clock) (*StoreEnqueuer, error) {
var method database.NotificationMethod
// TODO(DanielleMaywood):
// Currently we do not want to allow setting `inbox` as the default notification method.
Expand All @@ -63,6 +67,7 @@ func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers tem

return &StoreEnqueuer{
store: store,
ps: ps,
log: log,
defaultMethod: method,
defaultEnabled: cfg.Enabled(),
Expand Down Expand Up @@ -159,6 +164,11 @@ func (s *StoreEnqueuer) EnqueueWithData(ctx context.Context, userID, templateID
}

s.log.Debug(ctx, "enqueued notification", slog.F("msg_ids", uuids))
// Publish an event to notify that a notification has been enqueued.
// Failure to publish is acceptable, as the fetcher will still process the
// message on its next run.
// TODO(Cian): debounce this to maybe once per second or so?
_ = s.ps.Publish(EventNotificationEnqueued, nil)
return uuids, nil
}

Expand Down
23 changes: 23 additions & 0 deletions coderd/notifications/fetcher_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"golang.org/x/xerrors"

"github.com/coder/coder/v2/coderd/database/dbmock"
"github.com/coder/coder/v2/coderd/database/pubsub/psmock"
)

func TestNotifier_FetchHelpers(t *testing.T) {
Expand All @@ -21,9 +22,11 @@ func TestNotifier_FetchHelpers(t *testing.T) {

ctrl := gomock.NewController(t)
dbmock := dbmock.NewMockStore(ctrl)
psmock := psmock.NewMockPubsub(ctrl)

n := &notifier{
store: dbmock,
ps: psmock,
helpers: template.FuncMap{},
}

Expand All @@ -48,9 +51,11 @@ func TestNotifier_FetchHelpers(t *testing.T) {

ctrl := gomock.NewController(t)
dbmock := dbmock.NewMockStore(ctrl)
psmock := psmock.NewMockPubsub(ctrl)

n := &notifier{
store: dbmock,
ps: psmock,
helpers: template.FuncMap{},
}

Expand All @@ -67,9 +72,11 @@ func TestNotifier_FetchHelpers(t *testing.T) {

ctrl := gomock.NewController(t)
dbmock := dbmock.NewMockStore(ctrl)
psmock := psmock.NewMockPubsub(ctrl)

n := &notifier{
store: dbmock,
ps: psmock,
helpers: template.FuncMap{},
}

Expand All @@ -90,9 +97,11 @@ func TestNotifier_FetchAppName(t *testing.T) {

ctrl := gomock.NewController(t)
dbmock := dbmock.NewMockStore(ctrl)
psmock := psmock.NewMockPubsub(ctrl)

n := &notifier{
store: dbmock,
ps: psmock,
}

dbmock.EXPECT().GetApplicationName(gomock.Any()).Return("ACME Inc.", nil)
Expand All @@ -107,9 +116,11 @@ func TestNotifier_FetchAppName(t *testing.T) {
t.Parallel()
ctrl := gomock.NewController(t)
dbmock := dbmock.NewMockStore(ctrl)
psmock := psmock.NewMockPubsub(ctrl)

n := &notifier{
store: dbmock,
ps: psmock,
}

dbmock.EXPECT().GetApplicationName(gomock.Any()).Return("", sql.ErrNoRows)
Expand All @@ -125,9 +136,11 @@ func TestNotifier_FetchAppName(t *testing.T) {

ctrl := gomock.NewController(t)
dbmock := dbmock.NewMockStore(ctrl)
psmock := psmock.NewMockPubsub(ctrl)

n := &notifier{
store: dbmock,
ps: psmock,
}

dbmock.EXPECT().GetApplicationName(gomock.Any()).Return("", nil)
Expand All @@ -143,9 +156,11 @@ func TestNotifier_FetchAppName(t *testing.T) {

ctrl := gomock.NewController(t)
dbmock := dbmock.NewMockStore(ctrl)
psmock := psmock.NewMockPubsub(ctrl)

n := &notifier{
store: dbmock,
ps: psmock,
}

dbmock.EXPECT().GetApplicationName(gomock.Any()).Return("", xerrors.New("internal error"))
Expand All @@ -164,9 +179,11 @@ func TestNotifier_FetchLogoURL(t *testing.T) {

ctrl := gomock.NewController(t)
dbmock := dbmock.NewMockStore(ctrl)
psmock := psmock.NewMockPubsub(ctrl)

n := &notifier{
store: dbmock,
ps: psmock,
}

dbmock.EXPECT().GetLogoURL(gomock.Any()).Return("https://example.com/logo.png", nil)
Expand All @@ -181,9 +198,11 @@ func TestNotifier_FetchLogoURL(t *testing.T) {
t.Parallel()
ctrl := gomock.NewController(t)
dbmock := dbmock.NewMockStore(ctrl)
psmock := psmock.NewMockPubsub(ctrl)

n := &notifier{
store: dbmock,
ps: psmock,
}

dbmock.EXPECT().GetLogoURL(gomock.Any()).Return("", sql.ErrNoRows)
Expand All @@ -199,9 +218,11 @@ func TestNotifier_FetchLogoURL(t *testing.T) {

ctrl := gomock.NewController(t)
dbmock := dbmock.NewMockStore(ctrl)
psmock := psmock.NewMockPubsub(ctrl)

n := &notifier{
store: dbmock,
ps: psmock,
}

dbmock.EXPECT().GetLogoURL(gomock.Any()).Return("", nil)
Expand All @@ -217,9 +238,11 @@ func TestNotifier_FetchLogoURL(t *testing.T) {

ctrl := gomock.NewController(t)
dbmock := dbmock.NewMockStore(ctrl)
psmock := psmock.NewMockPubsub(ctrl)

n := &notifier{
store: dbmock,
ps: psmock,
}

dbmock.EXPECT().GetLogoURL(gomock.Any()).Return("", xerrors.New("internal error"))
Expand Down
4 changes: 3 additions & 1 deletion coderd/notifications/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Manager struct {
cfg codersdk.NotificationsConfig

store Store
ps pubsub.Pubsub
log slog.Logger

notifier *notifier
Expand Down Expand Up @@ -93,6 +94,7 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, ps pubsub.Pubsub,
log: log,
cfg: cfg,
store: store,
ps: ps,

// Buffer successful/failed notification dispatches in memory to reduce load on the store.
//
Expand Down Expand Up @@ -175,7 +177,7 @@ func (m *Manager) loop(ctx context.Context) error {
var eg errgroup.Group

// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.handlers, m.helpers, m.metrics, m.clock)
m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.ps, m.handlers, m.helpers, m.metrics, m.clock)
eg.Go(func() error {
return m.notifier.run(m.success, m.failure)
})
Expand Down
6 changes: 3 additions & 3 deletions coderd/notifications/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestBufferedUpdates(t *testing.T) {
}

mgr.WithHandlers(handlers)
enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, defaultHelpers(), logger.Named("notifications-enqueuer"), quartz.NewReal())
enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, ps, defaultHelpers(), logger.Named("notifications-enqueuer"), quartz.NewReal())
require.NoError(t, err)

user := dbgen.User(t, store, database.User{})
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestBuildPayload(t *testing.T) {

// nolint:gocritic // Unit test.
ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong))
store, _ := dbtestutil.NewDB(t)
store, ps := dbtestutil.NewDB(t)
logger := testutil.Logger(t)

// GIVEN: a set of helpers to be injected into the templates
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestBuildPayload(t *testing.T) {
}
})

enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, helpers, logger.Named("notifications-enqueuer"), quartz.NewReal())
enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, ps, helpers, logger.Named("notifications-enqueuer"), quartz.NewReal())
require.NoError(t, err)

// WHEN: a notification is enqueued
Expand Down
8 changes: 4 additions & 4 deletions coderd/notifications/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestMetrics(t *testing.T) {
database.NotificationMethodInbox: &fakeHandler{},
})

enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
require.NoError(t, err)

user := createSampleUser(t, store)
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestPendingUpdatesMetric(t *testing.T) {
database.NotificationMethodInbox: inboxHandler,
})

enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
require.NoError(t, err)

user := createSampleUser(t, store)
Expand Down Expand Up @@ -354,7 +354,7 @@ func TestInflightDispatchesMetric(t *testing.T) {
database.NotificationMethodInbox: &fakeHandler{},
})

enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
require.NoError(t, err)

user := createSampleUser(t, store)
Expand Down Expand Up @@ -441,7 +441,7 @@ func TestCustomMethodMetricCollection(t *testing.T) {
database.NotificationMethodInbox: &fakeHandler{},
})

enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
require.NoError(t, err)

user := createSampleUser(t, store)
Expand Down
Loading
Loading