diff --git a/cli/server.go b/cli/server.go
index af8ced3f24295..dc713c0cb765c 100644
--- a/cli/server.go
+++ b/cli/server.go
@@ -983,6 +983,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
 	)
 	if experiments.Enabled(codersdk.ExperimentNotifications) {
 		cfg := options.DeploymentValues.Notifications
+		metrics := notifications.NewMetrics(options.PrometheusRegistry)
 
 		// The enqueuer is responsible for enqueueing notifications to the given store.
 		enqueuer, err := notifications.NewStoreEnqueuer(cfg, options.Database, templateHelpers(options), logger.Named("notifications.enqueuer"))
@@ -994,7 +995,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
 		// The notification manager is responsible for:
 		// - creating notifiers and managing their lifecycles (notifiers are responsible for dequeueing/sending notifications)
 		// - keeping the store updated with status updates
-		notificationsManager, err = notifications.NewManager(cfg, options.Database, logger.Named("notifications.manager"))
+		notificationsManager, err = notifications.NewManager(cfg, options.Database, metrics, logger.Named("notifications.manager"))
 		if err != nil {
 			return xerrors.Errorf("failed to instantiate notification manager: %w", err)
 		}
diff --git a/coderd/database/dbauthz/dbauthz.go b/coderd/database/dbauthz/dbauthz.go
index 1feea0c23bbe7..85df46dce620c 100644
--- a/coderd/database/dbauthz/dbauthz.go
+++ b/coderd/database/dbauthz/dbauthz.go
@@ -1143,9 +1143,9 @@ func (q *querier) DeleteWorkspaceAgentPortSharesByTemplate(ctx context.Context,
 	return q.db.DeleteWorkspaceAgentPortSharesByTemplate(ctx, templateID)
 }
 
-func (q *querier) EnqueueNotificationMessage(ctx context.Context, arg database.EnqueueNotificationMessageParams) (database.NotificationMessage, error) {
+func (q *querier) EnqueueNotificationMessage(ctx context.Context, arg database.EnqueueNotificationMessageParams) error {
 	if err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceSystem); err != nil {
-		return database.NotificationMessage{}, err
+		return err
 	}
 	return q.db.EnqueueNotificationMessage(ctx, arg)
 }
diff --git a/coderd/database/dbmem/dbmem.go b/coderd/database/dbmem/dbmem.go
index 5842ad8a2f921..9effbe1bb69f2 100644
--- a/coderd/database/dbmem/dbmem.go
+++ b/coderd/database/dbmem/dbmem.go
@@ -935,12 +935,17 @@ func (q *FakeQuerier) AcquireNotificationMessages(_ context.Context, arg databas
 	q.mutex.Lock()
 	defer q.mutex.Unlock()
 
-	var out []database.AcquireNotificationMessagesRow
-	for _, nm := range q.notificationMessages {
-		if len(out) >= int(arg.Count) {
-			break
-		}
+	// Shift the first "Count" notifications off the slice (FIFO).
+	sz := len(q.notificationMessages)
+	if sz > int(arg.Count) {
+		sz = int(arg.Count)
+	}
+	list := q.notificationMessages[:sz]
+	q.notificationMessages = q.notificationMessages[sz:]
+
+	var out []database.AcquireNotificationMessagesRow
+	for _, nm := range list {
 		acquirableStatuses := []database.NotificationMessageStatus{database.NotificationMessageStatusPending, database.NotificationMessageStatusTemporaryFailure}
 		if !slices.Contains(acquirableStatuses, nm.Status) {
 			continue
@@ -956,9 +961,9 @@ func (q *FakeQuerier) AcquireNotificationMessages(_ context.Context, arg databas
 			ID:            nm.ID,
 			Payload:       nm.Payload,
 			Method:        nm.Method,
-			CreatedBy:     nm.CreatedBy,
 			TitleTemplate: "This is a title with {{.Labels.variable}}",
 			BodyTemplate:  "This is a body with {{.Labels.variable}}",
+			TemplateID:    nm.NotificationTemplateID,
 		})
 	}
@@ -1815,10 +1820,10 @@ func (q *FakeQuerier) DeleteWorkspaceAgentPortSharesByTemplate(_ context.Context
 	return nil
 }
 
-func (q *FakeQuerier) EnqueueNotificationMessage(_ context.Context, arg database.EnqueueNotificationMessageParams) (database.NotificationMessage, error) {
+func (q *FakeQuerier) EnqueueNotificationMessage(_ context.Context, arg database.EnqueueNotificationMessageParams) error {
 	err := validateDatabaseType(arg)
 	if err != nil {
-		return database.NotificationMessage{}, err
+		return err
 	}
 
 	q.mutex.Lock()
@@ -1827,7 +1832,7 @@ func (q *FakeQuerier) EnqueueNotificationMessage(_ context.Context, arg database
 	var payload types.MessagePayload
 	err = json.Unmarshal(arg.Payload, &payload)
 	if err != nil {
-		return database.NotificationMessage{}, err
+		return err
 	}
 
 	nm := database.NotificationMessage{
@@ -1845,7 +1850,7 @@ func (q *FakeQuerier) EnqueueNotificationMessage(_ context.Context, arg database
 	q.notificationMessages = append(q.notificationMessages, nm)
 
-	return nm, err
+	return err
 }
 
 func (q *FakeQuerier) FavoriteWorkspace(_ context.Context, arg uuid.UUID) error {
diff --git a/coderd/database/dbmetrics/dbmetrics.go b/coderd/database/dbmetrics/dbmetrics.go
index 638aeaac14746..e705deafaf315 100644
--- a/coderd/database/dbmetrics/dbmetrics.go
+++ b/coderd/database/dbmetrics/dbmetrics.go
@@ -382,11 +382,11 @@ func (m metricsStore) DeleteWorkspaceAgentPortSharesByTemplate(ctx context.Conte
 	return r0
 }
 
-func (m metricsStore) EnqueueNotificationMessage(ctx context.Context, arg database.EnqueueNotificationMessageParams) (database.NotificationMessage, error) {
+func (m metricsStore) EnqueueNotificationMessage(ctx context.Context, arg database.EnqueueNotificationMessageParams) error {
 	start := time.Now()
-	r0, r1 := m.s.EnqueueNotificationMessage(ctx, arg)
+	r0 := m.s.EnqueueNotificationMessage(ctx, arg)
 	m.queryLatencies.WithLabelValues("EnqueueNotificationMessage").Observe(time.Since(start).Seconds())
-	return r0, r1
+	return r0
 }
 
 func (m metricsStore) FavoriteWorkspace(ctx context.Context, arg uuid.UUID) error {
diff --git a/coderd/database/dbmock/dbmock.go b/coderd/database/dbmock/dbmock.go
index 5fc5403a64f7f..b69d982e46cc6 100644
--- a/coderd/database/dbmock/dbmock.go
+++ b/coderd/database/dbmock/dbmock.go
@@ -659,12 +659,11 @@ func (mr *MockStoreMockRecorder) DeleteWorkspaceAgentPortSharesByTemplate(arg0,
 }
 
 // EnqueueNotificationMessage mocks base method.
-func (m *MockStore) EnqueueNotificationMessage(arg0 context.Context, arg1 database.EnqueueNotificationMessageParams) (database.NotificationMessage, error) {
+func (m *MockStore) EnqueueNotificationMessage(arg0 context.Context, arg1 database.EnqueueNotificationMessageParams) error {
 	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "EnqueueNotificationMessage", arg0, arg1)
-	ret0, _ := ret[0].(database.NotificationMessage)
-	ret1, _ := ret[1].(error)
-	return ret0, ret1
+	ret0, _ := ret[0].(error)
+	return ret0
 }
 
 // EnqueueNotificationMessage indicates an expected call of EnqueueNotificationMessage.
diff --git a/coderd/database/dump.sql b/coderd/database/dump.sql
index 55102d827d50c..4f2d21e19b37e 100644
--- a/coderd/database/dump.sql
+++ b/coderd/database/dump.sql
@@ -563,7 +563,8 @@ CREATE TABLE notification_messages (
     created_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NOT NULL,
     updated_at timestamp with time zone,
     leased_until timestamp with time zone,
-    next_retry_after timestamp with time zone
+    next_retry_after timestamp with time zone,
+    queued_seconds double precision
 );
 
 CREATE TABLE notification_templates (
diff --git a/coderd/database/migrations/000225_notifications_metrics.down.sql b/coderd/database/migrations/000225_notifications_metrics.down.sql
new file mode 100644
index 0000000000000..100e51a5ea617
--- /dev/null
+++ b/coderd/database/migrations/000225_notifications_metrics.down.sql
@@ -0,0 +1,2 @@
+ALTER TABLE notification_messages
+DROP COLUMN IF EXISTS queued_seconds;
\ No newline at end of file
diff --git a/coderd/database/migrations/000225_notifications_metrics.up.sql b/coderd/database/migrations/000225_notifications_metrics.up.sql
new file mode 100644
index 0000000000000..ab8f49dec237e
--- /dev/null
+++ b/coderd/database/migrations/000225_notifications_metrics.up.sql
@@ -0,0 +1,2 @@
+ALTER TABLE notification_messages
+ADD COLUMN queued_seconds FLOAT NULL;
\ No newline at end of file
diff --git a/coderd/database/models.go b/coderd/database/models.go
index c92d51b4366d3..8ace22909db93 100644
--- a/coderd/database/models.go
+++ b/coderd/database/models.go
@@ -2031,6 +2031,7 @@ type NotificationMessage struct {
 	UpdatedAt      sql.NullTime    `db:"updated_at" json:"updated_at"`
 	LeasedUntil    sql.NullTime    `db:"leased_until" json:"leased_until"`
 	NextRetryAfter sql.NullTime    `db:"next_retry_after" json:"next_retry_after"`
+	QueuedSeconds  sql.NullFloat64 `db:"queued_seconds" json:"queued_seconds"`
 }
 
 // Templates from which to create notification messages.
diff --git a/coderd/database/querier.go b/coderd/database/querier.go
index c4ce70cea28fe..917db96b207d1 100644
--- a/coderd/database/querier.go
+++ b/coderd/database/querier.go
@@ -100,7 +100,7 @@ type sqlcQuerier interface {
 	DeleteTailnetTunnel(ctx context.Context, arg DeleteTailnetTunnelParams) (DeleteTailnetTunnelRow, error)
 	DeleteWorkspaceAgentPortShare(ctx context.Context, arg DeleteWorkspaceAgentPortShareParams) error
 	DeleteWorkspaceAgentPortSharesByTemplate(ctx context.Context, templateID uuid.UUID) error
-	EnqueueNotificationMessage(ctx context.Context, arg EnqueueNotificationMessageParams) (NotificationMessage, error)
+	EnqueueNotificationMessage(ctx context.Context, arg EnqueueNotificationMessageParams) error
 	FavoriteWorkspace(ctx context.Context, id uuid.UUID) error
 	// This is used to build up the notification_message's JSON payload.
FetchNewMessageMetadata(ctx context.Context, arg FetchNewMessageMetadataParams) (FetchNewMessageMetadataRow, error) diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index 5e85fd10d9838..ee439996e34dd 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -3292,7 +3292,8 @@ const acquireNotificationMessages = `-- name: AcquireNotificationMessages :many WITH acquired AS ( UPDATE notification_messages - SET updated_at = NOW(), + SET queued_seconds = GREATEST(0, EXTRACT(EPOCH FROM (NOW() - updated_at)))::FLOAT, + updated_at = NOW(), status = 'leased'::notification_message_status, status_reason = 'Leased by notifier ' || $1::uuid, leased_until = NOW() + CONCAT($2::int, ' seconds')::interval @@ -3328,14 +3329,16 @@ WITH acquired AS ( FOR UPDATE OF nm SKIP LOCKED LIMIT $4) - RETURNING id, notification_template_id, user_id, method, status, status_reason, created_by, payload, attempt_count, targets, created_at, updated_at, leased_until, next_retry_after) + RETURNING id, notification_template_id, user_id, method, status, status_reason, created_by, payload, attempt_count, targets, created_at, updated_at, leased_until, next_retry_after, queued_seconds) SELECT -- message nm.id, nm.payload, nm.method, - nm.created_by, + nm.attempt_count::int AS attempt_count, + nm.queued_seconds::float AS queued_seconds, -- template + nt.id AS template_id, nt.title_template, nt.body_template FROM acquired nm @@ -3353,7 +3356,9 @@ type AcquireNotificationMessagesRow struct { ID uuid.UUID `db:"id" json:"id"` Payload json.RawMessage `db:"payload" json:"payload"` Method NotificationMethod `db:"method" json:"method"` - CreatedBy string `db:"created_by" json:"created_by"` + AttemptCount int32 `db:"attempt_count" json:"attempt_count"` + QueuedSeconds float64 `db:"queued_seconds" json:"queued_seconds"` + TemplateID uuid.UUID `db:"template_id" json:"template_id"` TitleTemplate string `db:"title_template" json:"title_template"` BodyTemplate string `db:"body_template" json:"body_template"` } @@ -3386,7 +3391,9 @@ func (q *sqlQuerier) AcquireNotificationMessages(ctx context.Context, arg Acquir &i.ID, &i.Payload, &i.Method, - &i.CreatedBy, + &i.AttemptCount, + &i.QueuedSeconds, + &i.TemplateID, &i.TitleTemplate, &i.BodyTemplate, ); err != nil { @@ -3405,7 +3412,8 @@ func (q *sqlQuerier) AcquireNotificationMessages(ctx context.Context, arg Acquir const bulkMarkNotificationMessagesFailed = `-- name: BulkMarkNotificationMessagesFailed :execrows UPDATE notification_messages -SET updated_at = subquery.failed_at, +SET queued_seconds = 0, + updated_at = subquery.failed_at, attempt_count = attempt_count + 1, status = CASE WHEN attempt_count + 1 < $1::int THEN subquery.status @@ -3448,13 +3456,14 @@ func (q *sqlQuerier) BulkMarkNotificationMessagesFailed(ctx context.Context, arg const bulkMarkNotificationMessagesSent = `-- name: BulkMarkNotificationMessagesSent :execrows UPDATE notification_messages -SET updated_at = new_values.sent_at, +SET queued_seconds = 0, + updated_at = new_values.sent_at, attempt_count = attempt_count + 1, status = 'sent'::notification_message_status, status_reason = NULL, leased_until = NULL, next_retry_after = NULL -FROM (SELECT UNNEST($1::uuid[]) AS id, +FROM (SELECT UNNEST($1::uuid[]) AS id, UNNEST($2::timestamptz[]) AS sent_at) AS new_values WHERE notification_messages.id = new_values.id @@ -3488,7 +3497,7 @@ func (q *sqlQuerier) DeleteOldNotificationMessages(ctx context.Context) error { return err } -const enqueueNotificationMessage = `-- name: 
EnqueueNotificationMessage :one +const enqueueNotificationMessage = `-- name: EnqueueNotificationMessage :exec INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by) VALUES ($1, $2, @@ -3497,7 +3506,6 @@ VALUES ($1, $5::jsonb, $6, $7) -RETURNING id, notification_template_id, user_id, method, status, status_reason, created_by, payload, attempt_count, targets, created_at, updated_at, leased_until, next_retry_after ` type EnqueueNotificationMessageParams struct { @@ -3510,8 +3518,8 @@ type EnqueueNotificationMessageParams struct { CreatedBy string `db:"created_by" json:"created_by"` } -func (q *sqlQuerier) EnqueueNotificationMessage(ctx context.Context, arg EnqueueNotificationMessageParams) (NotificationMessage, error) { - row := q.db.QueryRowContext(ctx, enqueueNotificationMessage, +func (q *sqlQuerier) EnqueueNotificationMessage(ctx context.Context, arg EnqueueNotificationMessageParams) error { + _, err := q.db.ExecContext(ctx, enqueueNotificationMessage, arg.ID, arg.NotificationTemplateID, arg.UserID, @@ -3520,24 +3528,7 @@ func (q *sqlQuerier) EnqueueNotificationMessage(ctx context.Context, arg Enqueue pq.Array(arg.Targets), arg.CreatedBy, ) - var i NotificationMessage - err := row.Scan( - &i.ID, - &i.NotificationTemplateID, - &i.UserID, - &i.Method, - &i.Status, - &i.StatusReason, - &i.CreatedBy, - &i.Payload, - &i.AttemptCount, - pq.Array(&i.Targets), - &i.CreatedAt, - &i.UpdatedAt, - &i.LeasedUntil, - &i.NextRetryAfter, - ) - return i, err + return err } const fetchNewMessageMetadata = `-- name: FetchNewMessageMetadata :one @@ -3580,7 +3571,7 @@ func (q *sqlQuerier) FetchNewMessageMetadata(ctx context.Context, arg FetchNewMe } const getNotificationMessagesByStatus = `-- name: GetNotificationMessagesByStatus :many -SELECT id, notification_template_id, user_id, method, status, status_reason, created_by, payload, attempt_count, targets, created_at, updated_at, leased_until, next_retry_after FROM notification_messages WHERE status = $1 LIMIT $2::int +SELECT id, notification_template_id, user_id, method, status, status_reason, created_by, payload, attempt_count, targets, created_at, updated_at, leased_until, next_retry_after, queued_seconds FROM notification_messages WHERE status = $1 LIMIT $2::int ` type GetNotificationMessagesByStatusParams struct { @@ -3612,6 +3603,7 @@ func (q *sqlQuerier) GetNotificationMessagesByStatus(ctx context.Context, arg Ge &i.UpdatedAt, &i.LeasedUntil, &i.NextRetryAfter, + &i.QueuedSeconds, ); err != nil { return nil, err } diff --git a/coderd/database/queries/notifications.sql b/coderd/database/queries/notifications.sql index 2949c8f86e27b..edbb0fd6f5d58 100644 --- a/coderd/database/queries/notifications.sql +++ b/coderd/database/queries/notifications.sql @@ -10,7 +10,7 @@ FROM notification_templates nt, WHERE nt.id = @notification_template_id AND u.id = @user_id; --- name: EnqueueNotificationMessage :one +-- name: EnqueueNotificationMessage :exec INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by) VALUES (@id, @notification_template_id, @@ -18,8 +18,7 @@ VALUES (@id, @method::notification_method, @payload::jsonb, @targets, - @created_by) -RETURNING *; + @created_by); -- Acquires the lease for a given count of notification messages, to enable concurrent dequeuing and subsequent sending. -- Only rows that aren't already leased (or ones which are leased but have exceeded their lease period) are returned. 
@@ -36,7 +35,8 @@ RETURNING *; WITH acquired AS ( UPDATE notification_messages - SET updated_at = NOW(), + SET queued_seconds = GREATEST(0, EXTRACT(EPOCH FROM (NOW() - updated_at)))::FLOAT, + updated_at = NOW(), status = 'leased'::notification_message_status, status_reason = 'Leased by notifier ' || sqlc.arg('notifier_id')::uuid, leased_until = NOW() + CONCAT(sqlc.arg('lease_seconds')::int, ' seconds')::interval @@ -78,8 +78,10 @@ SELECT nm.id, nm.payload, nm.method, - nm.created_by, + nm.attempt_count::int AS attempt_count, + nm.queued_seconds::float AS queued_seconds, -- template + nt.id AS template_id, nt.title_template, nt.body_template FROM acquired nm @@ -87,7 +89,8 @@ FROM acquired nm -- name: BulkMarkNotificationMessagesFailed :execrows UPDATE notification_messages -SET updated_at = subquery.failed_at, +SET queued_seconds = 0, + updated_at = subquery.failed_at, attempt_count = attempt_count + 1, status = CASE WHEN attempt_count + 1 < @max_attempts::int THEN subquery.status @@ -105,13 +108,14 @@ WHERE notification_messages.id = subquery.id; -- name: BulkMarkNotificationMessagesSent :execrows UPDATE notification_messages -SET updated_at = new_values.sent_at, +SET queued_seconds = 0, + updated_at = new_values.sent_at, attempt_count = attempt_count + 1, status = 'sent'::notification_message_status, status_reason = NULL, leased_until = NULL, next_retry_after = NULL -FROM (SELECT UNNEST(@ids::uuid[]) AS id, +FROM (SELECT UNNEST(@ids::uuid[]) AS id, UNNEST(@sent_ats::timestamptz[]) AS sent_at) AS new_values WHERE notification_messages.id = new_values.id; diff --git a/coderd/notifications/enqueuer.go b/coderd/notifications/enqueuer.go index f7b5c4655f477..8838ba9be1949 100644 --- a/coderd/notifications/enqueuer.go +++ b/coderd/notifications/enqueuer.go @@ -59,7 +59,7 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI } id := uuid.New() - msg, err := s.store.EnqueueNotificationMessage(ctx, database.EnqueueNotificationMessageParams{ + err = s.store.EnqueueNotificationMessage(ctx, database.EnqueueNotificationMessageParams{ ID: id, UserID: userID, NotificationTemplateID: templateID, @@ -73,7 +73,7 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI return nil, xerrors.Errorf("enqueue notification: %w", err) } - s.log.Debug(ctx, "enqueued notification", slog.F("msg_id", msg.ID)) + s.log.Debug(ctx, "enqueued notification", slog.F("msg_id", id)) return &id, nil } diff --git a/coderd/notifications/manager.go b/coderd/notifications/manager.go index 36e82d65af31b..5f5d30974a302 100644 --- a/coderd/notifications/manager.go +++ b/coderd/notifications/manager.go @@ -43,6 +43,9 @@ type Manager struct { notifier *notifier handlers map[database.NotificationMethod]Handler + method database.NotificationMethod + + metrics *Metrics success, failure chan dispatchResult @@ -56,7 +59,13 @@ type Manager struct { // // helpers is a map of template helpers which are used to customize notification messages to use global settings like // access URL etc. -func NewManager(cfg codersdk.NotificationsConfig, store Store, log slog.Logger) (*Manager, error) { +func NewManager(cfg codersdk.NotificationsConfig, store Store, metrics *Metrics, log slog.Logger) (*Manager, error) { + // TODO(dannyk): add the ability to use multiple notification methods. 
+ var method database.NotificationMethod + if err := method.Scan(cfg.Method.String()); err != nil { + return nil, xerrors.Errorf("notification method %q is invalid", cfg.Method) + } + // If dispatch timeout exceeds lease period, it is possible that messages can be delivered in duplicate because the // lease can expire before the notifier gives up on the dispatch, which results in the message becoming eligible for // being re-acquired. @@ -78,6 +87,9 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, log slog.Logger) success: make(chan dispatchResult, cfg.StoreSyncBufferSize), failure: make(chan dispatchResult, cfg.StoreSyncBufferSize), + metrics: metrics, + method: method, + stop: make(chan any), done: make(chan any), @@ -137,7 +149,7 @@ func (m *Manager) loop(ctx context.Context) error { var eg errgroup.Group // Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications. - m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers) + m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers, m.method, m.metrics) eg.Go(func() error { return m.notifier.run(ctx, m.success, m.failure) }) @@ -171,12 +183,12 @@ func (m *Manager) loop(ctx context.Context) error { if len(m.success)+len(m.failure) > 0 { m.log.Warn(ctx, "flushing buffered updates before stop", slog.F("success_count", len(m.success)), slog.F("failure_count", len(m.failure))) - m.bulkUpdate(ctx) + m.syncUpdates(ctx) m.log.Warn(ctx, "flushing updates done") } return nil case <-tick.C: - m.bulkUpdate(ctx) + m.syncUpdates(ctx) } } }) @@ -194,8 +206,13 @@ func (m *Manager) BufferedUpdatesCount() (success int, failure int) { return len(m.success), len(m.failure) } -// bulkUpdate updates messages in the store based on the given successful and failed message dispatch results. -func (m *Manager) bulkUpdate(ctx context.Context) { +// syncUpdates updates messages in the store based on the given successful and failed message dispatch results. +func (m *Manager) syncUpdates(ctx context.Context) { + // Ensure we update the metrics to reflect the current state after each invocation. + defer func() { + m.metrics.PendingUpdates.Set(float64(len(m.success) + len(m.failure))) + }() + select { case <-ctx.Done(): return @@ -205,6 +222,8 @@ func (m *Manager) bulkUpdate(ctx context.Context) { nSuccess := len(m.success) nFailure := len(m.failure) + m.metrics.PendingUpdates.Set(float64(nSuccess + nFailure)) + // Nothing to do. 
if nSuccess+nFailure == 0 { return @@ -266,6 +285,7 @@ func (m *Manager) bulkUpdate(ctx context.Context) { logger.Error(ctx, "bulk update failed", slog.Error(err)) return } + m.metrics.SyncedUpdates.Add(float64(n)) logger.Debug(ctx, "bulk update completed", slog.F("updated", n)) }() @@ -289,6 +309,7 @@ func (m *Manager) bulkUpdate(ctx context.Context) { logger.Error(ctx, "bulk update failed", slog.Error(err)) return } + m.metrics.SyncedUpdates.Add(float64(n)) logger.Debug(ctx, "bulk update completed", slog.F("updated", n)) }() @@ -347,21 +368,3 @@ type dispatchResult struct { err error retryable bool } - -func newSuccessfulDispatch(notifier, msg uuid.UUID) dispatchResult { - return dispatchResult{ - notifier: notifier, - msg: msg, - ts: time.Now(), - } -} - -func newFailedDispatch(notifier, msg uuid.UUID, err error, retryable bool) dispatchResult { - return dispatchResult{ - notifier: notifier, - msg: msg, - ts: time.Now(), - err: err, - retryable: retryable, - } -} diff --git a/coderd/notifications/manager_test.go b/coderd/notifications/manager_test.go index fe161cc2cd8f6..abf29ca7a4539 100644 --- a/coderd/notifications/manager_test.go +++ b/coderd/notifications/manager_test.go @@ -28,14 +28,14 @@ func TestBufferedUpdates(t *testing.T) { // setup ctx, logger, db := setupInMemory(t) - interceptor := &bulkUpdateInterceptor{Store: db} + interceptor := &syncInterceptor{Store: db} santa := &santaHandler{} cfg := defaultNotificationsConfig(database.NotificationMethodSmtp) cfg.StoreSyncInterval = serpent.Duration(time.Hour) // Ensure we don't sync the store automatically. // GIVEN: a manager which will pass or fail notifications based on their "nice" labels - mgr, err := notifications.NewManager(cfg, interceptor, logger.Named("notifications-manager")) + mgr, err := notifications.NewManager(cfg, interceptor, createMetrics(), logger.Named("notifications-manager")) require.NoError(t, err) mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{ database.NotificationMethodSmtp: santa, @@ -148,7 +148,7 @@ func TestStopBeforeRun(t *testing.T) { ctx, logger, db := setupInMemory(t) // GIVEN: a standard manager - mgr, err := notifications.NewManager(defaultNotificationsConfig(database.NotificationMethodSmtp), db, logger.Named("notifications-manager")) + mgr, err := notifications.NewManager(defaultNotificationsConfig(database.NotificationMethodSmtp), db, createMetrics(), logger.Named("notifications-manager")) require.NoError(t, err) // THEN: validate that the manager can be stopped safely without Run() having been called yet @@ -158,7 +158,7 @@ func TestStopBeforeRun(t *testing.T) { }, testutil.WaitShort, testutil.IntervalFast) } -type bulkUpdateInterceptor struct { +type syncInterceptor struct { notifications.Store sent atomic.Int32 @@ -166,7 +166,7 @@ type bulkUpdateInterceptor struct { err atomic.Value } -func (b *bulkUpdateInterceptor) BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) { +func (b *syncInterceptor) BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) { updated, err := b.Store.BulkMarkNotificationMessagesSent(ctx, arg) b.sent.Add(int32(updated)) if err != nil { @@ -175,7 +175,7 @@ func (b *bulkUpdateInterceptor) BulkMarkNotificationMessagesSent(ctx context.Con return updated, err } -func (b *bulkUpdateInterceptor) BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, 
error) {
+func (b *syncInterceptor) BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) {
 	updated, err := b.Store.BulkMarkNotificationMessagesFailed(ctx, arg)
 	b.failed.Add(int32(updated))
 	if err != nil {
@@ -213,15 +213,15 @@ func newEnqueueInterceptor(db notifications.Store, metadataFn func() database.Fe
 	return &enqueueInterceptor{Store: db, payload: make(chan types.MessagePayload, 1), metadataFn: metadataFn}
 }
 
-func (e *enqueueInterceptor) EnqueueNotificationMessage(_ context.Context, arg database.EnqueueNotificationMessageParams) (database.NotificationMessage, error) {
+func (e *enqueueInterceptor) EnqueueNotificationMessage(_ context.Context, arg database.EnqueueNotificationMessageParams) error {
 	var payload types.MessagePayload
 	err := json.Unmarshal(arg.Payload, &payload)
 	if err != nil {
-		return database.NotificationMessage{}, err
+		return err
 	}
 
 	e.payload <- payload
-	return database.NotificationMessage{}, err
+	return err
 }
 
 func (e *enqueueInterceptor) FetchNewMessageMetadata(_ context.Context, _ database.FetchNewMessageMetadataParams) (database.FetchNewMessageMetadataRow, error) {
diff --git a/coderd/notifications/metrics.go b/coderd/notifications/metrics.go
new file mode 100644
index 0000000000000..204bc260c7742
--- /dev/null
+++ b/coderd/notifications/metrics.go
@@ -0,0 +1,80 @@
+package notifications
+
+import (
+	"fmt"
+	"strings"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+type Metrics struct {
+	DispatchAttempts *prometheus.CounterVec
+	RetryCount       *prometheus.CounterVec
+
+	QueuedSeconds *prometheus.HistogramVec
+
+	InflightDispatches    *prometheus.GaugeVec
+	DispatcherSendSeconds *prometheus.HistogramVec
+
+	PendingUpdates prometheus.Gauge
+	SyncedUpdates  prometheus.Counter
+}
+
+const (
+	ns        = "coderd"
+	subsystem = "notifications"
+
+	LabelMethod     = "method"
+	LabelTemplateID = "notification_template_id"
+	LabelResult     = "result"
+
+	ResultSuccess  = "success"
+	ResultTempFail = "temp_fail"
+	ResultPermFail = "perm_fail"
+)
+
+func NewMetrics(reg prometheus.Registerer) *Metrics {
+	return &Metrics{
+		DispatchAttempts: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "dispatch_attempts_total", Namespace: ns, Subsystem: subsystem,
+			Help: fmt.Sprintf("The number of dispatch attempts, aggregated by the result type (%s)",
+				strings.Join([]string{ResultSuccess, ResultTempFail, ResultPermFail}, ", ")),
+		}, []string{LabelMethod, LabelTemplateID, LabelResult}),
+		RetryCount: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "retry_count", Namespace: ns, Subsystem: subsystem,
+			Help: "The count of notification dispatch retry attempts.",
+		}, []string{LabelMethod, LabelTemplateID}),
+
+		// Aggregating on LabelTemplateID as well would cause a cardinality explosion.
+		QueuedSeconds: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
+			Name: "queued_seconds", Namespace: ns, Subsystem: subsystem,
+			Buckets: []float64{1, 2.5, 5, 7.5, 10, 15, 20, 30, 60, 120, 300, 600, 3600},
+			Help: "The time elapsed between a notification being enqueued in the store and retrieved for dispatching " +
+				"(measures the latency of the notifications system). This should generally be within CODER_NOTIFICATIONS_FETCH_INTERVAL " +
+				"seconds; higher values for a sustained period indicates delayed processing and CODER_NOTIFICATIONS_LEASE_COUNT " +
+				"can be increased to accommodate this.",
+		}, []string{LabelMethod}),
+
+		InflightDispatches: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
+			Name: "inflight_dispatches", Namespace: ns, Subsystem: subsystem,
+			Help: "The number of dispatch attempts which are currently in progress.",
+		}, []string{LabelMethod, LabelTemplateID}),
+		// Aggregating on LabelTemplateID as well would cause a cardinality explosion.
+		DispatcherSendSeconds: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
+			Name: "dispatcher_send_seconds", Namespace: ns, Subsystem: subsystem,
+			Buckets: []float64{0.001, 0.05, 0.1, 0.5, 1, 2, 5, 10, 15, 30, 60, 120},
+			Help: "The time taken to dispatch notifications.",
+		}, []string{LabelMethod}),
+
+		// Currently no requirement to discriminate between success and failure updates which are pending.
+		PendingUpdates: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
+			Name: "pending_updates", Namespace: ns, Subsystem: subsystem,
+			Help: "The number of dispatch attempt results waiting to be flushed to the store.",
+		}),
+		SyncedUpdates: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+			Name: "synced_updates_total", Namespace: ns, Subsystem: subsystem,
+			Help: "The number of dispatch attempt results flushed to the store.",
+		}),
+	}
+}
diff --git a/coderd/notifications/metrics_test.go b/coderd/notifications/metrics_test.go
new file mode 100644
index 0000000000000..53fb8279789e2
--- /dev/null
+++ b/coderd/notifications/metrics_test.go
@@ -0,0 +1,444 @@
+package notifications_test
+
+import (
+	"context"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/prometheus/client_golang/prometheus"
+	promtest "github.com/prometheus/client_golang/prometheus/testutil"
+	dto "github.com/prometheus/client_model/go"
+	"github.com/prometheus/common/model"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/coder/serpent"
+
+	"github.com/coder/coder/v2/coderd/database"
+	"github.com/coder/coder/v2/coderd/database/dbtestutil"
+	"github.com/coder/coder/v2/coderd/notifications"
+	"github.com/coder/coder/v2/coderd/notifications/dispatch"
+	"github.com/coder/coder/v2/coderd/notifications/types"
+	"github.com/coder/coder/v2/testutil"
+)
+
+func TestMetrics(t *testing.T) {
+	t.Parallel()
+
+	// SETUP
+	if !dbtestutil.WillUsePostgres() {
+		t.Skip("This test requires postgres; it relies on business-logic only implemented in the database")
+	}
+
+	ctx, logger, store := setup(t)
+
+	reg := prometheus.NewRegistry()
+	metrics := notifications.NewMetrics(reg)
+	template := notifications.TemplateWorkspaceDeleted
+
+	const (
+		method      = database.NotificationMethodSmtp
+		maxAttempts = 3
+		debug       = false
+	)
+
+	// GIVEN: a notification manager whose intervals are tuned low (for test speed) and whose dispatches are intercepted
+	cfg := defaultNotificationsConfig(method)
+	cfg.MaxSendAttempts = maxAttempts
+	// Tune the intervals low to increase test speed.
+	cfg.FetchInterval = serpent.Duration(time.Millisecond * 50)
+	cfg.RetryInterval = serpent.Duration(time.Millisecond * 50)
+	cfg.StoreSyncInterval = serpent.Duration(time.Millisecond * 100) // Twice as long as fetch interval to ensure we catch pending updates.
+ + mgr, err := notifications.NewManager(cfg, store, metrics, logger.Named("manager")) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, mgr.Stop(ctx)) + }) + handler := &fakeHandler{} + mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{ + method: handler, + }) + + enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer")) + require.NoError(t, err) + + user := createSampleUser(t, store) + + // Build fingerprints for the two different series we expect. + methodTemplateFP := fingerprintLabels(notifications.LabelMethod, string(method), notifications.LabelTemplateID, template.String()) + methodFP := fingerprintLabels(notifications.LabelMethod, string(method)) + + expected := map[string]func(metric *dto.Metric, series string) bool{ + "coderd_notifications_dispatch_attempts_total": func(metric *dto.Metric, series string) bool { + // This metric has 3 possible dispositions; find if any of them match first before we check the metric's value. + results := map[string]float64{ + notifications.ResultSuccess: 1, // Only 1 successful delivery. + notifications.ResultTempFail: maxAttempts - 1, // 2 temp failures, on the 3rd it'll be marked permanent failure. + notifications.ResultPermFail: 1, // 1 permanent failure after retries exhausted. + } + + var match string + for result, val := range results { + seriesFP := fingerprintLabels(notifications.LabelMethod, string(method), notifications.LabelTemplateID, template.String(), notifications.LabelResult, result) + if !hasMatchingFingerprint(metric, seriesFP) { + continue + } + + match = result + + if debug { + t.Logf("coderd_notifications_dispatch_attempts_total{result=%q} == %v: %v", result, val, metric.Counter.GetValue()) + } + + break + } + + // Could not find a matching series. + if match == "" { + assert.Failf(t, "found unexpected series %q", series) + return false + } + + // nolint:forcetypeassert // Already checked above. + target := results[match] + return metric.Counter.GetValue() == target + }, + "coderd_notifications_retry_count": func(metric *dto.Metric, series string) bool { + assert.Truef(t, hasMatchingFingerprint(metric, methodTemplateFP), "found unexpected series %q", series) + + if debug { + t.Logf("coderd_notifications_retry_count == %v: %v", maxAttempts-1, metric.Counter.GetValue()) + } + + // 1 original attempts + 2 retries = maxAttempts + return metric.Counter.GetValue() == maxAttempts-1 + }, + "coderd_notifications_queued_seconds": func(metric *dto.Metric, series string) bool { + assert.Truef(t, hasMatchingFingerprint(metric, methodFP), "found unexpected series %q", series) + + if debug { + t.Logf("coderd_notifications_queued_seconds > 0: %v", metric.Histogram.GetSampleSum()) + } + + // Notifications will queue for a non-zero amount of time. + return metric.Histogram.GetSampleSum() > 0 + }, + "coderd_notifications_dispatcher_send_seconds": func(metric *dto.Metric, series string) bool { + assert.Truef(t, hasMatchingFingerprint(metric, methodFP), "found unexpected series %q", series) + + if debug { + t.Logf("coderd_notifications_dispatcher_send_seconds > 0: %v", metric.Histogram.GetSampleSum()) + } + + // Dispatches should take a non-zero amount of time. + return metric.Histogram.GetSampleSum() > 0 + }, + "coderd_notifications_inflight_dispatches": func(metric *dto.Metric, series string) bool { + // This is a gauge, so it can be difficult to get the timing right to catch it. + // See TestInflightDispatchesMetric for a more precise test. 
+ return true + }, + "coderd_notifications_pending_updates": func(metric *dto.Metric, series string) bool { + // This is a gauge, so it can be difficult to get the timing right to catch it. + // See TestPendingUpdatesMetric for a more precise test. + return true + }, + "coderd_notifications_synced_updates_total": func(metric *dto.Metric, series string) bool { + if debug { + t.Logf("coderd_notifications_synced_updates_total = %v: %v", maxAttempts+1, metric.Counter.GetValue()) + } + + // 1 message will exceed its maxAttempts, 1 will succeed on the first try. + return metric.Counter.GetValue() == maxAttempts+1 + }, + } + + // WHEN: 2 notifications are enqueued, 1 of which will fail until its retries are exhausted, and another which will succeed + _, err = enq.Enqueue(ctx, user.ID, template, map[string]string{"type": "success"}, "test") // this will succeed + require.NoError(t, err) + _, err = enq.Enqueue(ctx, user.ID, template, map[string]string{"type": "failure"}, "test2") // this will fail and retry (maxAttempts - 1) times + require.NoError(t, err) + + mgr.Run(ctx) + + // THEN: expect all the defined metrics to be present and have their expected values + require.EventuallyWithT(t, func(ct *assert.CollectT) { + handler.mu.RLock() + defer handler.mu.RUnlock() + + gathered, err := reg.Gather() + assert.NoError(t, err) + + succeeded := len(handler.succeeded) + failed := len(handler.failed) + if debug { + t.Logf("SUCCEEDED == 1: %v, FAILED == %v: %v\n", succeeded, maxAttempts, failed) + } + + // Ensure that all metrics have a) the expected label combinations (series) and b) the expected values. + for _, family := range gathered { + hasExpectedValue, ok := expected[family.GetName()] + if !assert.Truef(ct, ok, "found unexpected metric family %q", family.GetName()) { + t.Logf("found unexpected metric family %q", family.GetName()) + // Bail out fast if precondition is not met. + ct.FailNow() + } + + for _, metric := range family.Metric { + assert.True(ct, hasExpectedValue(metric, metric.String())) + } + } + + // One message will succeed. + assert.Equal(ct, succeeded, 1) + // One message will fail, and exhaust its maxAttempts. + assert.Equal(ct, failed, maxAttempts) + }, testutil.WaitShort, testutil.IntervalFast) +} + +func TestPendingUpdatesMetric(t *testing.T) { + t.Parallel() + + // SETUP + ctx, logger, store := setupInMemory(t) + + reg := prometheus.NewRegistry() + metrics := notifications.NewMetrics(reg) + template := notifications.TemplateWorkspaceDeleted + + const method = database.NotificationMethodSmtp + + // GIVEN: a notification manager whose store updates are intercepted so we can read the number of pending updates set in the metric + cfg := defaultNotificationsConfig(method) + cfg.FetchInterval = serpent.Duration(time.Millisecond * 50) + cfg.RetryInterval = serpent.Duration(time.Hour) // Delay retries so they don't interfere. 
+ cfg.StoreSyncInterval = serpent.Duration(time.Millisecond * 100) + + syncer := &syncInterceptor{Store: store} + interceptor := newUpdateSignallingInterceptor(syncer) + mgr, err := notifications.NewManager(cfg, interceptor, metrics, logger.Named("manager")) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, mgr.Stop(ctx)) + }) + handler := &fakeHandler{} + mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{ + method: handler, + }) + + enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer")) + require.NoError(t, err) + + user := createSampleUser(t, store) + + // WHEN: 2 notifications are enqueued, one of which will fail and one which will succeed + _, err = enq.Enqueue(ctx, user.ID, template, map[string]string{"type": "success"}, "test") // this will succeed + require.NoError(t, err) + _, err = enq.Enqueue(ctx, user.ID, template, map[string]string{"type": "failure"}, "test2") // this will fail and retry (maxAttempts - 1) times + require.NoError(t, err) + + mgr.Run(ctx) + + // THEN: + // Wait until the handler has dispatched the given notifications. + require.Eventually(t, func() bool { + handler.mu.RLock() + defer handler.mu.RUnlock() + + return len(handler.succeeded) == 1 && len(handler.failed) == 1 + }, testutil.WaitShort, testutil.IntervalFast) + + // Wait until we intercept the calls to sync the pending updates to the store. + success := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateSuccess) + failure := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateFailure) + + // Ensure that the value set in the metric is equivalent to the number of actual pending updates. + pending := promtest.ToFloat64(metrics.PendingUpdates) + require.EqualValues(t, pending, success+failure) + + // Unpause the interceptor so the updates can proceed. + interceptor.proceed.Broadcast() + + // Validate that the store synced the expected number of updates. + require.Eventually(t, func() bool { + return syncer.sent.Load() == 1 && syncer.failed.Load() == 1 + }, testutil.WaitShort, testutil.IntervalFast) + + // Wait for the updates to be synced and the metric to reflect that. + require.Eventually(t, func() bool { + return promtest.ToFloat64(metrics.PendingUpdates) == 0 + }, testutil.WaitShort, testutil.IntervalFast) +} + +func TestInflightDispatchesMetric(t *testing.T) { + t.Parallel() + + // SETUP + ctx, logger, store := setupInMemory(t) + + reg := prometheus.NewRegistry() + metrics := notifications.NewMetrics(reg) + template := notifications.TemplateWorkspaceDeleted + + const method = database.NotificationMethodSmtp + + // GIVEN: a notification manager whose dispatches are intercepted and delayed to measure the number of inflight requests + cfg := defaultNotificationsConfig(method) + cfg.LeaseCount = 10 + cfg.FetchInterval = serpent.Duration(time.Millisecond * 50) + cfg.RetryInterval = serpent.Duration(time.Hour) // Delay retries so they don't interfere. + cfg.StoreSyncInterval = serpent.Duration(time.Millisecond * 100) + + mgr, err := notifications.NewManager(cfg, store, metrics, logger.Named("manager")) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, mgr.Stop(ctx)) + }) + + handler := &fakeHandler{} + // Delayer will delay all dispatches by 2x fetch intervals to ensure we catch the requests inflight. 
+ delayer := newDelayingHandler(cfg.FetchInterval.Value()*2, handler) + mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{ + method: delayer, + }) + + enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer")) + require.NoError(t, err) + + user := createSampleUser(t, store) + + // WHEN: notifications are enqueued which will succeed (and be delayed during dispatch) + const msgCount = 2 + for i := 0; i < msgCount; i++ { + _, err = enq.Enqueue(ctx, user.ID, template, map[string]string{"type": "success"}, "test") + require.NoError(t, err) + } + + mgr.Run(ctx) + + // THEN: + // Ensure we see the dispatches of the messages inflight. + require.Eventually(t, func() bool { + return promtest.ToFloat64(metrics.InflightDispatches.WithLabelValues(string(method), template.String())) == msgCount + }, testutil.WaitShort, testutil.IntervalFast) + + // Wait until the handler has dispatched the given notifications. + require.Eventually(t, func() bool { + handler.mu.RLock() + defer handler.mu.RUnlock() + + return len(handler.succeeded) == msgCount + }, testutil.WaitShort, testutil.IntervalFast) + + // Wait for the updates to be synced and the metric to reflect that. + require.Eventually(t, func() bool { + return promtest.ToFloat64(metrics.InflightDispatches) == 0 + }, testutil.WaitShort, testutil.IntervalFast) +} + +// hasMatchingFingerprint checks if the given metric's series fingerprint matches the reference fingerprint. +func hasMatchingFingerprint(metric *dto.Metric, fp model.Fingerprint) bool { + return fingerprintLabelPairs(metric.Label) == fp +} + +// fingerprintLabelPairs produces a fingerprint unique to the given combination of label pairs. +func fingerprintLabelPairs(lbs []*dto.LabelPair) model.Fingerprint { + pairs := make([]string, 0, len(lbs)*2) + for _, lp := range lbs { + pairs = append(pairs, lp.GetName(), lp.GetValue()) + } + + return fingerprintLabels(pairs...) +} + +// fingerprintLabels produces a fingerprint unique to the given pairs of label values. +// MUST contain an even number of arguments (key:value), otherwise it will panic. +func fingerprintLabels(lbs ...string) model.Fingerprint { + if len(lbs)%2 != 0 { + panic("imbalanced set of label pairs given") + } + + lbsSet := make(model.LabelSet, len(lbs)/2) + for i := 0; i < len(lbs); i += 2 { + k := lbs[i] + v := lbs[i+1] + lbsSet[model.LabelName(k)] = model.LabelValue(v) + } + + return lbsSet.Fingerprint() // FastFingerprint does not sort the labels. +} + +// updateSignallingInterceptor intercepts bulk update calls to the store, and waits on the "proceed" condition to be +// signaled by the caller so it can continue. +type updateSignallingInterceptor struct { + notifications.Store + + proceed *sync.Cond + + updateSuccess chan int + updateFailure chan int +} + +func newUpdateSignallingInterceptor(interceptor notifications.Store) *updateSignallingInterceptor { + return &updateSignallingInterceptor{ + Store: interceptor, + + proceed: sync.NewCond(&sync.Mutex{}), + + updateSuccess: make(chan int, 1), + updateFailure: make(chan int, 1), + } +} + +func (u *updateSignallingInterceptor) BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) { + u.updateSuccess <- len(arg.IDs) + + u.proceed.L.Lock() + defer u.proceed.L.Unlock() + + // Wait until signaled so we have a chance to read the number of pending updates. 
+ u.proceed.Wait() + + return u.Store.BulkMarkNotificationMessagesSent(ctx, arg) +} + +func (u *updateSignallingInterceptor) BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) { + u.updateFailure <- len(arg.IDs) + + u.proceed.L.Lock() + defer u.proceed.L.Unlock() + + // Wait until signaled so we have a chance to read the number of pending updates. + u.proceed.Wait() + + return u.Store.BulkMarkNotificationMessagesFailed(ctx, arg) +} + +type delayingHandler struct { + h notifications.Handler + + delay time.Duration +} + +func newDelayingHandler(delay time.Duration, handler notifications.Handler) *delayingHandler { + return &delayingHandler{ + delay: delay, + h: handler, + } +} + +func (d *delayingHandler) Dispatcher(payload types.MessagePayload, title, body string) (dispatch.DeliveryFunc, error) { + deliverFn, err := d.h.Dispatcher(payload, title, body) + if err != nil { + return nil, err + } + + return func(ctx context.Context, msgID uuid.UUID) (retryable bool, err error) { + time.Sleep(d.delay) + + return deliverFn(ctx, msgID) + }, nil +} diff --git a/coderd/notifications/notifications_test.go b/coderd/notifications/notifications_test.go index c38daa1531ecb..481244bf21f2a 100644 --- a/coderd/notifications/notifications_test.go +++ b/coderd/notifications/notifications_test.go @@ -7,13 +7,13 @@ import ( "net/http" "net/http/httptest" "net/url" + "slices" "sort" "sync" "sync/atomic" "testing" "time" - "golang.org/x/exp/slices" "golang.org/x/xerrors" "github.com/google/uuid" @@ -54,10 +54,10 @@ func TestBasicNotificationRoundtrip(t *testing.T) { // GIVEN: a manager with standard config but a faked dispatch handler handler := &fakeHandler{} - interceptor := &bulkUpdateInterceptor{Store: db} + interceptor := &syncInterceptor{Store: db} cfg := defaultNotificationsConfig(method) cfg.RetryInterval = serpent.Duration(time.Hour) // Ensure retries don't interfere with the test - mgr, err := notifications.NewManager(cfg, interceptor, logger.Named("manager")) + mgr, err := notifications.NewManager(cfg, interceptor, createMetrics(), logger.Named("manager")) require.NoError(t, err) mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler}) t.Cleanup(func() { @@ -88,7 +88,7 @@ func TestBasicNotificationRoundtrip(t *testing.T) { require.Eventually(t, func() bool { return interceptor.sent.Load() == 1 && interceptor.failed.Load() == 1 - }, testutil.WaitShort, testutil.IntervalFast) + }, testutil.WaitLong, testutil.IntervalFast) // THEN: we verify that the store contains notifications in their expected state success, err := db.GetNotificationMessagesByStatus(ctx, database.GetNotificationMessagesByStatusParams{ @@ -131,7 +131,7 @@ func TestSMTPDispatch(t *testing.T) { Hello: "localhost", } handler := newDispatchInterceptor(dispatch.NewSMTPHandler(cfg.SMTP, logger.Named("smtp"))) - mgr, err := notifications.NewManager(cfg, db, logger.Named("manager")) + mgr, err := notifications.NewManager(cfg, db, createMetrics(), logger.Named("manager")) require.NoError(t, err) mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler}) t.Cleanup(func() { @@ -192,7 +192,7 @@ func TestWebhookDispatch(t *testing.T) { cfg.Webhook = codersdk.NotificationsWebhookConfig{ Endpoint: *serpent.URLOf(endpoint), } - mgr, err := notifications.NewManager(cfg, db, logger.Named("manager")) + mgr, err := notifications.NewManager(cfg, db, createMetrics(), logger.Named("manager")) require.NoError(t, err) 
t.Cleanup(func() { assert.NoError(t, mgr.Stop(ctx)) @@ -285,10 +285,10 @@ func TestBackpressure(t *testing.T) { handler := newDispatchInterceptor(dispatch.NewWebhookHandler(cfg.Webhook, logger.Named("webhook"))) // Intercept calls to submit the buffered updates to the store. - storeInterceptor := &bulkUpdateInterceptor{Store: db} + storeInterceptor := &syncInterceptor{Store: db} // GIVEN: a notification manager whose updates will be intercepted - mgr, err := notifications.NewManager(cfg, storeInterceptor, logger.Named("manager")) + mgr, err := notifications.NewManager(cfg, storeInterceptor, createMetrics(), logger.Named("manager")) require.NoError(t, err) mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler}) enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer")) @@ -381,9 +381,9 @@ func TestRetries(t *testing.T) { handler := newDispatchInterceptor(dispatch.NewWebhookHandler(cfg.Webhook, logger.Named("webhook"))) // Intercept calls to submit the buffered updates to the store. - storeInterceptor := &bulkUpdateInterceptor{Store: db} + storeInterceptor := &syncInterceptor{Store: db} - mgr, err := notifications.NewManager(cfg, storeInterceptor, logger.Named("manager")) + mgr, err := notifications.NewManager(cfg, storeInterceptor, createMetrics(), logger.Named("manager")) require.NoError(t, err) t.Cleanup(func() { assert.NoError(t, mgr.Stop(ctx)) @@ -439,12 +439,12 @@ func TestExpiredLeaseIsRequeued(t *testing.T) { cfg.LeasePeriod = serpent.Duration(leasePeriod) cfg.DispatchTimeout = serpent.Duration(leasePeriod - time.Millisecond) - noopInterceptor := newNoopBulkUpdater(db) + noopInterceptor := newNoopStoreSyncer(db) mgrCtx, cancelManagerCtx := context.WithCancel(context.Background()) t.Cleanup(cancelManagerCtx) - mgr, err := notifications.NewManager(cfg, noopInterceptor, logger.Named("manager")) + mgr, err := notifications.NewManager(cfg, noopInterceptor, createMetrics(), logger.Named("manager")) require.NoError(t, err) enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer")) require.NoError(t, err) @@ -489,9 +489,9 @@ func TestExpiredLeaseIsRequeued(t *testing.T) { // Start a new notification manager. // Intercept calls to submit the buffered updates to the store. 
- storeInterceptor := &bulkUpdateInterceptor{Store: db} + storeInterceptor := &syncInterceptor{Store: db} handler := newDispatchInterceptor(&fakeHandler{}) - mgr, err = notifications.NewManager(cfg, storeInterceptor, logger.Named("manager")) + mgr, err = notifications.NewManager(cfg, storeInterceptor, createMetrics(), logger.Named("manager")) require.NoError(t, err) mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler}) @@ -532,7 +532,7 @@ func TestInvalidConfig(t *testing.T) { cfg.DispatchTimeout = serpent.Duration(leasePeriod) // WHEN: the manager is created with invalid config - _, err := notifications.NewManager(cfg, db, logger.Named("manager")) + _, err := notifications.NewManager(cfg, db, createMetrics(), logger.Named("manager")) // THEN: the manager will fail to be created, citing invalid config as error require.ErrorIs(t, err, notifications.ErrInvalidDispatchTimeout) @@ -550,9 +550,7 @@ func TestNotifierPaused(t *testing.T) { user := createSampleUser(t, db) cfg := defaultNotificationsConfig(method) - fetchInterval := time.Nanosecond // Let - cfg.FetchInterval = *serpent.DurationOf(&fetchInterval) - mgr, err := notifications.NewManager(cfg, db, logger.Named("manager")) + mgr, err := notifications.NewManager(cfg, db, createMetrics(), logger.Named("manager")) require.NoError(t, err) mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler}) t.Cleanup(func() { @@ -604,10 +602,8 @@ func TestNotifierPaused(t *testing.T) { } type fakeHandler struct { - mu sync.RWMutex - - succeeded []string - failed []string + mu sync.RWMutex + succeeded, failed []string } func (f *fakeHandler) Dispatcher(payload types.MessagePayload, _, _ string) (dispatch.DeliveryFunc, error) { @@ -625,62 +621,20 @@ func (f *fakeHandler) Dispatcher(payload types.MessagePayload, _, _ string) (dis }, nil } -type dispatchInterceptor struct { - handler notifications.Handler - - sent atomic.Int32 - retryable atomic.Int32 - unretryable atomic.Int32 - err atomic.Int32 - lastErr atomic.Value -} - -func newDispatchInterceptor(h notifications.Handler) *dispatchInterceptor { - return &dispatchInterceptor{ - handler: h, - } -} - -func (i *dispatchInterceptor) Dispatcher(payload types.MessagePayload, title, body string) (dispatch.DeliveryFunc, error) { - return func(ctx context.Context, msgID uuid.UUID) (retryable bool, err error) { - deliveryFn, err := i.handler.Dispatcher(payload, title, body) - if err != nil { - return false, err - } - - retryable, err = deliveryFn(ctx, msgID) - - if err != nil { - i.err.Add(1) - i.lastErr.Store(err) - } - - switch { - case !retryable && err == nil: - i.sent.Add(1) - case retryable: - i.retryable.Add(1) - case !retryable && err != nil: - i.unretryable.Add(1) - } - return retryable, err - }, nil -} - -// noopBulkUpdater pretends to perform bulk updates, but does not; leading to messages being stuck in "leased" state. -type noopBulkUpdater struct { +// noopStoreSyncer pretends to perform store syncs, but does not; leading to messages being stuck in "leased" state. 
+type noopStoreSyncer struct { *acquireSignalingInterceptor } -func newNoopBulkUpdater(db notifications.Store) *noopBulkUpdater { - return &noopBulkUpdater{newAcquireSignalingInterceptor(db)} +func newNoopStoreSyncer(db notifications.Store) *noopStoreSyncer { + return &noopStoreSyncer{newAcquireSignalingInterceptor(db)} } -func (*noopBulkUpdater) BulkMarkNotificationMessagesSent(_ context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) { +func (*noopStoreSyncer) BulkMarkNotificationMessagesSent(_ context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) { return int64(len(arg.IDs)), nil } -func (*noopBulkUpdater) BulkMarkNotificationMessagesFailed(_ context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) { +func (*noopStoreSyncer) BulkMarkNotificationMessagesFailed(_ context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) { return int64(len(arg.IDs)), nil } diff --git a/coderd/notifications/notifier.go b/coderd/notifications/notifier.go index d400b52166b78..c39de6168db81 100644 --- a/coderd/notifications/notifier.go +++ b/coderd/notifications/notifier.go @@ -33,10 +33,12 @@ type notifier struct { quit chan any done chan any + method database.NotificationMethod handlers map[database.NotificationMethod]Handler + metrics *Metrics } -func newNotifier(cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store, hr map[database.NotificationMethod]Handler) *notifier { +func newNotifier(cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store, hr map[database.NotificationMethod]Handler, method database.NotificationMethod, metrics *Metrics) *notifier { return ¬ifier{ id: id, cfg: cfg, @@ -46,6 +48,8 @@ func newNotifier(cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger tick: time.NewTicker(cfg.FetchInterval.Value()), store: db, handlers: hr, + method: method, + metrics: metrics, } } @@ -99,8 +103,6 @@ func (n *notifier) run(ctx context.Context, success chan<- dispatchResult, failu // ensureRunning checks if notifier is not paused. func (n *notifier) ensureRunning(ctx context.Context) (bool, error) { - n.log.Debug(ctx, "check if notifier is paused") - settingsJSON, err := n.store.GetNotificationsSettings(ctx) if err != nil { return false, xerrors.Errorf("get notifications settings: %w", err) @@ -129,14 +131,13 @@ func (n *notifier) ensureRunning(ctx context.Context) (bool, error) { // resulting in a failed attempt for each notification when their contexts are canceled; this is not possible with the // default configurations but could be brought about by an operator tuning things incorrectly. 
 func (n *notifier) process(ctx context.Context, success chan<- dispatchResult, failure chan<- dispatchResult) error {
-	n.log.Debug(ctx, "attempting to dequeue messages")
-
 	msgs, err := n.fetch(ctx)
 	if err != nil {
 		return xerrors.Errorf("fetch messages: %w", err)
 	}
 
 	n.log.Debug(ctx, "dequeued messages", slog.F("count", len(msgs)))
+
 	if len(msgs) == 0 {
 		return nil
 	}
@@ -147,7 +148,9 @@ func (n *notifier) process(ctx context.Context, success chan<- dispatchResult, f
 		deliverFn, err := n.prepare(ctx, msg)
 		if err != nil {
 			n.log.Warn(ctx, "dispatcher construction failed", slog.F("msg_id", msg.ID), slog.Error(err))
-			failure <- newFailedDispatch(n.id, msg.ID, err, false)
+			failure <- n.newFailedDispatch(msg, err, false)
+
+			n.metrics.PendingUpdates.Set(float64(len(success) + len(failure)))
 			continue
 		}
 
@@ -162,7 +165,7 @@ func (n *notifier) process(ctx context.Context, success chan<- dispatchResult, f
 		return xerrors.Errorf("dispatch failed: %w", err)
 	}
 
-	n.log.Debug(ctx, "dispatch completed", slog.F("count", len(msgs)))
+	n.log.Debug(ctx, "batch completed", slog.F("count", len(msgs)))
 	return nil
 }
 
@@ -228,9 +231,21 @@ func (n *notifier) deliver(ctx context.Context, msg database.AcquireNotification
 	ctx, cancel := context.WithTimeout(ctx, n.cfg.DispatchTimeout.Value())
 	defer cancel()
 
-	logger := n.log.With(slog.F("msg_id", msg.ID), slog.F("method", msg.Method))
+	logger := n.log.With(slog.F("msg_id", msg.ID), slog.F("method", msg.Method), slog.F("attempt", msg.AttemptCount+1))
+
+	if msg.AttemptCount > 0 {
+		n.metrics.RetryCount.WithLabelValues(string(n.method), msg.TemplateID.String()).Inc()
+	}
+	n.metrics.InflightDispatches.WithLabelValues(string(n.method), msg.TemplateID.String()).Inc()
+	n.metrics.QueuedSeconds.WithLabelValues(string(n.method)).Observe(msg.QueuedSeconds)
+
+	start := time.Now()
 	retryable, err := deliver(ctx, msg.ID)
+
+	n.metrics.DispatcherSendSeconds.WithLabelValues(string(n.method)).Observe(time.Since(start).Seconds())
+	n.metrics.InflightDispatches.WithLabelValues(string(n.method), msg.TemplateID.String()).Dec()
+
 	if err != nil {
 		// Don't try to accumulate message responses if the context has been canceled.
 		//
@@ -248,24 +263,55 @@ func (n *notifier) deliver(ctx context.Context, msg database.AcquireNotification
 		case <-ctx.Done():
 			logger.Warn(context.Background(), "cannot record dispatch failure result", slog.Error(ctx.Err()))
 			return ctx.Err()
-		default:
+		case failure <- n.newFailedDispatch(msg, err, retryable):
 			logger.Warn(ctx, "message dispatch failed", slog.Error(err))
-			failure <- newFailedDispatch(n.id, msg.ID, err, retryable)
 		}
 	} else {
 		select {
 		case <-ctx.Done():
 			logger.Warn(context.Background(), "cannot record dispatch success result", slog.Error(ctx.Err()))
 			return ctx.Err()
-		default:
+		case success <- n.newSuccessfulDispatch(msg):
 			logger.Debug(ctx, "message dispatch succeeded")
-			success <- newSuccessfulDispatch(n.id, msg.ID)
 		}
 	}
+	n.metrics.PendingUpdates.Set(float64(len(success) + len(failure)))
 
 	return nil
 }
 
+func (n *notifier) newSuccessfulDispatch(msg database.AcquireNotificationMessagesRow) dispatchResult {
+	n.metrics.DispatchAttempts.WithLabelValues(string(n.method), msg.TemplateID.String(), ResultSuccess).Inc()
+
+	return dispatchResult{
+		notifier: n.id,
+		msg:      msg.ID,
+		ts:       time.Now(),
+	}
+}
+
+// revive:disable-next-line:flag-parameter // Not used for control flow, rather just choosing which metric to increment.
+func (n *notifier) newFailedDispatch(msg database.AcquireNotificationMessagesRow, err error, retryable bool) dispatchResult {
+	var result string
+
+	// If retryable and not the last attempt, it's a temporary failure.
+	if retryable && msg.AttemptCount < int32(n.cfg.MaxSendAttempts)-1 {
+		result = ResultTempFail
+	} else {
+		result = ResultPermFail
+	}
+
+	n.metrics.DispatchAttempts.WithLabelValues(string(n.method), msg.TemplateID.String(), result).Inc()
+
+	return dispatchResult{
+		notifier:  n.id,
+		msg:       msg.ID,
+		ts:        time.Now(),
+		err:       err,
+		retryable: retryable,
+	}
+}
+
 // stop stops the notifier from processing any new notifications.
 // This is a graceful stop, so any in-flight notifications will be completed before the notifier stops.
 // Once a notifier has stopped, it cannot be restarted.
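The `*Metrics` value threaded through `newNotifier` above is defined in a metrics file that this excerpt does not include. Judging purely from the call sites in `notifier.go` (label arity and the `Inc`/`Dec`/`Set`/`Observe` usage), its shape is roughly the sketch below; the metric names, help text, and the `Result*` constant values are assumptions, not the PR's actual definitions:

```go
package notifications

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Assumed values; only the identifiers are confirmed by the diff.
const (
	ResultSuccess  = "success"
	ResultTempFail = "temp_fail"
	ResultPermFail = "perm_fail"
)

// Metrics is a rough sketch inferred from notifier.go's call sites.
type Metrics struct {
	DispatchAttempts      *prometheus.CounterVec   // labels: method, template_id, result
	RetryCount            *prometheus.CounterVec   // labels: method, template_id
	InflightDispatches    *prometheus.GaugeVec     // labels: method, template_id
	QueuedSeconds         *prometheus.HistogramVec // labels: method
	DispatcherSendSeconds *prometheus.HistogramVec // labels: method
	PendingUpdates        prometheus.Gauge
}

func NewMetrics(reg prometheus.Registerer) *Metrics {
	return &Metrics{
		DispatchAttempts: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "coderd_notifications_dispatch_attempts_total", // assumed name
		}, []string{"method", "template_id", "result"}),
		RetryCount: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "coderd_notifications_retry_count", // assumed name
		}, []string{"method", "template_id"}),
		InflightDispatches: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
			Name: "coderd_notifications_inflight_dispatches", // assumed name
		}, []string{"method", "template_id"}),
		QueuedSeconds: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
			Name: "coderd_notifications_queued_seconds", // assumed name
		}, []string{"method"}),
		DispatcherSendSeconds: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
			Name: "coderd_notifications_dispatcher_send_seconds", // assumed name
		}, []string{"method"}),
		PendingUpdates: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
			Name: "coderd_notifications_pending_updates", // assumed name
		}),
	}
}
```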
diff --git a/coderd/notifications/spec.go b/coderd/notifications/spec.go
index bba0d4e183c5c..c41189ba3d582 100644
--- a/coderd/notifications/spec.go
+++ b/coderd/notifications/spec.go
@@ -18,7 +18,7 @@ type Store interface {
 	AcquireNotificationMessages(ctx context.Context, params database.AcquireNotificationMessagesParams) ([]database.AcquireNotificationMessagesRow, error)
 	BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error)
 	BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error)
-	EnqueueNotificationMessage(ctx context.Context, arg database.EnqueueNotificationMessageParams) (database.NotificationMessage, error)
+	EnqueueNotificationMessage(ctx context.Context, arg database.EnqueueNotificationMessageParams) error
 	FetchNewMessageMetadata(ctx context.Context, arg database.FetchNewMessageMetadataParams) (database.FetchNewMessageMetadataRow, error)
 	GetNotificationMessagesByStatus(ctx context.Context, arg database.GetNotificationMessagesByStatusParams) ([]database.NotificationMessage, error)
 	GetNotificationsSettings(ctx context.Context) (string, error)
diff --git a/coderd/notifications/types/payload.go b/coderd/notifications/types/payload.go
index a3067f456c18e..f6b18215e5357 100644
--- a/coderd/notifications/types/payload.go
+++ b/coderd/notifications/types/payload.go
@@ -8,7 +8,6 @@ type MessagePayload struct {
 	Version string `json:"_version"`
 
 	NotificationName string `json:"notification_name"`
-	CreatedBy        string `json:"created_by"`
 
 	UserID    string `json:"user_id"`
 	UserEmail string `json:"user_email"`
diff --git a/coderd/notifications/utils_test.go b/coderd/notifications/utils_test.go
index 74432f9c2617e..24cd361ede276 100644
--- a/coderd/notifications/utils_test.go
+++ b/coderd/notifications/utils_test.go
@@ -3,9 +3,12 @@ package notifications_test
 import (
 	"context"
 	"database/sql"
+	"sync/atomic"
 	"testing"
 	"time"
 
+	"github.com/google/uuid"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/require"
 
 	"cdr.dev/slog"
@@ -17,6 +20,9 @@ import (
 	"github.com/coder/coder/v2/coderd/database/dbgen"
 	"github.com/coder/coder/v2/coderd/database/dbmem"
 	"github.com/coder/coder/v2/coderd/database/dbtestutil"
+	"github.com/coder/coder/v2/coderd/notifications"
+	"github.com/coder/coder/v2/coderd/notifications/dispatch"
+	"github.com/coder/coder/v2/coderd/notifications/types"
 	"github.com/coder/coder/v2/codersdk"
 	"github.com/coder/coder/v2/testutil"
 )
@@ -57,13 +63,13 @@ func defaultNotificationsConfig(method database.NotificationMethod) codersdk.Not
 	return codersdk.NotificationsConfig{
 		Method:          serpent.String(method),
 		MaxSendAttempts: 5,
-		RetryInterval:       serpent.Duration(time.Minute * 5),
-		StoreSyncInterval:   serpent.Duration(time.Second * 2),
-		StoreSyncBufferSize: 50,
-		LeasePeriod:         serpent.Duration(time.Minute * 2),
+		FetchInterval:       serpent.Duration(time.Millisecond * 100),
+		StoreSyncInterval:   serpent.Duration(time.Millisecond * 200),
+		LeasePeriod:         serpent.Duration(time.Second * 10),
+		DispatchTimeout:     serpent.Duration(time.Second * 5),
+		RetryInterval:       serpent.Duration(time.Millisecond * 50),
 		LeaseCount:          10,
-		FetchInterval:       serpent.Duration(time.Second * 10),
-		DispatchTimeout:     serpent.Duration(time.Minute),
+		StoreSyncBufferSize: 50,
 		SMTP:                codersdk.NotificationsEmailConfig{},
 		Webhook:             codersdk.NotificationsWebhookConfig{},
 	}
@@ -81,3 +87,47 @@ func createSampleUser(t *testing.T, db database.Store) database.User {
 		Username: "bob",
 	})
 }
+
+func createMetrics() *notifications.Metrics {
+	return notifications.NewMetrics(prometheus.NewRegistry())
+}
+
+type dispatchInterceptor struct {
+	handler notifications.Handler
+
+	sent        atomic.Int32
+	retryable   atomic.Int32
+	unretryable atomic.Int32
+	err         atomic.Int32
+	lastErr     atomic.Value
+}
+
+func newDispatchInterceptor(h notifications.Handler) *dispatchInterceptor {
+	return &dispatchInterceptor{handler: h}
+}
+
+func (i *dispatchInterceptor) Dispatcher(payload types.MessagePayload, title, body string) (dispatch.DeliveryFunc, error) {
+	return func(ctx context.Context, msgID uuid.UUID) (retryable bool, err error) {
+		deliveryFn, err := i.handler.Dispatcher(payload, title, body)
+		if err != nil {
+			return false, err
+		}
+
+		retryable, err = deliveryFn(ctx, msgID)
+
+		if err != nil {
+			i.err.Add(1)
+			i.lastErr.Store(err)
+		}
+
+		switch {
+		case !retryable && err == nil:
+			i.sent.Add(1)
+		case retryable:
+			i.retryable.Add(1)
+		case !retryable && err != nil:
+			i.unretryable.Add(1)
+		}
+		return retryable, err
+	}, nil
+}
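One note on `createMetrics` above: it hands `notifications.NewMetrics` a fresh `prometheus.NewRegistry()` on every call, so each test gets its own registry and parallel tests never collide on collector registration. The standalone snippet below (illustrative only, not from this PR) shows why sharing a registry across tests would be a problem: registering a second collector with the same fully-qualified name is rejected with `AlreadyRegisteredError` (and `MustRegister`, which `promauto` uses internally, panics instead of returning the error).

```go
package main

import (
	"errors"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()

	// First registration succeeds.
	a := prometheus.NewCounter(prometheus.CounterOpts{Name: "example_total"})
	fmt.Println("first register error:", reg.Register(a)) // <nil>

	// A second collector with the same name is rejected.
	b := prometheus.NewCounter(prometheus.CounterOpts{Name: "example_total"})
	err := reg.Register(b)

	var are prometheus.AlreadyRegisteredError
	fmt.Println("second rejected as AlreadyRegisteredError:", errors.As(err, &are)) // true
}
```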