From 675aa8fee99053fc9f1f7d404f954985b0e26026 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Mon, 5 Aug 2024 19:53:56 +0200 Subject: [PATCH 1/2] feat: add notification deduplication trigger Signed-off-by: Danny Kopping --- cli/server.go | 4 +- coderd/database/dump.sql | 27 ++++++- .../000239_notifications_dedupe.down.sql | 4 ++ .../000239_notifications_dedupe.up.sql | 33 +++++++++ coderd/database/models.go | 2 + coderd/database/queries.sql.go | 12 ++-- coderd/database/queries/notifications.sql | 5 +- coderd/database/unique_constraint.go | 1 + coderd/notifications/enqueuer.go | 20 +++++- coderd/notifications/manager_test.go | 9 ++- coderd/notifications/metrics_test.go | 10 +-- coderd/notifications/notifications_test.go | 72 ++++++++++++++++--- 12 files changed, 171 insertions(+), 28 deletions(-) create mode 100644 coderd/database/migrations/000239_notifications_dedupe.down.sql create mode 100644 coderd/database/migrations/000239_notifications_dedupe.up.sql diff --git a/cli/server.go b/cli/server.go index f76872a78c342..1c1edfde58b7c 100644 --- a/cli/server.go +++ b/cli/server.go @@ -60,6 +60,8 @@ import ( "github.com/coder/serpent" "github.com/coder/wgtunnel/tunnelsdk" + "github.com/coder/quartz" + "github.com/coder/coder/v2/buildinfo" "github.com/coder/coder/v2/cli/clilog" "github.com/coder/coder/v2/cli/cliui" @@ -995,7 +997,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd. metrics := notifications.NewMetrics(options.PrometheusRegistry) // The enqueuer is responsible for enqueueing notifications to the given store. - enqueuer, err := notifications.NewStoreEnqueuer(cfg, options.Database, templateHelpers(options), logger.Named("notifications.enqueuer")) + enqueuer, err := notifications.NewStoreEnqueuer(cfg, options.Database, templateHelpers(options), logger.Named("notifications.enqueuer"), quartz.NewReal()) if err != nil { return xerrors.Errorf("failed to instantiate notification store enqueuer: %w", err) } diff --git a/coderd/database/dump.sql b/coderd/database/dump.sql index 0bcad08271da5..a608d14ef4393 100644 --- a/coderd/database/dump.sql +++ b/coderd/database/dump.sql @@ -223,6 +223,24 @@ CREATE TYPE workspace_transition AS ENUM ( 'delete' ); +CREATE FUNCTION compute_notification_message_dedupe_hash() RETURNS trigger + LANGUAGE plpgsql + AS $$ +BEGIN + NEW.dedupe_hash := MD5(CONCAT_WS(':', + NEW.notification_template_id, + NEW.user_id, + NEW.method, + NEW.payload::text, + ARRAY_TO_STRING(NEW.targets, ','), + DATE_TRUNC('day', NEW.created_at AT TIME ZONE 'UTC')::text + )); + RETURN NEW; +END; +$$; + +COMMENT ON FUNCTION compute_notification_message_dedupe_hash() IS 'Computes a unique hash which will be used to prevent duplicate messages from being enqueued on the same day'; + CREATE FUNCTION delete_deleted_oauth2_provider_app_token_api_key() RETURNS trigger LANGUAGE plpgsql AS $$ @@ -587,9 +605,12 @@ CREATE TABLE notification_messages ( updated_at timestamp with time zone, leased_until timestamp with time zone, next_retry_after timestamp with time zone, - queued_seconds double precision + queued_seconds double precision, + dedupe_hash text ); +COMMENT ON COLUMN notification_messages.dedupe_hash IS 'Auto-generated by insert/update trigger, used to prevent duplicate notifications from being enqueued on the same day'; + CREATE TABLE notification_preferences ( user_id uuid NOT NULL, notification_template_id uuid NOT NULL, @@ -1790,6 +1811,8 @@ CREATE UNIQUE INDEX idx_users_email ON users USING btree (email) WHERE (deleted CREATE UNIQUE INDEX idx_users_username ON users USING btree (username) WHERE (deleted = false); +CREATE UNIQUE INDEX notification_messages_dedupe_hash_idx ON notification_messages USING btree (dedupe_hash); + CREATE UNIQUE INDEX organizations_single_default_org ON organizations USING btree (is_default) WHERE (is_default = true); CREATE INDEX provisioner_job_logs_id_job_id_idx ON provisioner_job_logs USING btree (job_id, id); @@ -1858,6 +1881,8 @@ CREATE TRIGGER trigger_update_users AFTER INSERT OR UPDATE ON users FOR EACH ROW CREATE TRIGGER trigger_upsert_user_links BEFORE INSERT OR UPDATE ON user_links FOR EACH ROW EXECUTE FUNCTION insert_user_links_fail_if_user_deleted(); +CREATE TRIGGER update_notification_message_dedupe_hash BEFORE INSERT OR UPDATE ON notification_messages FOR EACH ROW EXECUTE FUNCTION compute_notification_message_dedupe_hash(); + ALTER TABLE ONLY api_keys ADD CONSTRAINT api_keys_user_id_uuid_fkey FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE; diff --git a/coderd/database/migrations/000239_notifications_dedupe.down.sql b/coderd/database/migrations/000239_notifications_dedupe.down.sql new file mode 100644 index 0000000000000..6c5ef693c0533 --- /dev/null +++ b/coderd/database/migrations/000239_notifications_dedupe.down.sql @@ -0,0 +1,4 @@ +DROP TRIGGER IF EXISTS update_notification_message_dedupe_hash ON notification_messages; +DROP FUNCTION IF EXISTS compute_notification_message_dedupe_hash(); +ALTER TABLE IF EXISTS notification_messages + DROP COLUMN IF EXISTS dedupe_hash; \ No newline at end of file diff --git a/coderd/database/migrations/000239_notifications_dedupe.up.sql b/coderd/database/migrations/000239_notifications_dedupe.up.sql new file mode 100644 index 0000000000000..6a46a52884aac --- /dev/null +++ b/coderd/database/migrations/000239_notifications_dedupe.up.sql @@ -0,0 +1,33 @@ +-- Add a column to store the hash. +ALTER TABLE IF EXISTS notification_messages + ADD COLUMN IF NOT EXISTS dedupe_hash TEXT NULL; + +COMMENT ON COLUMN notification_messages.dedupe_hash IS 'Auto-generated by insert/update trigger, used to prevent duplicate notifications from being enqueued on the same day'; + +-- Ensure that multiple notifications with identical hashes cannot be inserted into the table. +CREATE UNIQUE INDEX ON notification_messages (dedupe_hash); + +-- Computes a hash from all unique messages fields and the current day; this will help prevent duplicate messages from being sent within the same day. +-- It is possible that a message could be sent at 23:59:59 and again at 00:00:00, but this should be good enough for now. +-- This could have been a unique index, but we cannot immutably create an index on a timestamp with a timezone. +CREATE OR REPLACE FUNCTION compute_notification_message_dedupe_hash() RETURNS TRIGGER AS +$$ +BEGIN + NEW.dedupe_hash := MD5(CONCAT_WS(':', + NEW.notification_template_id, + NEW.user_id, + NEW.method, + NEW.payload::text, + ARRAY_TO_STRING(NEW.targets, ','), + DATE_TRUNC('day', NEW.created_at AT TIME ZONE 'UTC')::text + )); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +COMMENT ON FUNCTION compute_notification_message_dedupe_hash IS 'Computes a unique hash which will be used to prevent duplicate messages from being enqueued on the same day'; +CREATE TRIGGER update_notification_message_dedupe_hash + BEFORE INSERT OR UPDATE + ON notification_messages + FOR EACH ROW +EXECUTE FUNCTION compute_notification_message_dedupe_hash(); \ No newline at end of file diff --git a/coderd/database/models.go b/coderd/database/models.go index 4bd012e258fbd..217c9a2319a05 100644 --- a/coderd/database/models.go +++ b/coderd/database/models.go @@ -2093,6 +2093,8 @@ type NotificationMessage struct { LeasedUntil sql.NullTime `db:"leased_until" json:"leased_until"` NextRetryAfter sql.NullTime `db:"next_retry_after" json:"next_retry_after"` QueuedSeconds sql.NullFloat64 `db:"queued_seconds" json:"queued_seconds"` + // Auto-generated by insert/update trigger, used to prevent duplicate notifications from being enqueued on the same day + DedupeHash sql.NullString `db:"dedupe_hash" json:"dedupe_hash"` } type NotificationPreference struct { diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index d8a6e3a1abb03..c22de0475d0e6 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -3329,7 +3329,7 @@ WITH acquired AS ( FOR UPDATE OF nm SKIP LOCKED LIMIT $4) - RETURNING id, notification_template_id, user_id, method, status, status_reason, created_by, payload, attempt_count, targets, created_at, updated_at, leased_until, next_retry_after, queued_seconds) + RETURNING id, notification_template_id, user_id, method, status, status_reason, created_by, payload, attempt_count, targets, created_at, updated_at, leased_until, next_retry_after, queued_seconds, dedupe_hash) SELECT -- message nm.id, @@ -3504,14 +3504,15 @@ func (q *sqlQuerier) DeleteOldNotificationMessages(ctx context.Context) error { } const enqueueNotificationMessage = `-- name: EnqueueNotificationMessage :exec -INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by) +INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by, created_at) VALUES ($1, $2, $3, $4::notification_method, $5::jsonb, $6, - $7) + $7, + $8) ` type EnqueueNotificationMessageParams struct { @@ -3522,6 +3523,7 @@ type EnqueueNotificationMessageParams struct { Payload json.RawMessage `db:"payload" json:"payload"` Targets []uuid.UUID `db:"targets" json:"targets"` CreatedBy string `db:"created_by" json:"created_by"` + CreatedAt time.Time `db:"created_at" json:"created_at"` } func (q *sqlQuerier) EnqueueNotificationMessage(ctx context.Context, arg EnqueueNotificationMessageParams) error { @@ -3533,6 +3535,7 @@ func (q *sqlQuerier) EnqueueNotificationMessage(ctx context.Context, arg Enqueue arg.Payload, pq.Array(arg.Targets), arg.CreatedBy, + arg.CreatedAt, ) return err } @@ -3583,7 +3586,7 @@ func (q *sqlQuerier) FetchNewMessageMetadata(ctx context.Context, arg FetchNewMe } const getNotificationMessagesByStatus = `-- name: GetNotificationMessagesByStatus :many -SELECT id, notification_template_id, user_id, method, status, status_reason, created_by, payload, attempt_count, targets, created_at, updated_at, leased_until, next_retry_after, queued_seconds +SELECT id, notification_template_id, user_id, method, status, status_reason, created_by, payload, attempt_count, targets, created_at, updated_at, leased_until, next_retry_after, queued_seconds, dedupe_hash FROM notification_messages WHERE status = $1 LIMIT $2::int @@ -3619,6 +3622,7 @@ func (q *sqlQuerier) GetNotificationMessagesByStatus(ctx context.Context, arg Ge &i.LeasedUntil, &i.NextRetryAfter, &i.QueuedSeconds, + &i.DedupeHash, ); err != nil { return nil, err } diff --git a/coderd/database/queries/notifications.sql b/coderd/database/queries/notifications.sql index f5b8601871ccc..4e16ae5798924 100644 --- a/coderd/database/queries/notifications.sql +++ b/coderd/database/queries/notifications.sql @@ -13,14 +13,15 @@ WHERE nt.id = @notification_template_id AND u.id = @user_id; -- name: EnqueueNotificationMessage :exec -INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by) +INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by, created_at) VALUES (@id, @notification_template_id, @user_id, @method::notification_method, @payload::jsonb, @targets, - @created_by); + @created_by, + @created_at); -- Acquires the lease for a given count of notification messages, to enable concurrent dequeuing and subsequent sending. -- Only rows that aren't already leased (or ones which are leased but have exceeded their lease period) are returned. diff --git a/coderd/database/unique_constraint.go b/coderd/database/unique_constraint.go index f3f42ea0b72ad..58aa1eebdd6fc 100644 --- a/coderd/database/unique_constraint.go +++ b/coderd/database/unique_constraint.go @@ -88,6 +88,7 @@ const ( UniqueIndexProvisionerDaemonsNameOwnerKey UniqueConstraint = "idx_provisioner_daemons_name_owner_key" // CREATE UNIQUE INDEX idx_provisioner_daemons_name_owner_key ON provisioner_daemons USING btree (name, lower(COALESCE((tags ->> 'owner'::text), ''::text))); UniqueIndexUsersEmail UniqueConstraint = "idx_users_email" // CREATE UNIQUE INDEX idx_users_email ON users USING btree (email) WHERE (deleted = false); UniqueIndexUsersUsername UniqueConstraint = "idx_users_username" // CREATE UNIQUE INDEX idx_users_username ON users USING btree (username) WHERE (deleted = false); + UniqueNotificationMessagesDedupeHashIndex UniqueConstraint = "notification_messages_dedupe_hash_idx" // CREATE UNIQUE INDEX notification_messages_dedupe_hash_idx ON notification_messages USING btree (dedupe_hash); UniqueOrganizationsSingleDefaultOrg UniqueConstraint = "organizations_single_default_org" // CREATE UNIQUE INDEX organizations_single_default_org ON organizations USING btree (is_default) WHERE (is_default = true); UniqueProvisionerKeysOrganizationIDNameIndex UniqueConstraint = "provisioner_keys_organization_id_name_idx" // CREATE UNIQUE INDEX provisioner_keys_organization_id_name_idx ON provisioner_keys USING btree (organization_id, lower((name)::text)); UniqueTemplateUsageStatsStartTimeTemplateIDUserIDIndex UniqueConstraint = "template_usage_stats_start_time_template_id_user_id_idx" // CREATE UNIQUE INDEX template_usage_stats_start_time_template_id_user_id_idx ON template_usage_stats USING btree (start_time, template_id, user_id); diff --git a/coderd/notifications/enqueuer.go b/coderd/notifications/enqueuer.go index d990a71bdb5ad..e08ed1242ef8d 100644 --- a/coderd/notifications/enqueuer.go +++ b/coderd/notifications/enqueuer.go @@ -9,6 +9,8 @@ import ( "github.com/google/uuid" "golang.org/x/xerrors" + "github.com/coder/quartz" + "cdr.dev/slog" "github.com/coder/coder/v2/coderd/database" @@ -17,7 +19,10 @@ import ( "github.com/coder/coder/v2/codersdk" ) -var ErrCannotEnqueueDisabledNotification = xerrors.New("user has disabled this notification") +var ( + ErrCannotEnqueueDisabledNotification = xerrors.New("user has disabled this notification") + ErrDuplicate = xerrors.New("duplicate notification") +) type StoreEnqueuer struct { store Store @@ -27,10 +32,12 @@ type StoreEnqueuer struct { // helpers holds a map of template funcs which are used when rendering templates. These need to be passed in because // the template funcs will return values which are inappropriately encapsulated in this struct. helpers template.FuncMap + // Used to manipulate time in tests. + clock quartz.Clock } // NewStoreEnqueuer creates an Enqueuer implementation which can persist notification messages in the store. -func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, log slog.Logger) (*StoreEnqueuer, error) { +func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, log slog.Logger, clock quartz.Clock) (*StoreEnqueuer, error) { var method database.NotificationMethod if err := method.Scan(cfg.Method.String()); err != nil { return nil, xerrors.Errorf("given notification method %q is invalid", cfg.Method) @@ -41,6 +48,7 @@ func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers tem log: log, defaultMethod: method, helpers: helpers, + clock: clock, }, nil } @@ -81,6 +89,7 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI Payload: input, Targets: targets, CreatedBy: createdBy, + CreatedAt: s.clock.Now().UTC(), // mimicking dbtime.Now() }) if err != nil { // We have a trigger on the notification_messages table named `inhibit_enqueue_if_disabled` which prevents messages @@ -92,6 +101,13 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI return nil, ErrCannotEnqueueDisabledNotification } + // If the enqueue fails due to a dedupe hash conflict, this means that a notification has already been enqueued + // today with identical properties. It's far simpler to prevent duplicate sends in this central manner, rather than + // having each notification enqueue handle its own logic. + if database.IsUniqueViolation(err, database.UniqueNotificationMessagesDedupeHashIndex) { + return nil, ErrDuplicate + } + s.log.Warn(ctx, "failed to enqueue notification", slog.F("template_id", templateID), slog.F("input", input), slog.Error(err)) return nil, xerrors.Errorf("enqueue notification: %w", err) } diff --git a/coderd/notifications/manager_test.go b/coderd/notifications/manager_test.go index 2e264c534ccfa..04a47860d0945 100644 --- a/coderd/notifications/manager_test.go +++ b/coderd/notifications/manager_test.go @@ -12,13 +12,16 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/xerrors" + "github.com/coder/quartz" + + "github.com/coder/serpent" + "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbgen" "github.com/coder/coder/v2/coderd/notifications" "github.com/coder/coder/v2/coderd/notifications/dispatch" "github.com/coder/coder/v2/coderd/notifications/types" "github.com/coder/coder/v2/testutil" - "github.com/coder/serpent" ) func TestBufferedUpdates(t *testing.T) { @@ -39,7 +42,7 @@ func TestBufferedUpdates(t *testing.T) { mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{ database.NotificationMethodSmtp: santa, }) - enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, defaultHelpers(), logger.Named("notifications-enqueuer")) + enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, defaultHelpers(), logger.Named("notifications-enqueuer"), quartz.NewReal()) require.NoError(t, err) user := dbgen.User(t, db, database.User{}) @@ -127,7 +130,7 @@ func TestBuildPayload(t *testing.T) { } }) - enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, helpers, logger.Named("notifications-enqueuer")) + enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, helpers, logger.Named("notifications-enqueuer"), quartz.NewReal()) require.NoError(t, err) // WHEN: a notification is enqueued diff --git a/coderd/notifications/metrics_test.go b/coderd/notifications/metrics_test.go index 139f7ae18c6c6..52d5918564ed4 100644 --- a/coderd/notifications/metrics_test.go +++ b/coderd/notifications/metrics_test.go @@ -13,6 +13,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/coder/quartz" + "github.com/coder/serpent" "github.com/coder/coder/v2/coderd/database" @@ -61,7 +63,7 @@ func TestMetrics(t *testing.T) { method: handler, }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer")) + enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -228,7 +230,7 @@ func TestPendingUpdatesMetric(t *testing.T) { method: handler, }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer")) + enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -305,7 +307,7 @@ func TestInflightDispatchesMetric(t *testing.T) { method: delayer, }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer")) + enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) @@ -384,7 +386,7 @@ func TestCustomMethodMetricCollection(t *testing.T) { customMethod: webhookHandler, }) - enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer")) + enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, store) diff --git a/coderd/notifications/notifications_test.go b/coderd/notifications/notifications_test.go index d73edbf7c453b..0f05c5f2b77f0 100644 --- a/coderd/notifications/notifications_test.go +++ b/coderd/notifications/notifications_test.go @@ -16,6 +16,8 @@ import ( "golang.org/x/xerrors" + "github.com/coder/quartz" + "github.com/google/uuid" smtpmock "github.com/mocktools/go-smtp-mock/v2" "github.com/stretchr/testify/assert" @@ -64,7 +66,7 @@ func TestBasicNotificationRoundtrip(t *testing.T) { t.Cleanup(func() { assert.NoError(t, mgr.Stop(ctx)) }) - enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer")) + enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, db) @@ -138,7 +140,7 @@ func TestSMTPDispatch(t *testing.T) { t.Cleanup(func() { assert.NoError(t, mgr.Stop(ctx)) }) - enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer")) + enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, db) @@ -198,7 +200,7 @@ func TestWebhookDispatch(t *testing.T) { t.Cleanup(func() { assert.NoError(t, mgr.Stop(ctx)) }) - enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer")) + enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) const ( @@ -294,7 +296,7 @@ func TestBackpressure(t *testing.T) { mgr, err := notifications.NewManager(cfg, storeInterceptor, createMetrics(), logger.Named("manager")) require.NoError(t, err) mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler}) - enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer")) + enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, db) @@ -392,7 +394,7 @@ func TestRetries(t *testing.T) { assert.NoError(t, mgr.Stop(ctx)) }) mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler}) - enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer")) + enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, db) @@ -449,7 +451,7 @@ func TestExpiredLeaseIsRequeued(t *testing.T) { mgr, err := notifications.NewManager(cfg, noopInterceptor, createMetrics(), logger.Named("manager")) require.NoError(t, err) - enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer")) + enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, db) @@ -457,7 +459,8 @@ func TestExpiredLeaseIsRequeued(t *testing.T) { // WHEN: a few notifications are enqueued which will all succeed var msgs []string for i := 0; i < msgCount; i++ { - id, err := enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"type": "success"}, "test") + id, err := enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, + map[string]string{"type": "success", "index": fmt.Sprintf("%d", i)}, "test") require.NoError(t, err) msgs = append(msgs, id.String()) } @@ -559,7 +562,7 @@ func TestNotifierPaused(t *testing.T) { t.Cleanup(func() { assert.NoError(t, mgr.Stop(ctx)) }) - enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer")) + enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) mgr.Run(ctx) @@ -718,7 +721,7 @@ func TestDisabledBeforeEnqueue(t *testing.T) { // GIVEN: an enqueuer & a sample user cfg := defaultNotificationsConfig(database.NotificationMethodSmtp) - enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer")) + enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, db) @@ -758,7 +761,7 @@ func TestDisabledAfterEnqueue(t *testing.T) { assert.NoError(t, mgr.Stop(ctx)) }) - enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer")) + enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) require.NoError(t, err) user := createSampleUser(t, db) @@ -864,7 +867,7 @@ func TestCustomNotificationMethod(t *testing.T) { _ = mgr.Stop(ctx) }) - enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger) + enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger, quartz.NewReal()) require.NoError(t, err) // WHEN: a notification of that template is enqueued, it should be delivered with the configured method - not the default. @@ -893,6 +896,53 @@ func TestCustomNotificationMethod(t *testing.T) { }, testutil.WaitLong, testutil.IntervalFast) } +// TestNotificationDuplicates validates that identical notifications cannot be sent on the same day. +func TestNotificationDuplicates(t *testing.T) { + t.Parallel() + + // SETUP + if !dbtestutil.WillUsePostgres() { + t.Skip("This test requires postgres; it is testing the dedupe hash trigger in the database") + } + + ctx, logger, db := setup(t) + + method := database.NotificationMethodSmtp + cfg := defaultNotificationsConfig(method) + + mgr, err := notifications.NewManager(cfg, db, createMetrics(), logger.Named("manager")) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, mgr.Stop(ctx)) + }) + + // Set the time to a known value. + mClock := quartz.NewMock(t) + mClock.Set(time.Date(2024, 1, 15, 9, 0, 0, 0, time.UTC)) + + enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"), mClock) + require.NoError(t, err) + user := createSampleUser(t, db) + + // GIVEN: two notifications are enqueued with identical properties. + _, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, + map[string]string{"initiator": "danny"}, "test", user.ID) + require.NoError(t, err) + + // WHEN: the second is enqueued, the enqueuer will reject the request. + _, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, + map[string]string{"initiator": "danny"}, "test", user.ID) + require.ErrorIs(t, err, notifications.ErrDuplicate) + + // THEN: when the clock is advanced 24h, the notification will be accepted. + // NOTE: the time is used in the dedupe hash, so by advancing 24h we're creating a distinct notification from the one + // which was enqueued "yesterday". + mClock.Advance(time.Hour * 24) + _, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, + map[string]string{"initiator": "danny"}, "test", user.ID) + require.NoError(t, err) +} + type fakeHandler struct { mu sync.RWMutex succeeded, failed []string From 6fc765b9d00a3f54e9291087650a2fa5a4d91238 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Wed, 21 Aug 2024 08:54:59 +0000 Subject: [PATCH 2/2] Review comments Signed-off-by: Danny Kopping --- ...dedupe.down.sql => 000245_notifications_dedupe.down.sql} | 0 ...ons_dedupe.up.sql => 000245_notifications_dedupe.up.sql} | 0 coderd/notifications/enqueuer.go | 6 +++--- coderd/notifications/manager_test.go | 1 - coderd/notifications/notifications_test.go | 2 +- 5 files changed, 4 insertions(+), 5 deletions(-) rename coderd/database/migrations/{000239_notifications_dedupe.down.sql => 000245_notifications_dedupe.down.sql} (100%) rename coderd/database/migrations/{000239_notifications_dedupe.up.sql => 000245_notifications_dedupe.up.sql} (100%) diff --git a/coderd/database/migrations/000239_notifications_dedupe.down.sql b/coderd/database/migrations/000245_notifications_dedupe.down.sql similarity index 100% rename from coderd/database/migrations/000239_notifications_dedupe.down.sql rename to coderd/database/migrations/000245_notifications_dedupe.down.sql diff --git a/coderd/database/migrations/000239_notifications_dedupe.up.sql b/coderd/database/migrations/000245_notifications_dedupe.up.sql similarity index 100% rename from coderd/database/migrations/000239_notifications_dedupe.up.sql rename to coderd/database/migrations/000245_notifications_dedupe.up.sql diff --git a/coderd/notifications/enqueuer.go b/coderd/notifications/enqueuer.go index e08ed1242ef8d..2915299ef26d5 100644 --- a/coderd/notifications/enqueuer.go +++ b/coderd/notifications/enqueuer.go @@ -9,11 +9,11 @@ import ( "github.com/google/uuid" "golang.org/x/xerrors" - "github.com/coder/quartz" - "cdr.dev/slog" + "github.com/coder/quartz" "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/coderd/database/dbtime" "github.com/coder/coder/v2/coderd/notifications/render" "github.com/coder/coder/v2/coderd/notifications/types" "github.com/coder/coder/v2/codersdk" @@ -89,7 +89,7 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI Payload: input, Targets: targets, CreatedBy: createdBy, - CreatedAt: s.clock.Now().UTC(), // mimicking dbtime.Now() + CreatedAt: dbtime.Time(s.clock.Now().UTC()), }) if err != nil { // We have a trigger on the notification_messages table named `inhibit_enqueue_if_disabled` which prevents messages diff --git a/coderd/notifications/manager_test.go b/coderd/notifications/manager_test.go index 926d810e98d3f..4b7eff4a01263 100644 --- a/coderd/notifications/manager_test.go +++ b/coderd/notifications/manager_test.go @@ -13,7 +13,6 @@ import ( "golang.org/x/xerrors" "github.com/coder/quartz" - "github.com/coder/serpent" "github.com/coder/coder/v2/coderd/database" diff --git a/coderd/notifications/notifications_test.go b/coderd/notifications/notifications_test.go index cebe73b5eb14f..8ecae8a904923 100644 --- a/coderd/notifications/notifications_test.go +++ b/coderd/notifications/notifications_test.go @@ -1050,7 +1050,7 @@ func TestNotificationDuplicates(t *testing.T) { method := database.NotificationMethodSmtp cfg := defaultNotificationsConfig(method) - mgr, err := notifications.NewManager(cfg, db, createMetrics(), logger.Named("manager")) + mgr, err := notifications.NewManager(cfg, db, defaultHelpers(), createMetrics(), logger.Named("manager")) require.NoError(t, err) t.Cleanup(func() { assert.NoError(t, mgr.Stop(ctx))