Skip to content

Commit eeedb44

Browse files
committed
start adding tests for pubsub notif
1 parent 1d28ccf commit eeedb44

File tree

2 files changed

+62
-4
lines changed

2 files changed

+62
-4
lines changed

coderd/notifications/notifications_test.go

+58
Original file line numberDiff line numberDiff line change
@@ -2088,6 +2088,64 @@ func TestNotificationOneTimePasswordDeliveryTargets(t *testing.T) {
20882088
})
20892089
}
20902090

2091+
func TestNotificationEnqueuePubsubNotify(t *testing.T) {
2092+
t.Parallel()
2093+
if !dbtestutil.WillUsePostgres() {
2094+
t.Skip("This test requires postgres; it relies on business-logic only implemented in the database")
2095+
}
2096+
2097+
// Given: we are between notify dispatch intervals.
2098+
store, pubsub := dbtestutil.NewDB(t)
2099+
logger := testutil.Logger(t)
2100+
// nolint:gocritic // Unit test.
2101+
ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitShort))
2102+
2103+
const method = database.NotificationMethodWebhook
2104+
cfg := defaultNotificationsConfig(method)
2105+
2106+
// Tune the queue to fetch infrequently.
2107+
const fetchInterval = time.Minute
2108+
cfg.FetchInterval = serpent.Duration(fetchInterval)
2109+
2110+
handler := &chanHandler{calls: make(chan dispatchCall)}
2111+
2112+
mClock := quartz.NewMock(t)
2113+
fetchTrap := mClock.Trap().TickerFunc("notifier", "fetchInterval")
2114+
defer fetchTrap.Close()
2115+
2116+
mgr, err := notifications.NewManager(cfg, store, pubsub, defaultHelpers(), createMetrics(),
2117+
logger.Named("manager"), notifications.WithTestClock(mClock))
2118+
require.NoError(t, err)
2119+
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
2120+
method: handler,
2121+
database.NotificationMethodInbox: handler,
2122+
})
2123+
enq, err := notifications.NewStoreEnqueuer(cfg, store, pubsub, defaultHelpers(), logger.Named("enqueuer"), mClock)
2124+
require.NoError(t, err)
2125+
2126+
user := createSampleUser(t, store)
2127+
2128+
// When: a notification is enqueued
2129+
2130+
// Then: we attempt to dispatch the notification immediately.
2131+
mgr.Run(ctx)
2132+
wt := fetchTrap.MustWait(ctx)
2133+
// TODO: this happens *before* manager starts listening. Need to wait.
2134+
_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{}, "test")
2135+
require.NoError(t, err)
2136+
wt.Release()
2137+
d, w := mClock.AdvanceNext()
2138+
t.Logf("advanced %s", d)
2139+
2140+
call := testutil.TryReceive(ctx, t, handler.calls)
2141+
testutil.RequireSend(ctx, t, call.result, dispatchResult{
2142+
retryable: false,
2143+
err: nil,
2144+
})
2145+
2146+
<-w.Done() // TODO: this hangs and fails.
2147+
}
2148+
20912149
type fakeHandler struct {
20922150
mu sync.RWMutex
20932151
succeeded, failed []string

coderd/notifications/notifier.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes
151151

152152
// Also signal the processing loop when a notification is enqueued.
153153
if stopListen, err := n.ps.Subscribe(EventNotificationEnqueued, func(ctx context.Context, _ []byte) {
154+
n.log.Debug(n.outerCtx, "got pubsub event", slog.F("event", EventNotificationEnqueued))
154155
c := make(chan struct{})
155156
select {
156157
case <-ctx.Done():
@@ -170,10 +171,9 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes
170171
defer stopListen()
171172
}
172173

173-
// Note the order of operations here.
174-
_ = tick.Wait() // will block until gracefulCtx is done
175-
close(loopTick) // happens immediately
176-
<-loopDone // wait for the current processing loop to finish
174+
_ = tick.Wait() // Block until the ticker exits. This will be after gracefulCtx is canceled.
175+
close(loopTick) // Signal the processing goroutine to stop.
176+
<-loopDone // Wait for the processing goroutine to exit.
177177

178178
// only errors we can return are context errors. Only return an error if the outer context
179179
// was canceled, not if we were gracefully stopped.

0 commit comments

Comments
 (0)