Skip to content

Commit 522181f

Browse files
authored
feat(coderd): add new dispatch logic for coder inbox (coder#16764)
This PR is [resolving the dispatch part of Coder Inbocx](coder/internal#403). Since the DB layer has been merged - we now want to insert notifications into Coder Inbox in parallel of the other delivery target. To do so, we push two messages instead of one using the `Enqueue` method.
1 parent 32450a2 commit 522181f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+415
-120
lines changed

coderd/database/dump.sql

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
-- The migration is about an enum value change
2+
-- As we can not remove a value from an enum, we can let the down migration empty
3+
-- In order to avoid any failure, we use ADD VALUE IF NOT EXISTS to add the value
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ALTER TYPE notification_method ADD VALUE IF NOT EXISTS 'inbox';

coderd/database/models.go

Lines changed: 4 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries.sql.go

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries/notifications.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ SELECT
8484
nm.method,
8585
nm.attempt_count::int AS attempt_count,
8686
nm.queued_seconds::float AS queued_seconds,
87+
nm.targets,
8788
-- template
8889
nt.id AS template_id,
8990
nt.title_template,

coderd/notifications.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,11 @@ func (api *API) systemNotificationTemplates(rw http.ResponseWriter, r *http.Requ
157157
func (api *API) notificationDispatchMethods(rw http.ResponseWriter, r *http.Request) {
158158
var methods []string
159159
for _, nm := range database.AllNotificationMethodValues() {
160+
// Skip inbox method as for now this is an implicit delivery target and should not appear
161+
// anywhere in the Web UI.
162+
if nm == database.NotificationMethodInbox {
163+
continue
164+
}
160165
methods = append(methods, string(nm))
161166
}
162167

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package dispatch
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"text/template"
7+
8+
"golang.org/x/xerrors"
9+
10+
"cdr.dev/slog"
11+
12+
"github.com/google/uuid"
13+
14+
"github.com/coder/coder/v2/coderd/database"
15+
"github.com/coder/coder/v2/coderd/database/dbtime"
16+
"github.com/coder/coder/v2/coderd/notifications/types"
17+
markdown "github.com/coder/coder/v2/coderd/render"
18+
)
19+
20+
type InboxStore interface {
21+
InsertInboxNotification(ctx context.Context, arg database.InsertInboxNotificationParams) (database.InboxNotification, error)
22+
}
23+
24+
// InboxHandler is responsible for dispatching notification messages to the Coder Inbox.
25+
type InboxHandler struct {
26+
log slog.Logger
27+
store InboxStore
28+
}
29+
30+
func NewInboxHandler(log slog.Logger, store InboxStore) *InboxHandler {
31+
return &InboxHandler{log: log, store: store}
32+
}
33+
34+
func (s *InboxHandler) Dispatcher(payload types.MessagePayload, titleTmpl, bodyTmpl string, _ template.FuncMap) (DeliveryFunc, error) {
35+
subject, err := markdown.PlaintextFromMarkdown(titleTmpl)
36+
if err != nil {
37+
return nil, xerrors.Errorf("render subject: %w", err)
38+
}
39+
40+
htmlBody, err := markdown.PlaintextFromMarkdown(bodyTmpl)
41+
if err != nil {
42+
return nil, xerrors.Errorf("render html body: %w", err)
43+
}
44+
45+
return s.dispatch(payload, subject, htmlBody), nil
46+
}
47+
48+
func (s *InboxHandler) dispatch(payload types.MessagePayload, title, body string) DeliveryFunc {
49+
return func(ctx context.Context, msgID uuid.UUID) (bool, error) {
50+
userID, err := uuid.Parse(payload.UserID)
51+
if err != nil {
52+
return false, xerrors.Errorf("parse user ID: %w", err)
53+
}
54+
templateID, err := uuid.Parse(payload.NotificationTemplateID)
55+
if err != nil {
56+
return false, xerrors.Errorf("parse template ID: %w", err)
57+
}
58+
59+
actions, err := json.Marshal(payload.Actions)
60+
if err != nil {
61+
return false, xerrors.Errorf("marshal actions: %w", err)
62+
}
63+
64+
// nolint:exhaustruct
65+
_, err = s.store.InsertInboxNotification(ctx, database.InsertInboxNotificationParams{
66+
ID: msgID,
67+
UserID: userID,
68+
TemplateID: templateID,
69+
Targets: payload.Targets,
70+
Title: title,
71+
Content: body,
72+
Actions: actions,
73+
CreatedAt: dbtime.Now(),
74+
})
75+
if err != nil {
76+
return false, xerrors.Errorf("insert inbox notification: %w", err)
77+
}
78+
79+
return false, nil
80+
}
81+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package dispatch_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"cdr.dev/slog"
8+
"cdr.dev/slog/sloggers/slogtest"
9+
10+
"github.com/google/uuid"
11+
"github.com/stretchr/testify/require"
12+
13+
"github.com/coder/coder/v2/coderd/database"
14+
"github.com/coder/coder/v2/coderd/database/dbgen"
15+
"github.com/coder/coder/v2/coderd/database/dbtestutil"
16+
"github.com/coder/coder/v2/coderd/notifications"
17+
"github.com/coder/coder/v2/coderd/notifications/dispatch"
18+
"github.com/coder/coder/v2/coderd/notifications/types"
19+
)
20+
21+
func TestInbox(t *testing.T) {
22+
t.Parallel()
23+
24+
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
25+
tests := []struct {
26+
name string
27+
msgID uuid.UUID
28+
payload types.MessagePayload
29+
expectedErr string
30+
expectedRetry bool
31+
}{
32+
{
33+
name: "OK",
34+
msgID: uuid.New(),
35+
payload: types.MessagePayload{
36+
NotificationName: "test",
37+
NotificationTemplateID: notifications.TemplateWorkspaceDeleted.String(),
38+
UserID: "valid",
39+
Actions: []types.TemplateAction{
40+
{
41+
Label: "View my workspace",
42+
URL: "https://coder.com/workspaces/1",
43+
},
44+
},
45+
},
46+
},
47+
{
48+
name: "InvalidUserID",
49+
payload: types.MessagePayload{
50+
NotificationName: "test",
51+
NotificationTemplateID: notifications.TemplateWorkspaceDeleted.String(),
52+
UserID: "invalid",
53+
Actions: []types.TemplateAction{},
54+
},
55+
expectedErr: "parse user ID",
56+
expectedRetry: false,
57+
},
58+
{
59+
name: "InvalidTemplateID",
60+
payload: types.MessagePayload{
61+
NotificationName: "test",
62+
NotificationTemplateID: "invalid",
63+
UserID: "valid",
64+
Actions: []types.TemplateAction{},
65+
},
66+
expectedErr: "parse template ID",
67+
expectedRetry: false,
68+
},
69+
}
70+
71+
for _, tc := range tests {
72+
tc := tc
73+
t.Run(tc.name, func(t *testing.T) {
74+
t.Parallel()
75+
76+
db, _ := dbtestutil.NewDB(t)
77+
78+
if tc.payload.UserID == "valid" {
79+
user := dbgen.User(t, db, database.User{})
80+
tc.payload.UserID = user.ID.String()
81+
}
82+
83+
ctx := context.Background()
84+
85+
handler := dispatch.NewInboxHandler(logger.Named("smtp"), db)
86+
dispatcherFunc, err := handler.Dispatcher(tc.payload, "", "", nil)
87+
require.NoError(t, err)
88+
89+
retryable, err := dispatcherFunc(ctx, tc.msgID)
90+
91+
if tc.expectedErr != "" {
92+
require.ErrorContains(t, err, tc.expectedErr)
93+
require.Equal(t, tc.expectedRetry, retryable)
94+
} else {
95+
require.NoError(t, err)
96+
require.False(t, retryable)
97+
uid := uuid.MustParse(tc.payload.UserID)
98+
notifs, err := db.GetInboxNotificationsByUserID(ctx, database.GetInboxNotificationsByUserIDParams{
99+
UserID: uid,
100+
ReadStatus: database.InboxNotificationReadStatusAll,
101+
})
102+
103+
require.NoError(t, err)
104+
require.Len(t, notifs, 1)
105+
require.Equal(t, tc.msgID, notifs[0].ID)
106+
}
107+
})
108+
}
109+
}

coderd/notifications/enqueuer.go

Lines changed: 42 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,13 @@ func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers tem
5353
}
5454

5555
// Enqueue queues a notification message for later delivery, assumes no structured input data.
56-
func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUID, labels map[string]string, createdBy string, targets ...uuid.UUID) (*uuid.UUID, error) {
56+
func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUID, labels map[string]string, createdBy string, targets ...uuid.UUID) ([]uuid.UUID, error) {
5757
return s.EnqueueWithData(ctx, userID, templateID, labels, nil, createdBy, targets...)
5858
}
5959

6060
// Enqueue queues a notification message for later delivery.
6161
// Messages will be dequeued by a notifier later and dispatched.
62-
func (s *StoreEnqueuer) EnqueueWithData(ctx context.Context, userID, templateID uuid.UUID, labels map[string]string, data map[string]any, createdBy string, targets ...uuid.UUID) (*uuid.UUID, error) {
62+
func (s *StoreEnqueuer) EnqueueWithData(ctx context.Context, userID, templateID uuid.UUID, labels map[string]string, data map[string]any, createdBy string, targets ...uuid.UUID) ([]uuid.UUID, error) {
6363
metadata, err := s.store.FetchNewMessageMetadata(ctx, database.FetchNewMessageMetadataParams{
6464
UserID: userID,
6565
NotificationTemplateID: templateID,
@@ -85,40 +85,48 @@ func (s *StoreEnqueuer) EnqueueWithData(ctx context.Context, userID, templateID
8585
return nil, xerrors.Errorf("failed encoding input labels: %w", err)
8686
}
8787

88-
id := uuid.New()
89-
err = s.store.EnqueueNotificationMessage(ctx, database.EnqueueNotificationMessageParams{
90-
ID: id,
91-
UserID: userID,
92-
NotificationTemplateID: templateID,
93-
Method: dispatchMethod,
94-
Payload: input,
95-
Targets: targets,
96-
CreatedBy: createdBy,
97-
CreatedAt: dbtime.Time(s.clock.Now().UTC()),
98-
})
99-
if err != nil {
100-
// We have a trigger on the notification_messages table named `inhibit_enqueue_if_disabled` which prevents messages
101-
// from being enqueued if the user has disabled them via notification_preferences. The trigger will fail the insertion
102-
// with the message "cannot enqueue message: user has disabled this notification".
103-
//
104-
// This is more efficient than fetching the user's preferences for each enqueue, and centralizes the business logic.
105-
if strings.Contains(err.Error(), ErrCannotEnqueueDisabledNotification.Error()) {
106-
return nil, ErrCannotEnqueueDisabledNotification
107-
}
108-
109-
// If the enqueue fails due to a dedupe hash conflict, this means that a notification has already been enqueued
110-
// today with identical properties. It's far simpler to prevent duplicate sends in this central manner, rather than
111-
// having each notification enqueue handle its own logic.
112-
if database.IsUniqueViolation(err, database.UniqueNotificationMessagesDedupeHashIndex) {
113-
return nil, ErrDuplicate
88+
uuids := make([]uuid.UUID, 0, 2)
89+
// All the enqueued messages are enqueued both on the dispatch method set by the user (or default one) and the inbox.
90+
// As the inbox is not configurable per the user and is always enabled, we always enqueue the message on the inbox.
91+
// The logic is done here in order to have two completely separated processing and retries are handled separately.
92+
for _, method := range []database.NotificationMethod{dispatchMethod, database.NotificationMethodInbox} {
93+
id := uuid.New()
94+
err = s.store.EnqueueNotificationMessage(ctx, database.EnqueueNotificationMessageParams{
95+
ID: id,
96+
UserID: userID,
97+
NotificationTemplateID: templateID,
98+
Method: method,
99+
Payload: input,
100+
Targets: targets,
101+
CreatedBy: createdBy,
102+
CreatedAt: dbtime.Time(s.clock.Now().UTC()),
103+
})
104+
if err != nil {
105+
// We have a trigger on the notification_messages table named `inhibit_enqueue_if_disabled` which prevents messages
106+
// from being enqueued if the user has disabled them via notification_preferences. The trigger will fail the insertion
107+
// with the message "cannot enqueue message: user has disabled this notification".
108+
//
109+
// This is more efficient than fetching the user's preferences for each enqueue, and centralizes the business logic.
110+
if strings.Contains(err.Error(), ErrCannotEnqueueDisabledNotification.Error()) {
111+
return nil, ErrCannotEnqueueDisabledNotification
112+
}
113+
114+
// If the enqueue fails due to a dedupe hash conflict, this means that a notification has already been enqueued
115+
// today with identical properties. It's far simpler to prevent duplicate sends in this central manner, rather than
116+
// having each notification enqueue handle its own logic.
117+
if database.IsUniqueViolation(err, database.UniqueNotificationMessagesDedupeHashIndex) {
118+
return nil, ErrDuplicate
119+
}
120+
121+
s.log.Warn(ctx, "failed to enqueue notification", slog.F("template_id", templateID), slog.F("input", input), slog.Error(err))
122+
return nil, xerrors.Errorf("enqueue notification: %w", err)
114123
}
115124

116-
s.log.Warn(ctx, "failed to enqueue notification", slog.F("template_id", templateID), slog.F("input", input), slog.Error(err))
117-
return nil, xerrors.Errorf("enqueue notification: %w", err)
125+
uuids = append(uuids, id)
118126
}
119127

120-
s.log.Debug(ctx, "enqueued notification", slog.F("msg_id", id))
121-
return &id, nil
128+
s.log.Debug(ctx, "enqueued notification", slog.F("msg_ids", uuids))
129+
return uuids, nil
122130
}
123131

124132
// buildPayload creates the payload that the notification will for variable substitution and/or routing.
@@ -165,12 +173,12 @@ func NewNoopEnqueuer() *NoopEnqueuer {
165173
return &NoopEnqueuer{}
166174
}
167175

168-
func (*NoopEnqueuer) Enqueue(context.Context, uuid.UUID, uuid.UUID, map[string]string, string, ...uuid.UUID) (*uuid.UUID, error) {
176+
func (*NoopEnqueuer) Enqueue(context.Context, uuid.UUID, uuid.UUID, map[string]string, string, ...uuid.UUID) ([]uuid.UUID, error) {
169177
// nolint:nilnil // irrelevant.
170178
return nil, nil
171179
}
172180

173-
func (*NoopEnqueuer) EnqueueWithData(context.Context, uuid.UUID, uuid.UUID, map[string]string, map[string]any, string, ...uuid.UUID) (*uuid.UUID, error) {
181+
func (*NoopEnqueuer) EnqueueWithData(context.Context, uuid.UUID, uuid.UUID, map[string]string, map[string]any, string, ...uuid.UUID) ([]uuid.UUID, error) {
174182
// nolint:nilnil // irrelevant.
175183
return nil, nil
176184
}

0 commit comments

Comments
 (0)