From d18fbfe5b95917cbce7bb08d25c5b06248cd2a8a Mon Sep 17 00:00:00 2001 From: defelmnq Date: Wed, 7 May 2025 13:28:06 +0000 Subject: [PATCH 1/6] work on flake manager --- coderd/notifications/manager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/coderd/notifications/manager.go b/coderd/notifications/manager.go index ee85bd2d7a3c4..eb73dd9054c19 100644 --- a/coderd/notifications/manager.go +++ b/coderd/notifications/manager.go @@ -143,6 +143,7 @@ func (m *Manager) Run(ctx context.Context) { m.runOnce.Do(func() { // Closes when Stop() is called or context is canceled. go func() { + m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.handlers, m.helpers, m.metrics, m.clock) err := m.loop(ctx) if err != nil { m.log.Error(ctx, "notification manager stopped with error", slog.Error(err)) @@ -174,9 +175,8 @@ func (m *Manager) loop(ctx context.Context) error { var eg errgroup.Group - // Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications. - m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.handlers, m.helpers, m.metrics, m.clock) eg.Go(func() error { + // run the notifier which will handle dequeueing and dispatching notifications. return m.notifier.run(m.success, m.failure) }) From 510ffbbce755d32bd8fad728f3147abf563375e7 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Wed, 7 May 2025 17:35:09 +0300 Subject: [PATCH 2/6] add test to trigger race --- coderd/notifications/manager_test.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/coderd/notifications/manager_test.go b/coderd/notifications/manager_test.go index 3eaebef7c9d0f..805ae35053fac 100644 --- a/coderd/notifications/manager_test.go +++ b/coderd/notifications/manager_test.go @@ -182,6 +182,25 @@ func TestStopBeforeRun(t *testing.T) { }, testutil.WaitShort, testutil.IntervalFast) } +func TestRunStopRace(t *testing.T) { + t.Parallel() + + // SETUP + + // nolint:gocritic // Unit test. + ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong)) + store, ps := dbtestutil.NewDB(t) + logger := testutil.Logger(t) + + // GIVEN: a standard manager + mgr, err := notifications.NewManager(defaultNotificationsConfig(database.NotificationMethodSmtp), store, ps, defaultHelpers(), createMetrics(), logger.Named("notifications-manager")) + require.NoError(t, err) + + mgr.Run(ctx) + err = mgr.Stop(ctx) + require.NoError(t, err) +} + type syncInterceptor struct { notifications.Store From ab8560dbde6a44c515cbc0121499f228fba2c0dd Mon Sep 17 00:00:00 2001 From: defelmnq Date: Wed, 7 May 2025 14:47:18 +0000 Subject: [PATCH 3/6] work on flake manager --- coderd/notifications/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coderd/notifications/manager.go b/coderd/notifications/manager.go index eb73dd9054c19..0027c7aec8c1c 100644 --- a/coderd/notifications/manager.go +++ b/coderd/notifications/manager.go @@ -141,9 +141,9 @@ func (m *Manager) Run(ctx context.Context) { m.log.Info(ctx, "started") m.runOnce.Do(func() { + m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.handlers, m.helpers, m.metrics, m.clock) // Closes when Stop() is called or context is canceled. go func() { - m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.handlers, m.helpers, m.metrics, m.clock) err := m.loop(ctx) if err != nil { m.log.Error(ctx, "notification manager stopped with error", slog.Error(err)) From 2f86f8103a41cbd5566324287bc1f5355dec6e50 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 8 May 2025 12:34:02 +0000 Subject: [PATCH 4/6] refactor notifier close using mutex for Run/Close sync --- coderd/notifications/manager.go | 108 +++++++++++++++----------------- 1 file changed, 51 insertions(+), 57 deletions(-) diff --git a/coderd/notifications/manager.go b/coderd/notifications/manager.go index 0027c7aec8c1c..94f1d174ffb9e 100644 --- a/coderd/notifications/manager.go +++ b/coderd/notifications/manager.go @@ -44,7 +44,6 @@ type Manager struct { store Store log slog.Logger - notifier *notifier handlers map[database.NotificationMethod]Handler method database.NotificationMethod helpers template.FuncMap @@ -53,11 +52,13 @@ type Manager struct { success, failure chan dispatchResult - runOnce sync.Once - stopOnce sync.Once - doneOnce sync.Once - stop chan any - done chan any + mu sync.Mutex // Protects following. + closed bool + notifier *notifier + + runOnce sync.Once + stop chan any + done chan any // clock is for testing only clock quartz.Clock @@ -138,10 +139,9 @@ func (m *Manager) WithHandlers(reg map[database.NotificationMethod]Handler) { // Manager requires system-level permissions to interact with the store. // Run is only intended to be run once. func (m *Manager) Run(ctx context.Context) { - m.log.Info(ctx, "started") + m.log.Info(ctx, "notification manager started") m.runOnce.Do(func() { - m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.handlers, m.helpers, m.metrics, m.clock) // Closes when Stop() is called or context is canceled. go func() { err := m.loop(ctx) @@ -156,30 +156,26 @@ func (m *Manager) Run(ctx context.Context) { // events, creating a notifier, and publishing bulk dispatch result updates to the store. func (m *Manager) loop(ctx context.Context) error { defer func() { - m.doneOnce.Do(func() { - close(m.done) - }) + close(m.done) m.log.Info(context.Background(), "notification manager stopped") }() - // Caught a terminal signal before notifier was created, exit immediately. - select { - case <-m.stop: - m.log.Warn(ctx, "gracefully stopped") - return xerrors.Errorf("gracefully stopped") - case <-ctx.Done(): - m.log.Error(ctx, "ungracefully stopped", slog.Error(ctx.Err())) - return xerrors.Errorf("notifications: %w", ctx.Err()) - default: + m.mu.Lock() + if m.closed { + m.mu.Unlock() + return xerrors.New("manager already closed") } var eg errgroup.Group + m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.handlers, m.helpers, m.metrics, m.clock) eg.Go(func() error { // run the notifier which will handle dequeueing and dispatching notifications. return m.notifier.run(m.success, m.failure) }) + m.mu.Unlock() + // Periodically flush notification state changes to the store. eg.Go(func() error { // Every interval, collect the messages in the channels and bulk update them in the store. @@ -355,48 +351,46 @@ func (m *Manager) syncUpdates(ctx context.Context) { // Stop stops the notifier and waits until it has stopped. func (m *Manager) Stop(ctx context.Context) error { - var err error - m.stopOnce.Do(func() { - select { - case <-ctx.Done(): - err = ctx.Err() - return - default: - } + m.mu.Lock() + defer m.mu.Unlock() - m.log.Info(context.Background(), "graceful stop requested") + if m.closed { + return nil + } + m.closed = true - // If the notifier hasn't been started, we don't need to wait for anything. - // This is only really during testing when we want to enqueue messages only but not deliver them. - if m.notifier == nil { - m.doneOnce.Do(func() { - close(m.done) - }) - } else { - m.notifier.stop() - } + m.log.Info(context.Background(), "graceful stop requested") + + // If the notifier hasn't been started, we don't need to wait for anything. + // This is only really during testing when we want to enqueue messages only but not deliver them. + if m.notifier != nil { + m.notifier.stop() + } - // Signal the stop channel to cause loop to exit. - close(m.stop) + // Signal the stop channel to cause loop to exit. + close(m.stop) - // Wait for the manager loop to exit or the context to be canceled, whichever comes first. - select { - case <-ctx.Done(): - var errStr string - if ctx.Err() != nil { - errStr = ctx.Err().Error() - } - // For some reason, slog.Error returns {} for a context error. - m.log.Error(context.Background(), "graceful stop failed", slog.F("err", errStr)) - err = ctx.Err() - return - case <-m.done: - m.log.Info(context.Background(), "gracefully stopped") - return - } - }) + if m.notifier == nil { + return nil + } - return err + m.mu.Unlock() // Unlock to avoid blocking loop. + defer m.mu.Lock() // Re-lock the mutex due to earlier defer. + + // Wait for the manager loop to exit or the context to be canceled, whichever comes first. + select { + case <-ctx.Done(): + var errStr string + if ctx.Err() != nil { + errStr = ctx.Err().Error() + } + // For some reason, slog.Error returns {} for a context error. + m.log.Error(context.Background(), "graceful stop failed", slog.F("err", errStr)) + return ctx.Err() + case <-m.done: + m.log.Info(context.Background(), "gracefully stopped") + return nil + } } type dispatchResult struct { From 869782bfd16bfd3e5392d0124747378c922f7f13 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 8 May 2025 12:38:48 +0000 Subject: [PATCH 5/6] s/Info/Debug/ logs --- coderd/notifications/manager.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/coderd/notifications/manager.go b/coderd/notifications/manager.go index 94f1d174ffb9e..1a2c418a014bb 100644 --- a/coderd/notifications/manager.go +++ b/coderd/notifications/manager.go @@ -139,7 +139,7 @@ func (m *Manager) WithHandlers(reg map[database.NotificationMethod]Handler) { // Manager requires system-level permissions to interact with the store. // Run is only intended to be run once. func (m *Manager) Run(ctx context.Context) { - m.log.Info(ctx, "notification manager started") + m.log.Debug(ctx, "notification manager started") m.runOnce.Do(func() { // Closes when Stop() is called or context is canceled. @@ -157,7 +157,7 @@ func (m *Manager) Run(ctx context.Context) { func (m *Manager) loop(ctx context.Context) error { defer func() { close(m.done) - m.log.Info(context.Background(), "notification manager stopped") + m.log.Debug(context.Background(), "notification manager stopped") }() m.mu.Lock() @@ -359,7 +359,7 @@ func (m *Manager) Stop(ctx context.Context) error { } m.closed = true - m.log.Info(context.Background(), "graceful stop requested") + m.log.Debug(context.Background(), "graceful stop requested") // If the notifier hasn't been started, we don't need to wait for anything. // This is only really during testing when we want to enqueue messages only but not deliver them. @@ -388,7 +388,7 @@ func (m *Manager) Stop(ctx context.Context) error { m.log.Error(context.Background(), "graceful stop failed", slog.F("err", errStr)) return ctx.Err() case <-m.done: - m.log.Info(context.Background(), "gracefully stopped") + m.log.Debug(context.Background(), "gracefully stopped") return nil } } From 3dd31745962cfb0be8f6881fe255296e954b7bd6 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 8 May 2025 13:26:58 +0000 Subject: [PATCH 6/6] wait medium --- coderd/notifications/manager_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/coderd/notifications/manager_test.go b/coderd/notifications/manager_test.go index 805ae35053fac..e9c309f0a09d3 100644 --- a/coderd/notifications/manager_test.go +++ b/coderd/notifications/manager_test.go @@ -188,7 +188,7 @@ func TestRunStopRace(t *testing.T) { // SETUP // nolint:gocritic // Unit test. - ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong)) + ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitMedium)) store, ps := dbtestutil.NewDB(t) logger := testutil.Logger(t) @@ -196,6 +196,9 @@ func TestRunStopRace(t *testing.T) { mgr, err := notifications.NewManager(defaultNotificationsConfig(database.NotificationMethodSmtp), store, ps, defaultHelpers(), createMetrics(), logger.Named("notifications-manager")) require.NoError(t, err) + // Start Run and Stop after each other (run does "go loop()"). + // This is to catch a (now fixed) race condition where the manager + // would be accessed/stopped while it was being created/starting up. mgr.Run(ctx) err = mgr.Stop(ctx) require.NoError(t, err)