Skip to content

Commit 62f6dec

Browse files
committed
Dispatch notification using custom method
Signed-off-by: Danny Kopping <danny@coder.com>
1 parent f59e320 commit 62f6dec

File tree

6 files changed

+156
-51
lines changed

6 files changed

+156
-51
lines changed

coderd/database/queries.sql.go

Lines changed: 12 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries/notifications.sql

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22
-- This is used to build up the notification_message's JSON payload.
33
SELECT nt.name AS notification_name,
44
nt.actions AS actions,
5+
nt.method AS custom_method,
56
u.id AS user_id,
67
u.email AS user_email,
78
COALESCE(NULLIF(u.name, ''), NULLIF(u.username, ''))::text AS user_name,
8-
COALESCE(u.username, '') AS user_username
9+
u.username AS user_username
910
FROM notification_templates nt,
1011
users u
1112
WHERE nt.id = @notification_template_id
@@ -167,5 +168,6 @@ FROM notification_templates
167168
WHERE id = @id::uuid;
168169

169170
-- name: GetNotificationTemplatesByKind :many
170-
SELECT * FROM notification_templates
171+
SELECT *
172+
FROM notification_templates
171173
WHERE kind = @kind::notification_template_kind;

coderd/notifications/enqueuer.go

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,7 @@ type StoreEnqueuer struct {
2020
store Store
2121
log slog.Logger
2222

23-
// TODO: expand this to allow for each notification to have custom delivery methods, or multiple, or none.
24-
// For example, Larry might want email notifications for "workspace deleted" notifications, but Harry wants
25-
// Slack notifications, and Mary doesn't want any.
26-
method database.NotificationMethod
23+
defaultMethod database.NotificationMethod
2724
// helpers holds a map of template funcs which are used when rendering templates. These need to be passed in because
2825
// the template funcs will return values which are inappropriately encapsulated in this struct.
2926
helpers template.FuncMap
@@ -37,17 +34,31 @@ func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers tem
3734
}
3835

3936
return &StoreEnqueuer{
40-
store: store,
41-
log: log,
42-
method: method,
43-
helpers: helpers,
37+
store: store,
38+
log: log,
39+
defaultMethod: method,
40+
helpers: helpers,
4441
}, nil
4542
}
4643

4744
// Enqueue queues a notification message for later delivery.
4845
// Messages will be dequeued by a notifier later and dispatched.
4946
func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUID, labels map[string]string, createdBy string, targets ...uuid.UUID) (*uuid.UUID, error) {
50-
payload, err := s.buildPayload(ctx, userID, templateID, labels)
47+
metadata, err := s.store.FetchNewMessageMetadata(ctx, database.FetchNewMessageMetadataParams{
48+
UserID: userID,
49+
NotificationTemplateID: templateID,
50+
})
51+
if err != nil {
52+
s.log.Warn(ctx, "failed to fetch message metadata", slog.F("template_id", templateID), slog.F("user_id", userID), slog.Error(err))
53+
return nil, xerrors.Errorf("new message metadata: %w", err)
54+
}
55+
56+
dispatchMethod := s.defaultMethod
57+
if metadata.CustomMethod.Valid {
58+
dispatchMethod = metadata.CustomMethod.NotificationMethod
59+
}
60+
61+
payload, err := s.buildPayload(metadata, labels)
5162
if err != nil {
5263
s.log.Warn(ctx, "failed to build payload", slog.F("template_id", templateID), slog.F("user_id", userID), slog.Error(err))
5364
return nil, xerrors.Errorf("enqueue notification (payload build): %w", err)
@@ -63,7 +74,7 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI
6374
ID: id,
6475
UserID: userID,
6576
NotificationTemplateID: templateID,
66-
Method: s.method,
77+
Method: dispatchMethod,
6778
Payload: input,
6879
Targets: targets,
6980
CreatedBy: createdBy,
@@ -80,15 +91,7 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI
8091
// buildPayload creates the payload that the notification will for variable substitution and/or routing.
8192
// The payload contains information about the recipient, the event that triggered the notification, and any subsequent
8293
// actions which can be taken by the recipient.
83-
func (s *StoreEnqueuer) buildPayload(ctx context.Context, userID, templateID uuid.UUID, labels map[string]string) (*types.MessagePayload, error) {
84-
metadata, err := s.store.FetchNewMessageMetadata(ctx, database.FetchNewMessageMetadataParams{
85-
UserID: userID,
86-
NotificationTemplateID: templateID,
87-
})
88-
if err != nil {
89-
return nil, xerrors.Errorf("new message metadata: %w", err)
90-
}
91-
94+
func (s *StoreEnqueuer) buildPayload(metadata database.FetchNewMessageMetadataRow, labels map[string]string) (*types.MessagePayload, error) {
9295
payload := types.MessagePayload{
9396
Version: "1.0",
9497

coderd/notifications/manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ func (m *Manager) loop(ctx context.Context) error {
149149
var eg errgroup.Group
150150

151151
// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
152-
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers, m.method, m.metrics)
152+
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers, m.metrics)
153153
eg.Go(func() error {
154154
return m.notifier.run(ctx, m.success, m.failure)
155155
})

coderd/notifications/notifications_test.go

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -604,7 +604,7 @@ func TestNotifierPaused(t *testing.T) {
604604
}, testutil.WaitShort, testutil.IntervalFast)
605605
}
606606

607-
func TestNotifcationTemplatesBody(t *testing.T) {
607+
func TestNotificationTemplatesBody(t *testing.T) {
608608
t.Parallel()
609609

610610
if !dbtestutil.WillUsePostgres() {
@@ -705,6 +705,104 @@ func TestNotifcationTemplatesBody(t *testing.T) {
705705
}
706706
}
707707

708+
func TestCustomNotificationMethod(t *testing.T) {
709+
t.Parallel()
710+
711+
// SETUP
712+
if !dbtestutil.WillUsePostgres() {
713+
t.Skip("This test requires postgres; it relies on business-logic only implemented in the database")
714+
}
715+
716+
ctx, logger, db := setup(t)
717+
718+
received := make(chan uuid.UUID, 1)
719+
720+
// SETUP:
721+
// Start mock server to simulate webhook endpoint.
722+
mockWebhookSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
723+
var payload dispatch.WebhookPayload
724+
err := json.NewDecoder(r.Body).Decode(&payload)
725+
assert.NoError(t, err)
726+
727+
received <- payload.MsgID
728+
close(received)
729+
730+
w.WriteHeader(http.StatusOK)
731+
_, err = w.Write([]byte("noted."))
732+
require.NoError(t, err)
733+
}))
734+
defer mockWebhookSrv.Close()
735+
736+
// Start mock SMTP server.
737+
mockSMTPSrv := smtpmock.New(smtpmock.ConfigurationAttr{
738+
LogToStdout: false,
739+
LogServerActivity: true,
740+
})
741+
require.NoError(t, mockSMTPSrv.Start())
742+
t.Cleanup(func() {
743+
assert.NoError(t, mockSMTPSrv.Stop())
744+
})
745+
746+
endpoint, err := url.Parse(mockWebhookSrv.URL)
747+
require.NoError(t, err)
748+
749+
// GIVEN: a notification template which has a method explicitly set
750+
var (
751+
template = notifications.TemplateWorkspaceDormant
752+
defaultMethod = database.NotificationMethodSmtp
753+
customMethod = database.NotificationMethodWebhook
754+
)
755+
out, err := db.UpdateNotificationTemplateMethodByID(ctx, database.UpdateNotificationTemplateMethodByIDParams{
756+
ID: template,
757+
Method: database.NullNotificationMethod{NotificationMethod: customMethod, Valid: true},
758+
})
759+
require.NoError(t, err)
760+
require.Equal(t, customMethod, out.Method.NotificationMethod)
761+
762+
// GIVEN: a manager configured with multiple dispatch methods
763+
cfg := defaultNotificationsConfig(defaultMethod)
764+
cfg.SMTP = codersdk.NotificationsEmailConfig{
765+
From: "danny@coder.com",
766+
Hello: "localhost",
767+
Smarthost: serpent.HostPort{Host: "localhost", Port: fmt.Sprintf("%d", mockSMTPSrv.PortNumber())},
768+
}
769+
cfg.Webhook = codersdk.NotificationsWebhookConfig{
770+
Endpoint: *serpent.URLOf(endpoint),
771+
}
772+
773+
mgr, err := notifications.NewManager(cfg, db, createMetrics(), logger.Named("manager"))
774+
require.NoError(t, err)
775+
t.Cleanup(func() {
776+
_ = mgr.Stop(ctx)
777+
})
778+
779+
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger)
780+
require.NoError(t, err)
781+
782+
// WHEN: a notification of that template is enqueued, it should be delivered with the configured method - not the default.
783+
user := createSampleUser(t, db)
784+
msgID, err := enq.Enqueue(ctx, user.ID, template, map[string]string{}, "test")
785+
786+
// THEN: the notification should be received by the custom dispatch method
787+
mgr.Run(ctx)
788+
789+
receivedMsgID := testutil.RequireRecvCtx(ctx, t, received)
790+
require.Equal(t, msgID.String(), receivedMsgID.String())
791+
792+
// Ensure no messages received by default method (SMTP):
793+
msgs := mockSMTPSrv.MessagesAndPurge()
794+
require.Len(t, msgs, 0)
795+
796+
// Enqueue a notification which does not have a custom method set to ensure default works correctly.
797+
msgID, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{}, "test")
798+
require.NoError(t, err)
799+
require.EventuallyWithT(t, func(ct *assert.CollectT) {
800+
msgs := mockSMTPSrv.MessagesAndPurge()
801+
assert.Len(ct, msgs, 1)
802+
assert.Contains(ct, msgs[0].MsgRequest(), fmt.Sprintf("Message-Id: %s", msgID))
803+
}, testutil.WaitLong, testutil.IntervalFast)
804+
}
805+
708806
type fakeHandler struct {
709807
mu sync.RWMutex
710808
succeeded, failed []string

coderd/notifications/notifier.go

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -33,23 +33,21 @@ type notifier struct {
3333
quit chan any
3434
done chan any
3535

36-
method database.NotificationMethod
37-
handlers map[database.NotificationMethod]Handler
36+
handlers map[database.NotificationMethod]Handler
3837
metrics *Metrics
3938
}
4039

41-
func newNotifier(cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store, hr map[database.NotificationMethod]Handler, method database.NotificationMethod, metrics *Metrics) *notifier {
40+
func newNotifier(cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store, hr map[database.NotificationMethod]Handler, metrics *Metrics) *notifier {
4241
return &notifier{
43-
id: id,
44-
cfg: cfg,
45-
log: log.Named("notifier").With(slog.F("notifier_id", id)),
46-
quit: make(chan any),
47-
done: make(chan any),
48-
tick: time.NewTicker(cfg.FetchInterval.Value()),
49-
store: db,
50-
handlers: hr,
51-
method: method,
52-
metrics: metrics,
42+
id: id,
43+
cfg: cfg,
44+
log: log.Named("notifier").With(slog.F("notifier_id", id)),
45+
quit: make(chan any),
46+
done: make(chan any),
47+
tick: time.NewTicker(cfg.FetchInterval.Value()),
48+
store: db,
49+
handlers: hr,
50+
metrics: metrics,
5351
}
5452
}
5553

@@ -234,17 +232,17 @@ func (n *notifier) deliver(ctx context.Context, msg database.AcquireNotification
234232
logger := n.log.With(slog.F("msg_id", msg.ID), slog.F("method", msg.Method), slog.F("attempt", msg.AttemptCount+1))
235233

236234
if msg.AttemptCount > 0 {
237-
n.metrics.RetryCount.WithLabelValues(string(n.method), msg.TemplateID.String()).Inc()
235+
n.metrics.RetryCount.WithLabelValues(string(msg.Method), msg.TemplateID.String()).Inc()
238236
}
239237

240-
n.metrics.InflightDispatches.WithLabelValues(string(n.method), msg.TemplateID.String()).Inc()
241-
n.metrics.QueuedSeconds.WithLabelValues(string(n.method)).Observe(msg.QueuedSeconds)
238+
n.metrics.InflightDispatches.WithLabelValues(string(msg.Method), msg.TemplateID.String()).Inc()
239+
n.metrics.QueuedSeconds.WithLabelValues(string(msg.Method)).Observe(msg.QueuedSeconds)
242240

243241
start := time.Now()
244242
retryable, err := deliver(ctx, msg.ID)
245243

246-
n.metrics.DispatcherSendSeconds.WithLabelValues(string(n.method)).Observe(time.Since(start).Seconds())
247-
n.metrics.InflightDispatches.WithLabelValues(string(n.method), msg.TemplateID.String()).Dec()
244+
n.metrics.DispatcherSendSeconds.WithLabelValues(string(msg.Method)).Observe(time.Since(start).Seconds())
245+
n.metrics.InflightDispatches.WithLabelValues(string(msg.Method), msg.TemplateID.String()).Dec()
248246

249247
if err != nil {
250248
// Don't try to accumulate message responses if the context has been canceled.
@@ -281,7 +279,7 @@ func (n *notifier) deliver(ctx context.Context, msg database.AcquireNotification
281279
}
282280

283281
func (n *notifier) newSuccessfulDispatch(msg database.AcquireNotificationMessagesRow) dispatchResult {
284-
n.metrics.DispatchAttempts.WithLabelValues(string(n.method), msg.TemplateID.String(), ResultSuccess).Inc()
282+
n.metrics.DispatchAttempts.WithLabelValues(string(msg.Method), msg.TemplateID.String(), ResultSuccess).Inc()
285283

286284
return dispatchResult{
287285
notifier: n.id,
@@ -301,7 +299,7 @@ func (n *notifier) newFailedDispatch(msg database.AcquireNotificationMessagesRow
301299
result = ResultPermFail
302300
}
303301

304-
n.metrics.DispatchAttempts.WithLabelValues(string(n.method), msg.TemplateID.String(), result).Inc()
302+
n.metrics.DispatchAttempts.WithLabelValues(string(msg.Method), msg.TemplateID.String(), result).Inc()
305303

306304
return dispatchResult{
307305
notifier: n.id,

0 commit comments

Comments
 (0)