Skip to content

Commit 325471d

Browse files
committed
feat(coderd/notifications): notify pubsub on enqueue
1 parent 729e02e commit 325471d

File tree

5 files changed

+112
-10
lines changed

5 files changed

+112
-10
lines changed

coderd/notifications/enqueuer.go

+9
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ var (
2727
ErrDuplicate = xerrors.New("duplicate notification")
2828
)
2929

30+
const EventNotificationEnqueued = "notification_enqueued"
31+
3032
type InvalidDefaultNotificationMethodError struct {
3133
Method string
3234
}
@@ -83,6 +85,13 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI
8385
// Enqueue queues a notification message for later delivery.
8486
// Messages will be dequeued by a notifier later and dispatched.
8587
func (s *StoreEnqueuer) EnqueueWithData(ctx context.Context, userID, templateID uuid.UUID, labels map[string]string, data map[string]any, createdBy string, targets ...uuid.UUID) ([]uuid.UUID, error) {
88+
defer func() {
89+
// Publish an event to notify that a notification has been enqueued.
90+
// Failure to publish is acceptable, as the fetcher will still process the
91+
// message on its next run.
92+
// TODO(Cian): debounce this to maybe once per second or so?
93+
_ = s.ps.Publish(EventNotificationEnqueued, nil)
94+
}()
8695
metadata, err := s.store.FetchNewMessageMetadata(ctx, database.FetchNewMessageMetadataParams{
8796
UserID: userID,
8897
NotificationTemplateID: templateID,

coderd/notifications/fetcher_internal_test.go

+23
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"golang.org/x/xerrors"
1212

1313
"github.com/coder/coder/v2/coderd/database/dbmock"
14+
"github.com/coder/coder/v2/coderd/database/pubsub/psmock"
1415
)
1516

1617
func TestNotifier_FetchHelpers(t *testing.T) {
@@ -21,9 +22,11 @@ func TestNotifier_FetchHelpers(t *testing.T) {
2122

2223
ctrl := gomock.NewController(t)
2324
dbmock := dbmock.NewMockStore(ctrl)
25+
psmock := psmock.NewMockPubsub(ctrl)
2426

2527
n := &notifier{
2628
store: dbmock,
29+
ps: psmock,
2730
helpers: template.FuncMap{},
2831
}
2932

@@ -48,9 +51,11 @@ func TestNotifier_FetchHelpers(t *testing.T) {
4851

4952
ctrl := gomock.NewController(t)
5053
dbmock := dbmock.NewMockStore(ctrl)
54+
psmock := psmock.NewMockPubsub(ctrl)
5155

5256
n := &notifier{
5357
store: dbmock,
58+
ps: psmock,
5459
helpers: template.FuncMap{},
5560
}
5661

@@ -67,9 +72,11 @@ func TestNotifier_FetchHelpers(t *testing.T) {
6772

6873
ctrl := gomock.NewController(t)
6974
dbmock := dbmock.NewMockStore(ctrl)
75+
psmock := psmock.NewMockPubsub(ctrl)
7076

7177
n := &notifier{
7278
store: dbmock,
79+
ps: psmock,
7380
helpers: template.FuncMap{},
7481
}
7582

@@ -90,9 +97,11 @@ func TestNotifier_FetchAppName(t *testing.T) {
9097

9198
ctrl := gomock.NewController(t)
9299
dbmock := dbmock.NewMockStore(ctrl)
100+
psmock := psmock.NewMockPubsub(ctrl)
93101

94102
n := &notifier{
95103
store: dbmock,
104+
ps: psmock,
96105
}
97106

98107
dbmock.EXPECT().GetApplicationName(gomock.Any()).Return("ACME Inc.", nil)
@@ -107,9 +116,11 @@ func TestNotifier_FetchAppName(t *testing.T) {
107116
t.Parallel()
108117
ctrl := gomock.NewController(t)
109118
dbmock := dbmock.NewMockStore(ctrl)
119+
psmock := psmock.NewMockPubsub(ctrl)
110120

111121
n := &notifier{
112122
store: dbmock,
123+
ps: psmock,
113124
}
114125

115126
dbmock.EXPECT().GetApplicationName(gomock.Any()).Return("", sql.ErrNoRows)
@@ -125,9 +136,11 @@ func TestNotifier_FetchAppName(t *testing.T) {
125136

126137
ctrl := gomock.NewController(t)
127138
dbmock := dbmock.NewMockStore(ctrl)
139+
psmock := psmock.NewMockPubsub(ctrl)
128140

129141
n := &notifier{
130142
store: dbmock,
143+
ps: psmock,
131144
}
132145

133146
dbmock.EXPECT().GetApplicationName(gomock.Any()).Return("", nil)
@@ -143,9 +156,11 @@ func TestNotifier_FetchAppName(t *testing.T) {
143156

144157
ctrl := gomock.NewController(t)
145158
dbmock := dbmock.NewMockStore(ctrl)
159+
psmock := psmock.NewMockPubsub(ctrl)
146160

147161
n := &notifier{
148162
store: dbmock,
163+
ps: psmock,
149164
}
150165

151166
dbmock.EXPECT().GetApplicationName(gomock.Any()).Return("", xerrors.New("internal error"))
@@ -164,9 +179,11 @@ func TestNotifier_FetchLogoURL(t *testing.T) {
164179

165180
ctrl := gomock.NewController(t)
166181
dbmock := dbmock.NewMockStore(ctrl)
182+
psmock := psmock.NewMockPubsub(ctrl)
167183

168184
n := &notifier{
169185
store: dbmock,
186+
ps: psmock,
170187
}
171188

172189
dbmock.EXPECT().GetLogoURL(gomock.Any()).Return("https://example.com/logo.png", nil)
@@ -181,9 +198,11 @@ func TestNotifier_FetchLogoURL(t *testing.T) {
181198
t.Parallel()
182199
ctrl := gomock.NewController(t)
183200
dbmock := dbmock.NewMockStore(ctrl)
201+
psmock := psmock.NewMockPubsub(ctrl)
184202

185203
n := &notifier{
186204
store: dbmock,
205+
ps: psmock,
187206
}
188207

189208
dbmock.EXPECT().GetLogoURL(gomock.Any()).Return("", sql.ErrNoRows)
@@ -199,9 +218,11 @@ func TestNotifier_FetchLogoURL(t *testing.T) {
199218

200219
ctrl := gomock.NewController(t)
201220
dbmock := dbmock.NewMockStore(ctrl)
221+
psmock := psmock.NewMockPubsub(ctrl)
202222

203223
n := &notifier{
204224
store: dbmock,
225+
ps: psmock,
205226
}
206227

207228
dbmock.EXPECT().GetLogoURL(gomock.Any()).Return("", nil)
@@ -217,9 +238,11 @@ func TestNotifier_FetchLogoURL(t *testing.T) {
217238

218239
ctrl := gomock.NewController(t)
219240
dbmock := dbmock.NewMockStore(ctrl)
241+
psmock := psmock.NewMockPubsub(ctrl)
220242

221243
n := &notifier{
222244
store: dbmock,
245+
ps: psmock,
223246
}
224247

225248
dbmock.EXPECT().GetLogoURL(gomock.Any()).Return("", xerrors.New("internal error"))

coderd/notifications/manager.go

+1
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, ps pubsub.Pubsub,
9494
log: log,
9595
cfg: cfg,
9696
store: store,
97+
ps: ps,
9798

9899
// Buffer successful/failed notification dispatches in memory to reduce load on the store.
99100
//

coderd/notifications/notifications_test.go

+52
Original file line numberDiff line numberDiff line change
@@ -2088,6 +2088,58 @@ func TestNotificationOneTimePasswordDeliveryTargets(t *testing.T) {
20882088
})
20892089
}
20902090

2091+
func TestNotificationEnqueuePubsubNotify(t *testing.T) {
2092+
t.Parallel()
2093+
if !dbtestutil.WillUsePostgres() {
2094+
t.Skip("This test requires postgres; it relies on business-logic only implemented in the database")
2095+
}
2096+
2097+
store, pubsub := dbtestutil.NewDB(t)
2098+
logger := testutil.Logger(t)
2099+
// nolint:gocritic // Unit test.
2100+
ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitShort))
2101+
2102+
const method = database.NotificationMethodWebhook
2103+
cfg := defaultNotificationsConfig(method)
2104+
2105+
// Tune the queue to fetch infrequently.
2106+
const fetchInterval = time.Minute
2107+
cfg.FetchInterval = serpent.Duration(fetchInterval)
2108+
2109+
mClock := quartz.NewMock(t)
2110+
fetchTrap := mClock.Trap().TickerFunc("notifier", "fetchInterval")
2111+
defer fetchTrap.Close()
2112+
2113+
mgr, err := notifications.NewManager(cfg, store, pubsub, defaultHelpers(), createMetrics(),
2114+
logger.Named("manager"), notifications.WithTestClock(mClock))
2115+
require.NoError(t, err)
2116+
2117+
handler := &chanHandler{calls: make(chan dispatchCall)}
2118+
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
2119+
method: handler,
2120+
database.NotificationMethodInbox: handler,
2121+
})
2122+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), mClock)
2123+
require.NoError(t, err)
2124+
2125+
user := createSampleUser(t, store)
2126+
2127+
// Given: the manager is running and the fetch interval is set to 1 minute.
2128+
mgr.Run(ctx)
2129+
fetchTrap.MustWait(ctx).Release()
2130+
2131+
// When: a notification is enqueued
2132+
_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{}, "test")
2133+
require.NoError(t, err)
2134+
2135+
// Then: we attempt to dispatch the notification immediately.
2136+
call := testutil.TryReceive(ctx, t, handler.calls)
2137+
testutil.RequireSend(ctx, t, call.result, dispatchResult{
2138+
retryable: false,
2139+
err: nil,
2140+
})
2141+
}
2142+
20912143
type fakeHandler struct {
20922144
mu sync.RWMutex
20932145
succeeded, failed []string

coderd/notifications/notifier.go

+27-10
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,6 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes
108108
n.log.Info(context.Background(), "gracefully stopped")
109109
}()
110110

111-
// TODO: idea from Cian: instead of querying the database on a short interval, we could wait for pubsub notifications.
112-
// if 100 notifications are enqueued, we shouldn't activate this routine for each one; so how to debounce these?
113-
// PLUS we should also have an interval (but a longer one, maybe 1m) to account for retries (those will not get
114-
// triggered by a code path, but rather by a timeout expiring which makes the message retryable)
115-
116111
// loopTick is used to synchronize the goroutine that processes messages with the ticker.
117112
loopTick := make(chan chan struct{})
118113
// loopDone is used to signal when the processing loop has exited due to
@@ -139,7 +134,7 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes
139134
}
140135
}()
141136

142-
// run the ticker with the graceful context, so we stop fetching after stop() is called
137+
// Periodically trigger the processing loop.
143138
tick := n.clock.TickerFunc(n.gracefulCtx, n.cfg.FetchInterval.Value(), func() error {
144139
c := make(chan struct{})
145140
loopTick <- c
@@ -149,10 +144,32 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes
149144
return nil
150145
}, "notifier", "fetchInterval")
151146

152-
// Note the order of operations here.
153-
_ = tick.Wait() // will block until gracefulCtx is done
154-
close(loopTick) // happens immediately
155-
<-loopDone // wait for the current processing loop to finish
147+
// Also signal the processing loop when a notification is enqueued.
148+
if stopListen, err := n.ps.Subscribe(EventNotificationEnqueued, func(ctx context.Context, _ []byte) {
149+
n.log.Debug(n.outerCtx, "got pubsub event", slog.F("event", EventNotificationEnqueued))
150+
c := make(chan struct{})
151+
select {
152+
case <-n.gracefulCtx.Done():
153+
return
154+
// This is a no-op if the notifier is paused.
155+
case loopTick <- c:
156+
<-c // Wait for the processing loop to finish.
157+
default:
158+
// If the loop is busy, don't send a notification.
159+
n.log.Debug(ctx, "notifier busy, skipping notification")
160+
return
161+
}
162+
}); err != nil {
163+
// Intentionally not making this a fatal error. The notifier will still run,
164+
// albeit without notification events.
165+
n.log.Error(n.outerCtx, "failed to subscribe to notification events", slog.Error(err))
166+
} else {
167+
defer stopListen()
168+
}
169+
170+
_ = tick.Wait() // Block until the ticker exits. This will be after gracefulCtx is canceled.
171+
close(loopTick) // Signal the processing goroutine to stop.
172+
<-loopDone // Wait for the processing goroutine to exit.
156173

157174
// only errors we can return are context errors. Only return an error if the outer context
158175
// was canceled, not if we were gracefully stopped.

0 commit comments

Comments
 (0)