Skip to content

Commit e571901

Browse files
committed
feat: add notification deduplication trigger
Signed-off-by: Danny Kopping <danny@coder.com>
1 parent e164b1e commit e571901

File tree

12 files changed

+167
-27
lines changed

12 files changed

+167
-27
lines changed

cli/server.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ import (
6060
"github.com/coder/serpent"
6161
"github.com/coder/wgtunnel/tunnelsdk"
6262

63+
"github.com/coder/quartz"
64+
6365
"github.com/coder/coder/v2/buildinfo"
6466
"github.com/coder/coder/v2/cli/clilog"
6567
"github.com/coder/coder/v2/cli/cliui"
@@ -995,7 +997,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
995997
metrics := notifications.NewMetrics(options.PrometheusRegistry)
996998

997999
// The enqueuer is responsible for enqueueing notifications to the given store.
998-
enqueuer, err := notifications.NewStoreEnqueuer(cfg, options.Database, templateHelpers(options), logger.Named("notifications.enqueuer"))
1000+
enqueuer, err := notifications.NewStoreEnqueuer(cfg, options.Database, templateHelpers(options), logger.Named("notifications.enqueuer"), quartz.NewReal())
9991001
if err != nil {
10001002
return xerrors.Errorf("failed to instantiate notification store enqueuer: %w", err)
10011003
}

coderd/database/dump.sql

Lines changed: 26 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
DROP TRIGGER IF EXISTS update_notification_message_dedupe_hash ON notification_messages;
2+
DROP FUNCTION IF EXISTS compute_notification_message_dedupe_hash();
3+
ALTER TABLE IF EXISTS notification_messages
4+
DROP COLUMN IF EXISTS dedupe_hash;
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
-- Add a column to store the hash.
2+
ALTER TABLE IF EXISTS notification_messages
3+
ADD COLUMN IF NOT EXISTS dedupe_hash TEXT NULL;
4+
5+
COMMENT ON COLUMN notification_messages.dedupe_hash IS 'Auto-generated by insert/update trigger, used to prevent duplicate notifications from being enqueued on the same day';
6+
7+
-- Ensure that multiple notifications with identical hashes cannot be inserted into the table.
8+
CREATE UNIQUE INDEX ON notification_messages (dedupe_hash);
9+
10+
-- Computes a hash from all unique messages fields and the current day; this will help prevent duplicate messages from being sent within the same day.
11+
-- It is possible that a message could be sent at 23:59:59 and again at 00:00:00, but this should be good enough for now.
12+
-- This could have been a unique index, but we cannot immutably create an index on a timestamp with a timezone.
13+
CREATE OR REPLACE FUNCTION compute_notification_message_dedupe_hash() RETURNS TRIGGER AS
14+
$$
15+
BEGIN
16+
NEW.dedupe_hash := MD5(CONCAT_WS(':',
17+
NEW.notification_template_id,
18+
NEW.user_id,
19+
NEW.method,
20+
NEW.payload::text,
21+
ARRAY_TO_STRING(NEW.targets, ','),
22+
DATE_TRUNC('day', NEW.created_at AT TIME ZONE 'UTC')::text
23+
));
24+
RETURN NEW;
25+
END;
26+
$$ LANGUAGE plpgsql;
27+
28+
COMMENT ON FUNCTION compute_notification_message_dedupe_hash IS 'Computes a unique hash which will be used to prevent duplicate messages from being enqueued on the same day';
29+
CREATE TRIGGER update_notification_message_dedupe_hash
30+
BEFORE INSERT OR UPDATE
31+
ON notification_messages
32+
FOR EACH ROW
33+
EXECUTE FUNCTION compute_notification_message_dedupe_hash();

coderd/database/models.go

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries.sql.go

Lines changed: 8 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries/notifications.sql

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@ WHERE nt.id = @notification_template_id
1313
AND u.id = @user_id;
1414

1515
-- name: EnqueueNotificationMessage :exec
16-
INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by)
16+
INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by, created_at)
1717
VALUES (@id,
1818
@notification_template_id,
1919
@user_id,
2020
@method::notification_method,
2121
@payload::jsonb,
2222
@targets,
23-
@created_by);
23+
@created_by,
24+
@created_at);
2425

2526
-- Acquires the lease for a given count of notification messages, to enable concurrent dequeuing and subsequent sending.
2627
-- Only rows that aren't already leased (or ones which are leased but have exceeded their lease period) are returned.

coderd/database/unique_constraint.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/notifications/enqueuer.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"github.com/google/uuid"
1010
"golang.org/x/xerrors"
1111

12+
"github.com/coder/quartz"
13+
1214
"cdr.dev/slog"
1315

1416
"github.com/coder/coder/v2/coderd/database"
@@ -17,7 +19,10 @@ import (
1719
"github.com/coder/coder/v2/codersdk"
1820
)
1921

20-
var ErrCannotEnqueueDisabledNotification = xerrors.New("user has disabled this notification")
22+
var (
23+
ErrCannotEnqueueDisabledNotification = xerrors.New("user has disabled this notification")
24+
ErrDuplicate = xerrors.New("duplicate notification")
25+
)
2126

2227
type StoreEnqueuer struct {
2328
store Store
@@ -27,10 +32,12 @@ type StoreEnqueuer struct {
2732
// helpers holds a map of template funcs which are used when rendering templates. These need to be passed in because
2833
// the template funcs will return values which are inappropriately encapsulated in this struct.
2934
helpers template.FuncMap
35+
// Used to manipulate time in tests.
36+
clock quartz.Clock
3037
}
3138

3239
// NewStoreEnqueuer creates an Enqueuer implementation which can persist notification messages in the store.
33-
func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, log slog.Logger) (*StoreEnqueuer, error) {
40+
func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, log slog.Logger, clock quartz.Clock) (*StoreEnqueuer, error) {
3441
var method database.NotificationMethod
3542
if err := method.Scan(cfg.Method.String()); err != nil {
3643
return nil, xerrors.Errorf("given notification method %q is invalid", cfg.Method)
@@ -41,6 +48,7 @@ func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers tem
4148
log: log,
4249
defaultMethod: method,
4350
helpers: helpers,
51+
clock: clock,
4452
}, nil
4553
}
4654

@@ -81,6 +89,7 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI
8189
Payload: input,
8290
Targets: targets,
8391
CreatedBy: createdBy,
92+
CreatedAt: s.clock.Now().UTC(), // mimicking dbtime.Now()
8493
})
8594
if err != nil {
8695
// We have a trigger on the notification_messages table named `inhibit_enqueue_if_disabled` which prevents messages
@@ -92,6 +101,13 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI
92101
return nil, ErrCannotEnqueueDisabledNotification
93102
}
94103

104+
// If the enqueue fails due to a dedupe hash conflict, this means that a notification has already been enqueued
105+
// today with identical properties. It's far simpler to prevent duplicate sends in this central manner, rather than
106+
// having each notification enqueue handle its own logic.
107+
if database.IsUniqueViolation(err, database.UniqueNotificationMessagesDedupeHashIndex) {
108+
return nil, ErrDuplicate
109+
}
110+
95111
s.log.Warn(ctx, "failed to enqueue notification", slog.F("template_id", templateID), slog.F("input", input), slog.Error(err))
96112
return nil, xerrors.Errorf("enqueue notification: %w", err)
97113
}

coderd/notifications/manager_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,16 @@ import (
1212
"github.com/stretchr/testify/require"
1313
"golang.org/x/xerrors"
1414

15+
"github.com/coder/quartz"
16+
17+
"github.com/coder/serpent"
18+
1519
"github.com/coder/coder/v2/coderd/database"
1620
"github.com/coder/coder/v2/coderd/database/dbgen"
1721
"github.com/coder/coder/v2/coderd/notifications"
1822
"github.com/coder/coder/v2/coderd/notifications/dispatch"
1923
"github.com/coder/coder/v2/coderd/notifications/types"
2024
"github.com/coder/coder/v2/testutil"
21-
"github.com/coder/serpent"
2225
)
2326

2427
func TestBufferedUpdates(t *testing.T) {
@@ -39,7 +42,7 @@ func TestBufferedUpdates(t *testing.T) {
3942
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
4043
database.NotificationMethodSmtp: santa,
4144
})
42-
enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, defaultHelpers(), logger.Named("notifications-enqueuer"))
45+
enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, defaultHelpers(), logger.Named("notifications-enqueuer"), quartz.NewReal())
4346
require.NoError(t, err)
4447

4548
user := dbgen.User(t, db, database.User{})
@@ -127,7 +130,7 @@ func TestBuildPayload(t *testing.T) {
127130
}
128131
})
129132

130-
enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, helpers, logger.Named("notifications-enqueuer"))
133+
enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, helpers, logger.Named("notifications-enqueuer"), quartz.NewReal())
131134
require.NoError(t, err)
132135

133136
// WHEN: a notification is enqueued

coderd/notifications/metrics_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"github.com/stretchr/testify/assert"
1414
"github.com/stretchr/testify/require"
1515

16+
"github.com/coder/quartz"
17+
1618
"github.com/coder/serpent"
1719

1820
"github.com/coder/coder/v2/coderd/database"
@@ -61,7 +63,7 @@ func TestMetrics(t *testing.T) {
6163
method: handler,
6264
})
6365

64-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"))
66+
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
6567
require.NoError(t, err)
6668

6769
user := createSampleUser(t, store)
@@ -228,7 +230,7 @@ func TestPendingUpdatesMetric(t *testing.T) {
228230
method: handler,
229231
})
230232

231-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"))
233+
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
232234
require.NoError(t, err)
233235

234236
user := createSampleUser(t, store)
@@ -305,7 +307,7 @@ func TestInflightDispatchesMetric(t *testing.T) {
305307
method: delayer,
306308
})
307309

308-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"))
310+
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
309311
require.NoError(t, err)
310312

311313
user := createSampleUser(t, store)
@@ -384,7 +386,7 @@ func TestCustomMethodMetricCollection(t *testing.T) {
384386
customMethod: webhookHandler,
385387
})
386388

387-
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"))
389+
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
388390
require.NoError(t, err)
389391

390392
user := createSampleUser(t, store)

0 commit comments

Comments
 (0)