Skip to content

Commit 9c8c6a9

Browse files
authored
feat: add notification deduplication trigger (coder#14172)
1 parent d9f4193 commit 9c8c6a9

12 files changed

+168
-28
lines changed

cli/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1005,7 +1005,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
10051005
helpers := templateHelpers(options)
10061006

10071007
// The enqueuer is responsible for enqueueing notifications to the given store.
1008-
enqueuer, err := notifications.NewStoreEnqueuer(cfg, options.Database, helpers, logger.Named("notifications.enqueuer"))
1008+
enqueuer, err := notifications.NewStoreEnqueuer(cfg, options.Database, helpers, logger.Named("notifications.enqueuer"), quartz.NewReal())
10091009
if err != nil {
10101010
return xerrors.Errorf("failed to instantiate notification store enqueuer: %w", err)
10111011
}

coderd/database/dump.sql

Lines changed: 26 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
DROP TRIGGER IF EXISTS update_notification_message_dedupe_hash ON notification_messages;
2+
DROP FUNCTION IF EXISTS compute_notification_message_dedupe_hash();
3+
ALTER TABLE IF EXISTS notification_messages
4+
DROP COLUMN IF EXISTS dedupe_hash;
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
-- Add a column to store the hash.
2+
ALTER TABLE IF EXISTS notification_messages
3+
ADD COLUMN IF NOT EXISTS dedupe_hash TEXT NULL;
4+
5+
COMMENT ON COLUMN notification_messages.dedupe_hash IS 'Auto-generated by insert/update trigger, used to prevent duplicate notifications from being enqueued on the same day';
6+
7+
-- Ensure that multiple notifications with identical hashes cannot be inserted into the table.
8+
CREATE UNIQUE INDEX ON notification_messages (dedupe_hash);
9+
10+
-- Computes a hash from all unique messages fields and the current day; this will help prevent duplicate messages from being sent within the same day.
11+
-- It is possible that a message could be sent at 23:59:59 and again at 00:00:00, but this should be good enough for now.
12+
-- This could have been a unique index, but we cannot immutably create an index on a timestamp with a timezone.
13+
CREATE OR REPLACE FUNCTION compute_notification_message_dedupe_hash() RETURNS TRIGGER AS
14+
$$
15+
BEGIN
16+
NEW.dedupe_hash := MD5(CONCAT_WS(':',
17+
NEW.notification_template_id,
18+
NEW.user_id,
19+
NEW.method,
20+
NEW.payload::text,
21+
ARRAY_TO_STRING(NEW.targets, ','),
22+
DATE_TRUNC('day', NEW.created_at AT TIME ZONE 'UTC')::text
23+
));
24+
RETURN NEW;
25+
END;
26+
$$ LANGUAGE plpgsql;
27+
28+
COMMENT ON FUNCTION compute_notification_message_dedupe_hash IS 'Computes a unique hash which will be used to prevent duplicate messages from being enqueued on the same day';
29+
CREATE TRIGGER update_notification_message_dedupe_hash
30+
BEFORE INSERT OR UPDATE
31+
ON notification_messages
32+
FOR EACH ROW
33+
EXECUTE FUNCTION compute_notification_message_dedupe_hash();

coderd/database/models.go

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries.sql.go

Lines changed: 8 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries/notifications.sql

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@ WHERE nt.id = @notification_template_id
1313
AND u.id = @user_id;
1414

1515
-- name: EnqueueNotificationMessage :exec
16-
INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by)
16+
INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by, created_at)
1717
VALUES (@id,
1818
@notification_template_id,
1919
@user_id,
2020
@method::notification_method,
2121
@payload::jsonb,
2222
@targets,
23-
@created_by);
23+
@created_by,
24+
@created_at);
2425

2526
-- Acquires the lease for a given count of notification messages, to enable concurrent dequeuing and subsequent sending.
2627
-- Only rows that aren't already leased (or ones which are leased but have exceeded their lease period) are returned.

coderd/database/unique_constraint.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/notifications/enqueuer.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,19 @@ import (
1010
"golang.org/x/xerrors"
1111

1212
"cdr.dev/slog"
13+
"github.com/coder/quartz"
1314

1415
"github.com/coder/coder/v2/coderd/database"
16+
"github.com/coder/coder/v2/coderd/database/dbtime"
1517
"github.com/coder/coder/v2/coderd/notifications/render"
1618
"github.com/coder/coder/v2/coderd/notifications/types"
1719
"github.com/coder/coder/v2/codersdk"
1820
)
1921

20-
var ErrCannotEnqueueDisabledNotification = xerrors.New("user has disabled this notification")
22+
var (
23+
ErrCannotEnqueueDisabledNotification = xerrors.New("user has disabled this notification")
24+
ErrDuplicate = xerrors.New("duplicate notification")
25+
)
2126

2227
type StoreEnqueuer struct {
2328
store Store
@@ -27,10 +32,12 @@ type StoreEnqueuer struct {
2732
// helpers holds a map of template funcs which are used when rendering templates. These need to be passed in because
2833
// the template funcs will return values which are inappropriately encapsulated in this struct.
2934
helpers template.FuncMap
35+
// Used to manipulate time in tests.
36+
clock quartz.Clock
3037
}
3138

3239
// NewStoreEnqueuer creates an Enqueuer implementation which can persist notification messages in the store.
33-
func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, log slog.Logger) (*StoreEnqueuer, error) {
40+
func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, log slog.Logger, clock quartz.Clock) (*StoreEnqueuer, error) {
3441
var method database.NotificationMethod
3542
if err := method.Scan(cfg.Method.String()); err != nil {
3643
return nil, xerrors.Errorf("given notification method %q is invalid", cfg.Method)
@@ -41,6 +48,7 @@ func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers tem
4148
log: log,
4249
defaultMethod: method,
4350
helpers: helpers,
51+
clock: clock,
4452
}, nil
4553
}
4654

@@ -81,6 +89,7 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI
8189
Payload: input,
8290
Targets: targets,
8391
CreatedBy: createdBy,
92+
CreatedAt: dbtime.Time(s.clock.Now().UTC()),
8493
})
8594
if err != nil {
8695
// We have a trigger on the notification_messages table named `inhibit_enqueue_if_disabled` which prevents messages
@@ -92,6 +101,13 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI
92101
return nil, ErrCannotEnqueueDisabledNotification
93102
}
94103

104+
// If the enqueue fails due to a dedupe hash conflict, this means that a notification has already been enqueued
105+
// today with identical properties. It's far simpler to prevent duplicate sends in this central manner, rather than
106+
// having each notification enqueue handle its own logic.
107+
if database.IsUniqueViolation(err, database.UniqueNotificationMessagesDedupeHashIndex) {
108+
return nil, ErrDuplicate
109+
}
110+
95111
s.log.Warn(ctx, "failed to enqueue notification", slog.F("template_id", templateID), slog.F("input", input), slog.Error(err))
96112
return nil, xerrors.Errorf("enqueue notification: %w", err)
97113
}

coderd/notifications/manager_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@ import (
1212
"github.com/stretchr/testify/require"
1313
"golang.org/x/xerrors"
1414

15+
"github.com/coder/quartz"
16+
"github.com/coder/serpent"
17+
1518
"github.com/coder/coder/v2/coderd/database"
1619
"github.com/coder/coder/v2/coderd/database/dbgen"
1720
"github.com/coder/coder/v2/coderd/notifications"
1821
"github.com/coder/coder/v2/coderd/notifications/dispatch"
1922
"github.com/coder/coder/v2/coderd/notifications/types"
2023
"github.com/coder/coder/v2/testutil"
21-
"github.com/coder/serpent"
2224
)
2325

2426
func TestBufferedUpdates(t *testing.T) {
@@ -39,7 +41,7 @@ func TestBufferedUpdates(t *testing.T) {
3941
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
4042
database.NotificationMethodSmtp: santa,
4143
})
42-
enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, defaultHelpers(), logger.Named("notifications-enqueuer"))
44+
enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, defaultHelpers(), logger.Named("notifications-enqueuer"), quartz.NewReal())
4345
require.NoError(t, err)
4446

4547
user := dbgen.User(t, db, database.User{})
@@ -127,7 +129,7 @@ func TestBuildPayload(t *testing.T) {
127129
}
128130
})
129131

130-
enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, helpers, logger.Named("notifications-enqueuer"))
132+
enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, helpers, logger.Named("notifications-enqueuer"), quartz.NewReal())
131133
require.NoError(t, err)
132134

133135
// WHEN: a notification is enqueued

coderd/notifications/metrics_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"github.com/stretchr/testify/assert"
1414
"github.com/stretchr/testify/require"
1515

16+
"github.com/coder/quartz"
17+
1618
"github.com/coder/serpent"
1719

1820
"github.com/coder/coder/v2/coderd/database"
@@ -61,7 +63,7 @@ func TestMetrics(t *testing.T) {
6163
method: handler,
6264
})
6365

64-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"))
66+
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
6567
require.NoError(t, err)
6668

6769
user := createSampleUser(t, store)
@@ -228,7 +230,7 @@ func TestPendingUpdatesMetric(t *testing.T) {
228230
method: handler,
229231
})
230232

231-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"))
233+
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
232234
require.NoError(t, err)
233235

234236
user := createSampleUser(t, store)
@@ -305,7 +307,7 @@ func TestInflightDispatchesMetric(t *testing.T) {
305307
method: delayer,
306308
})
307309

308-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"))
310+
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
309311
require.NoError(t, err)
310312

311313
user := createSampleUser(t, store)
@@ -384,7 +386,7 @@ func TestCustomMethodMetricCollection(t *testing.T) {
384386
customMethod: webhookHandler,
385387
})
386388

387-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"))
389+
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
388390
require.NoError(t, err)
389391

390392
user := createSampleUser(t, store)

0 commit comments

Comments
 (0)