Skip to content

Commit b2dab33

Browse files
authored
feat: implement observability of notifications subsystem (coder#13799)
1 parent a6d66cc commit b2dab33

22 files changed

+769
-186
lines changed

cli/server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -983,6 +983,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
983983
)
984984
if experiments.Enabled(codersdk.ExperimentNotifications) {
985985
cfg := options.DeploymentValues.Notifications
986+
metrics := notifications.NewMetrics(options.PrometheusRegistry)
986987

987988
// The enqueuer is responsible for enqueueing notifications to the given store.
988989
enqueuer, err := notifications.NewStoreEnqueuer(cfg, options.Database, templateHelpers(options), logger.Named("notifications.enqueuer"))
@@ -994,7 +995,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
994995
// The notification manager is responsible for:
995996
// - creating notifiers and managing their lifecycles (notifiers are responsible for dequeueing/sending notifications)
996997
// - keeping the store updated with status updates
997-
notificationsManager, err = notifications.NewManager(cfg, options.Database, logger.Named("notifications.manager"))
998+
notificationsManager, err = notifications.NewManager(cfg, options.Database, metrics, logger.Named("notifications.manager"))
998999
if err != nil {
9991000
return xerrors.Errorf("failed to instantiate notification manager: %w", err)
10001001
}

coderd/database/dbauthz/dbauthz.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1143,9 +1143,9 @@ func (q *querier) DeleteWorkspaceAgentPortSharesByTemplate(ctx context.Context,
11431143
return q.db.DeleteWorkspaceAgentPortSharesByTemplate(ctx, templateID)
11441144
}
11451145

1146-
func (q *querier) EnqueueNotificationMessage(ctx context.Context, arg database.EnqueueNotificationMessageParams) (database.NotificationMessage, error) {
1146+
func (q *querier) EnqueueNotificationMessage(ctx context.Context, arg database.EnqueueNotificationMessageParams) error {
11471147
if err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceSystem); err != nil {
1148-
return database.NotificationMessage{}, err
1148+
return err
11491149
}
11501150
return q.db.EnqueueNotificationMessage(ctx, arg)
11511151
}

coderd/database/dbmem/dbmem.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -935,12 +935,17 @@ func (q *FakeQuerier) AcquireNotificationMessages(_ context.Context, arg databas
935935
q.mutex.Lock()
936936
defer q.mutex.Unlock()
937937

938-
var out []database.AcquireNotificationMessagesRow
939-
for _, nm := range q.notificationMessages {
940-
if len(out) >= int(arg.Count) {
941-
break
942-
}
938+
// Shift the first "Count" notifications off the slice (FIFO).
939+
sz := len(q.notificationMessages)
940+
if sz > int(arg.Count) {
941+
sz = int(arg.Count)
942+
}
943943

944+
list := q.notificationMessages[:sz]
945+
q.notificationMessages = q.notificationMessages[sz:]
946+
947+
var out []database.AcquireNotificationMessagesRow
948+
for _, nm := range list {
944949
acquirableStatuses := []database.NotificationMessageStatus{database.NotificationMessageStatusPending, database.NotificationMessageStatusTemporaryFailure}
945950
if !slices.Contains(acquirableStatuses, nm.Status) {
946951
continue
@@ -956,9 +961,9 @@ func (q *FakeQuerier) AcquireNotificationMessages(_ context.Context, arg databas
956961
ID: nm.ID,
957962
Payload: nm.Payload,
958963
Method: nm.Method,
959-
CreatedBy: nm.CreatedBy,
960964
TitleTemplate: "This is a title with {{.Labels.variable}}",
961965
BodyTemplate: "This is a body with {{.Labels.variable}}",
966+
TemplateID: nm.NotificationTemplateID,
962967
})
963968
}
964969

@@ -1815,10 +1820,10 @@ func (q *FakeQuerier) DeleteWorkspaceAgentPortSharesByTemplate(_ context.Context
18151820
return nil
18161821
}
18171822

1818-
func (q *FakeQuerier) EnqueueNotificationMessage(_ context.Context, arg database.EnqueueNotificationMessageParams) (database.NotificationMessage, error) {
1823+
func (q *FakeQuerier) EnqueueNotificationMessage(_ context.Context, arg database.EnqueueNotificationMessageParams) error {
18191824
err := validateDatabaseType(arg)
18201825
if err != nil {
1821-
return database.NotificationMessage{}, err
1826+
return err
18221827
}
18231828

18241829
q.mutex.Lock()
@@ -1827,7 +1832,7 @@ func (q *FakeQuerier) EnqueueNotificationMessage(_ context.Context, arg database
18271832
var payload types.MessagePayload
18281833
err = json.Unmarshal(arg.Payload, &payload)
18291834
if err != nil {
1830-
return database.NotificationMessage{}, err
1835+
return err
18311836
}
18321837

18331838
nm := database.NotificationMessage{
@@ -1845,7 +1850,7 @@ func (q *FakeQuerier) EnqueueNotificationMessage(_ context.Context, arg database
18451850

18461851
q.notificationMessages = append(q.notificationMessages, nm)
18471852

1848-
return nm, err
1853+
return err
18491854
}
18501855

18511856
func (q *FakeQuerier) FavoriteWorkspace(_ context.Context, arg uuid.UUID) error {

coderd/database/dbmetrics/dbmetrics.go

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/dbmock/dbmock.go

Lines changed: 3 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/dump.sql

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ALTER TABLE notification_messages
2+
DROP COLUMN IF EXISTS queued_seconds;
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ALTER TABLE notification_messages
2+
ADD COLUMN queued_seconds FLOAT NULL;

coderd/database/models.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/querier.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries.sql.go

Lines changed: 23 additions & 31 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries/notifications.sql

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,15 @@ FROM notification_templates nt,
1010
WHERE nt.id = @notification_template_id
1111
AND u.id = @user_id;
1212

13-
-- name: EnqueueNotificationMessage :one
13+
-- name: EnqueueNotificationMessage :exec
1414
INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by)
1515
VALUES (@id,
1616
@notification_template_id,
1717
@user_id,
1818
@method::notification_method,
1919
@payload::jsonb,
2020
@targets,
21-
@created_by)
22-
RETURNING *;
21+
@created_by);
2322

2423
-- Acquires the lease for a given count of notification messages, to enable concurrent dequeuing and subsequent sending.
2524
-- Only rows that aren't already leased (or ones which are leased but have exceeded their lease period) are returned.
@@ -36,7 +35,8 @@ RETURNING *;
3635
WITH acquired AS (
3736
UPDATE
3837
notification_messages
39-
SET updated_at = NOW(),
38+
SET queued_seconds = GREATEST(0, EXTRACT(EPOCH FROM (NOW() - updated_at)))::FLOAT,
39+
updated_at = NOW(),
4040
status = 'leased'::notification_message_status,
4141
status_reason = 'Leased by notifier ' || sqlc.arg('notifier_id')::uuid,
4242
leased_until = NOW() + CONCAT(sqlc.arg('lease_seconds')::int, ' seconds')::interval
@@ -78,16 +78,19 @@ SELECT
7878
nm.id,
7979
nm.payload,
8080
nm.method,
81-
nm.created_by,
81+
nm.attempt_count::int AS attempt_count,
82+
nm.queued_seconds::float AS queued_seconds,
8283
-- template
84+
nt.id AS template_id,
8385
nt.title_template,
8486
nt.body_template
8587
FROM acquired nm
8688
JOIN notification_templates nt ON nm.notification_template_id = nt.id;
8789

8890
-- name: BulkMarkNotificationMessagesFailed :execrows
8991
UPDATE notification_messages
90-
SET updated_at = subquery.failed_at,
92+
SET queued_seconds = 0,
93+
updated_at = subquery.failed_at,
9194
attempt_count = attempt_count + 1,
9295
status = CASE
9396
WHEN attempt_count + 1 < @max_attempts::int THEN subquery.status
@@ -105,13 +108,14 @@ WHERE notification_messages.id = subquery.id;
105108

106109
-- name: BulkMarkNotificationMessagesSent :execrows
107110
UPDATE notification_messages
108-
SET updated_at = new_values.sent_at,
111+
SET queued_seconds = 0,
112+
updated_at = new_values.sent_at,
109113
attempt_count = attempt_count + 1,
110114
status = 'sent'::notification_message_status,
111115
status_reason = NULL,
112116
leased_until = NULL,
113117
next_retry_after = NULL
114-
FROM (SELECT UNNEST(@ids::uuid[]) AS id,
118+
FROM (SELECT UNNEST(@ids::uuid[]) AS id,
115119
UNNEST(@sent_ats::timestamptz[]) AS sent_at)
116120
AS new_values
117121
WHERE notification_messages.id = new_values.id;

coderd/notifications/enqueuer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI
5959
}
6060

6161
id := uuid.New()
62-
msg, err := s.store.EnqueueNotificationMessage(ctx, database.EnqueueNotificationMessageParams{
62+
err = s.store.EnqueueNotificationMessage(ctx, database.EnqueueNotificationMessageParams{
6363
ID: id,
6464
UserID: userID,
6565
NotificationTemplateID: templateID,
@@ -73,7 +73,7 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI
7373
return nil, xerrors.Errorf("enqueue notification: %w", err)
7474
}
7575

76-
s.log.Debug(ctx, "enqueued notification", slog.F("msg_id", msg.ID))
76+
s.log.Debug(ctx, "enqueued notification", slog.F("msg_id", id))
7777
return &id, nil
7878
}
7979

0 commit comments

Comments
 (0)