chore: refactor notifier to use quartz.TickerFunc #15134

Merged: 1 commit, Oct 21, 2024
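
For context: this PR replaces the notifier's hand-rolled select loop over a quartz.Ticker with quartz.TickerFunc, which owns the loop, runs at most one callback at a time, and stops when its context is canceled or the callback returns an error. Below is a minimal, self-contained sketch of that pattern, assuming the coder/quartz API as used in the diff (the interval and tags are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/coder/quartz"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	clock := quartz.NewReal()
	// TickerFunc owns the loop: it invokes the callback every interval until
	// ctx is canceled, and never runs more than one invocation at a time.
	tick := clock.TickerFunc(ctx, 100*time.Millisecond, func() error {
		fmt.Println("fetch")
		// Returning a non-nil error stops the ticker; return nil to keep going.
		return nil
	}, "example", "fetchInterval")

	// Wait blocks until the ticker stops (context canceled or callback error).
	_ = tick.Wait()
}
```

In tests, a quartz mock clock can trap the TickerFunc registration and advance time deterministically, which is what the test changes below lean on.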
4 changes: 2 additions & 2 deletions coderd/notifications/manager.go
@@ -174,9 +174,9 @@ func (m *Manager) loop(ctx context.Context) error {
var eg errgroup.Group

// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers, m.helpers, m.metrics, m.clock)
m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.handlers, m.helpers, m.metrics, m.clock)
eg.Go(func() error {
return m.notifier.run(ctx, m.success, m.failure)
return m.notifier.run(m.success, m.failure)
})

// Periodically flush notification state changes to the store.
22 changes: 14 additions & 8 deletions coderd/notifications/metrics_test.go
@@ -225,13 +225,16 @@ func TestPendingUpdatesMetric(t *testing.T) {
// GIVEN: a notification manager whose store updates are intercepted so we can read the number of pending updates set in the metric
cfg := defaultNotificationsConfig(method)
cfg.RetryInterval = serpent.Duration(time.Hour) // Delay retries so they don't interfere.
cfg.FetchInterval = serpent.Duration(time.Millisecond * 50)
cfg.StoreSyncInterval = serpent.Duration(time.Millisecond * 100)

syncer := &syncInterceptor{Store: store}
interceptor := newUpdateSignallingInterceptor(syncer)
mClock := quartz.NewMock(t)
trap := mClock.Trap().NewTicker("Manager", "storeSync")
defer trap.Close()
fetchTrap := mClock.Trap().TickerFunc("notifier", "fetchInterval")
defer fetchTrap.Close()
mgr, err := notifications.NewManager(cfg, interceptor, defaultHelpers(), metrics, logger.Named("manager"),
notifications.WithTestClock(mClock))
require.NoError(t, err)
@@ -256,24 +259,27 @@ func TestPendingUpdatesMetric(t *testing.T) {

mgr.Run(ctx)
trap.MustWait(ctx).Release() // ensures ticker has been set
fetchTrap.MustWait(ctx).Release()

// Advance to the first fetch
mClock.Advance(cfg.FetchInterval.Value()).MustWait(ctx)

// THEN:
// Wait until the handler has dispatched the given notifications.
require.Eventually(t, func() bool {
// handler has dispatched the given notifications.
func() {
handler.mu.RLock()
defer handler.mu.RUnlock()

return len(handler.succeeded) == 1 && len(handler.failed) == 1
}, testutil.WaitShort, testutil.IntervalFast)
require.Len(t, handler.succeeded, 1)
require.Len(t, handler.failed, 1)
}()

// Both handler calls should be pending in the metrics.
require.Eventually(t, func() bool {
return promtest.ToFloat64(metrics.PendingUpdates) == float64(2)
}, testutil.WaitShort, testutil.IntervalFast)
require.EqualValues(t, 2, promtest.ToFloat64(metrics.PendingUpdates))

// THEN:
// Trigger syncing updates
mClock.Advance(cfg.StoreSyncInterval.Value()).MustWait(ctx)
mClock.Advance(cfg.StoreSyncInterval.Value() - cfg.FetchInterval.Value()).MustWait(ctx)

// Wait until we intercept the calls to sync the pending updates to the store.
success := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateSuccess)
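
The trap, release, then advance sequence above is the backbone of these test changes. Here is a rough, self-contained sketch of that pattern, assuming the quartz mock-clock API exercised here (Trap().TickerFunc, MustWait/Release, Advance); the package name, tags, and interval are illustrative:

```go
package example

import (
	"context"
	"testing"
	"time"

	"github.com/coder/quartz"
)

func TestTickerFuncTrap(t *testing.T) {
	t.Parallel()

	mClock := quartz.NewMock(t)
	// Trap the TickerFunc registration so we know the loop is installed
	// before advancing mock time.
	trap := mClock.Trap().TickerFunc("example", "fetchInterval")
	defer trap.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	ticks := make(chan struct{}, 1)
	go func() {
		// This call blocks until the trap releases it.
		mClock.TickerFunc(ctx, 50*time.Millisecond, func() error {
			ticks <- struct{}{}
			return nil
		}, "example", "fetchInterval")
	}()

	// Release the trapped registration, then advance one interval; MustWait
	// ensures the tick's callback has completed before we assert on it.
	trap.MustWait(ctx).Release()
	mClock.Advance(50 * time.Millisecond).MustWait(ctx)
	<-ticks
}
```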
107 changes: 71 additions & 36 deletions coderd/notifications/notifications_test.go
@@ -21,7 +21,6 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

@@ -278,86 +277,122 @@ func TestBackpressure(t *testing.T) {
t.Skip("This test requires postgres; it relies on business-logic only implemented in the database")
}

// nolint:gocritic // Unit test.
ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong))
store, _ := dbtestutil.NewDB(t)
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
// nolint:gocritic // Unit test.
ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitShort))

// Mock server to simulate webhook endpoint.
var received atomic.Int32
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var payload dispatch.WebhookPayload
err := json.NewDecoder(r.Body).Decode(&payload)
assert.NoError(t, err)

w.WriteHeader(http.StatusOK)
_, err = w.Write([]byte("noted."))
assert.NoError(t, err)

received.Add(1)
}))
defer server.Close()

endpoint, err := url.Parse(server.URL)
require.NoError(t, err)

method := database.NotificationMethodWebhook
const method = database.NotificationMethodWebhook
cfg := defaultNotificationsConfig(method)
cfg.Webhook = codersdk.NotificationsWebhookConfig{
Endpoint: *serpent.URLOf(endpoint),
}

// Tune the queue to fetch often.
const fetchInterval = time.Millisecond * 200
const batchSize = 10
cfg.FetchInterval = serpent.Duration(fetchInterval)
cfg.LeaseCount = serpent.Int64(batchSize)
// never time out for this test
cfg.LeasePeriod = serpent.Duration(time.Hour)
cfg.DispatchTimeout = serpent.Duration(time.Hour - time.Millisecond)

// Shrink buffers down and increase flush interval to provoke backpressure.
// Flush buffers every 5 fetch intervals.
const syncInterval = time.Second
cfg.StoreSyncInterval = serpent.Duration(syncInterval)
cfg.StoreSyncBufferSize = serpent.Int64(2)

handler := newDispatchInterceptor(dispatch.NewWebhookHandler(cfg.Webhook, logger.Named("webhook")))
handler := &chanHandler{calls: make(chan dispatchCall)}

// Intercept calls to submit the buffered updates to the store.
storeInterceptor := &syncInterceptor{Store: store}

mClock := quartz.NewMock(t)
syncTrap := mClock.Trap().NewTicker("Manager", "storeSync")
defer syncTrap.Close()
fetchTrap := mClock.Trap().TickerFunc("notifier", "fetchInterval")
defer fetchTrap.Close()

// GIVEN: a notification manager whose updates will be intercepted
mgr, err := notifications.NewManager(cfg, storeInterceptor, defaultHelpers(), createMetrics(), logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, storeInterceptor, defaultHelpers(), createMetrics(),
logger.Named("manager"), notifications.WithTestClock(mClock))
require.NoError(t, err)
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler})
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), mClock)
require.NoError(t, err)

user := createSampleUser(t, store)

// WHEN: a set of notifications is enqueued, causing backpressure because only batchSize messages can be processed per fetch
const totalMessages = 30
for i := 0; i < totalMessages; i++ {
for i := range totalMessages {
_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"i": fmt.Sprintf("%d", i)}, "test")
require.NoError(t, err)
}

// Start the notifier.
mgr.Run(ctx)
syncTrap.MustWait(ctx).Release()
fetchTrap.MustWait(ctx).Release()

// THEN:

// Wait for 3 fetch intervals, then check progress.
time.Sleep(fetchInterval * 3)
// Trigger a fetch
w := mClock.Advance(fetchInterval)

// one batch of dispatches is sent
for range batchSize {
call := testutil.RequireRecvCtx(ctx, t, handler.calls)
testutil.RequireSendCtx(ctx, t, call.result, dispatchResult{
retryable: false,
err: nil,
})
}

// The first fetch will not complete, because of the short sync buffer of 2. This is the
// backpressure.
select {
case <-time.After(testutil.IntervalMedium):
// success
case <-w.Done():
t.Fatal("fetch completed despite backpressure")
}

// We expect the notifier will have dispatched ONLY the initial batch of messages.
// In other words, the notifier should have dispatched 3 batches by now, but because the buffered updates have not
// been processed: there is backpressure.
require.EqualValues(t, batchSize, handler.sent.Load()+handler.err.Load())
// We expect that the store will have received NO updates.
require.EqualValues(t, 0, storeInterceptor.sent.Load()+storeInterceptor.failed.Load())

// However, when we Stop() the manager the backpressure will be relieved and the buffered updates will ALL be flushed,
// since all the goroutines that were blocked (on writing updates to the buffer) will be unblocked and will complete.
require.NoError(t, mgr.Stop(ctx))
// Stop() waits for the in-progress flush to complete, meaning we have to advance the time such that sync triggers
// a total of (batchSize/StoreSyncBufferSize)-1 times. The -1 is because once we run the penultimate sync, it
// clears space in the buffer for the last dispatches of the batch, which allows graceful shutdown to continue
// immediately, without waiting for the last trigger.
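// As a concrete example: with batchSize = 10 and StoreSyncBufferSize = 2, the loop
// below advances mock time until (10/2 - 1) * StoreSyncInterval = 4s has elapsed.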
Contributor Author:

@dannykopping this behavior of the manager surprised me: it doesn't immediately flush when you call Stop(), it waits until the notifier exits. In this sense, it was never really the Stop() that relieved the backpressure in this test, it was that we wait around until the sync triggers enough times to relieve the backpressure. Calling Stop() early prevents another fetch from starting tho.

Contributor:

Thanks for adding additional clarification in the comment here.
This looks great!

> this behavior of the manager surprised me

This is a smell for me; is there a way we could make this more explicit?

I worry about requiring a sequence of calls (i.e. Drain() + Stop()) to properly shut down the manager (which would make this clearer but would introduce some potential problems if not called correctly, or at all).

Contributor Author:

I do worry that we can possibly wait around a long time for graceful shutdown. It's up to 2s by default before a sync triggers, then the sync itself times out after 30s, and delivery of notifications times out after 60s.

Normally we expect graceful shutdown to take 5-15 seconds or less; beyond that, humans or cluster managers are likely to just kill us anyway. Maybe the initial, high-level context can serve this purpose, but right now it's just tied to the CLI command and doesn't get canceled in practice. If the Manager were moved into coderd, I think the API context does eventually get canceled.
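
As a rough illustration of that caller-side mitigation (a hypothetical helper, not part of this PR; the import path and 15-second budget are assumptions):

```go
package example

import (
	"context"
	"time"

	"cdr.dev/slog"

	"github.com/coder/coder/v2/coderd/notifications"
)

// stopWithTimeout gives graceful shutdown a bounded budget so a slow sync or
// dispatch cannot hold up process exit indefinitely.
func stopWithTimeout(mgr *notifications.Manager, logger slog.Logger) {
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	if err := mgr.Stop(ctx); err != nil {
		logger.Warn(ctx, "notification manager did not stop cleanly", slog.Error(err))
	}
}
```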

stopErr := make(chan error, 1)
go func() {
stopErr <- mgr.Stop(ctx)
}()
elapsed := fetchInterval
syncEnd := time.Duration(batchSize/cfg.StoreSyncBufferSize.Value()-1) * cfg.StoreSyncInterval.Value()
t.Logf("will advance until %dms have elapsed", syncEnd.Milliseconds())
for elapsed < syncEnd {
d, wt := mClock.AdvanceNext()
elapsed += d
t.Logf("elapsed: %dms", elapsed.Milliseconds())
// fetches complete immediately, since TickerFunc only allows one call to the callback in flight at a time.
wt.MustWait(ctx)
if elapsed%cfg.StoreSyncInterval.Value() == 0 {
numSent := cfg.StoreSyncBufferSize.Value() * int64(elapsed/cfg.StoreSyncInterval.Value())
t.Logf("waiting for %d messages", numSent)
require.Eventually(t, func() bool {
// need greater-or-equal because the last set of messages can come immediately due
// to graceful shutdown
return int64(storeInterceptor.sent.Load()) >= numSent
}, testutil.WaitShort, testutil.IntervalFast)
}
}
t.Logf("done advancing")
// The batch completes
w.MustWait(ctx)

require.NoError(t, testutil.RequireRecvCtx(ctx, t, stopErr))
require.EqualValues(t, batchSize, storeInterceptor.sent.Load()+storeInterceptor.failed.Load())
}

81 changes: 37 additions & 44 deletions coderd/notifications/notifier.go
@@ -30,10 +30,11 @@ type notifier struct {
log slog.Logger
store Store

tick *quartz.Ticker
stopOnce sync.Once
quit chan any
done chan any
stopOnce sync.Once
outerCtx context.Context
gracefulCtx context.Context
gracefulCancel context.CancelFunc
done chan any

handlers map[database.NotificationMethod]Handler
metrics *Metrics
@@ -43,28 +44,29 @@
clock quartz.Clock
}

func newNotifier(cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store,
func newNotifier(outerCtx context.Context, cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store,
hr map[database.NotificationMethod]Handler, helpers template.FuncMap, metrics *Metrics, clock quartz.Clock,
) *notifier {
tick := clock.NewTicker(cfg.FetchInterval.Value(), "notifier", "fetchInterval")
gracefulCtx, gracefulCancel := context.WithCancel(outerCtx)
return &notifier{
id: id,
cfg: cfg,
log: log.Named("notifier").With(slog.F("notifier_id", id)),
quit: make(chan any),
done: make(chan any),
tick: tick,
store: db,
handlers: hr,
helpers: helpers,
metrics: metrics,
clock: clock,
id: id,
cfg: cfg,
log: log.Named("notifier").With(slog.F("notifier_id", id)),
outerCtx: outerCtx,
gracefulCtx: gracefulCtx,
gracefulCancel: gracefulCancel,
done: make(chan any),
store: db,
handlers: hr,
helpers: helpers,
metrics: metrics,
clock: clock,
}
}

// run is the main loop of the notifier.
func (n *notifier) run(ctx context.Context, success chan<- dispatchResult, failure chan<- dispatchResult) error {
n.log.Info(ctx, "started")
func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchResult) error {
n.log.Info(n.outerCtx, "started")

defer func() {
close(n.done)
@@ -75,39 +77,32 @@ func (n *notifier) run(ctx context.Context, success chan<- dispatchResult, failu
// if 100 notifications are enqueued, we shouldn't activate this routine for each one; so how to debounce these?
// PLUS we should also have an interval (but a longer one, maybe 1m) to account for retries (those will not get
// triggered by a code path, but rather by a timeout expiring which makes the message retryable)
for {
select {
case <-ctx.Done():
return xerrors.Errorf("notifier %q context canceled: %w", n.id, ctx.Err())
case <-n.quit:
return nil
default:
}

// run the ticker with the graceful context, so we stop fetching after stop() is called
tick := n.clock.TickerFunc(n.gracefulCtx, n.cfg.FetchInterval.Value(), func() error {
// Check if notifier is not paused.
ok, err := n.ensureRunning(ctx)
ok, err := n.ensureRunning(n.outerCtx)
if err != nil {
n.log.Warn(ctx, "failed to check notifier state", slog.Error(err))
n.log.Warn(n.outerCtx, "failed to check notifier state", slog.Error(err))
}

if ok {
// Call process() immediately (i.e. don't wait an initial tick).
err = n.process(ctx, success, failure)
err = n.process(n.outerCtx, success, failure)
if err != nil {
n.log.Error(ctx, "failed to process messages", slog.Error(err))
n.log.Error(n.outerCtx, "failed to process messages", slog.Error(err))
}
}
// we don't return any errors because we don't want to kill the loop because of them.
return nil
}, "notifier", "fetchInterval")

// Shortcut to bail out quickly if stop() has been called or the context canceled.
select {
case <-ctx.Done():
return xerrors.Errorf("notifier %q context canceled: %w", n.id, ctx.Err())
case <-n.quit:
return nil
case <-n.tick.C:
// sleep until next invocation
}
_ = tick.Wait()
// only errors we can return are context errors. Only return an error if the outer context
// was canceled, not if we were gracefully stopped.
if n.outerCtx.Err() != nil {
return xerrors.Errorf("notifier %q context canceled: %w", n.id, n.outerCtx.Err())
}
return nil
}

// ensureRunning checks if notifier is not paused.
Expand Down Expand Up @@ -343,9 +338,7 @@ func (n *notifier) newInhibitedDispatch(msg database.AcquireNotificationMessages
func (n *notifier) stop() {
n.stopOnce.Do(func() {
n.log.Info(context.Background(), "graceful stop requested")

n.tick.Stop()
close(n.quit)
n.gracefulCancel()
<-n.done
})
}