From ba2f90a18463a3cbd8d49b6d04d29ff5cd38ce24 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Thu, 1 May 2025 20:26:26 +0200 Subject: [PATCH 1/6] fix: move pubsub publishing out of database transactions to avoid conn starvation Signed-off-by: Danny Kopping --- enterprise/coderd/prebuilds/reconcile.go | 57 +++++++++++++++++------- 1 file changed, 42 insertions(+), 15 deletions(-) diff --git a/enterprise/coderd/prebuilds/reconcile.go b/enterprise/coderd/prebuilds/reconcile.go index 1b99e46a56680..959a949250483 100644 --- a/enterprise/coderd/prebuilds/reconcile.go +++ b/enterprise/coderd/prebuilds/reconcile.go @@ -40,10 +40,11 @@ type StoreReconciler struct { registerer prometheus.Registerer metrics *MetricsCollector - cancelFn context.CancelCauseFunc - running atomic.Bool - stopped atomic.Bool - done chan struct{} + cancelFn context.CancelCauseFunc + running atomic.Bool + stopped atomic.Bool + done chan struct{} + provisionNotifyCh chan *database.ProvisionerJob } var _ prebuilds.ReconciliationOrchestrator = &StoreReconciler{} @@ -56,13 +57,14 @@ func NewStoreReconciler(store database.Store, registerer prometheus.Registerer, ) *StoreReconciler { reconciler := &StoreReconciler{ - store: store, - pubsub: ps, - logger: logger, - cfg: cfg, - clock: clock, - registerer: registerer, - done: make(chan struct{}, 1), + store: store, + pubsub: ps, + logger: logger, + cfg: cfg, + clock: clock, + registerer: registerer, + done: make(chan struct{}, 1), + provisionNotifyCh: make(chan *database.ProvisionerJob, 100), } reconciler.metrics = NewMetricsCollector(store, logger, reconciler) @@ -100,6 +102,29 @@ func (c *StoreReconciler) Run(ctx context.Context) { // NOTE: without this atomic bool, Stop might race with Run for the c.cancelFn above. c.running.Store(true) + // Publish provisioning jobs outside of database transactions. + // PGPubsub tries to acquire a new connection on Publish. A connection is held while a database transaction is active, + // so we can exhaust available connections. + go func() { + for { + select { + case <-c.done: + return + case <-ctx.Done(): + return + case job := <-c.provisionNotifyCh: + if job == nil { + continue + } + + err := provisionerjobs.PostJob(c.pubsub, *job) + if err != nil { + c.logger.Error(ctx, "failed to post provisioner job to pubsub", slog.Error(err)) + } + } + } + }() + for { select { // TODO: implement pubsub listener to allow reconciling a specific template imperatively once it has been changed, @@ -571,10 +596,12 @@ func (c *StoreReconciler) provision( return xerrors.Errorf("provision workspace: %w", err) } - err = provisionerjobs.PostJob(c.pubsub, *provisionerJob) - if err != nil { - // Client probably doesn't care about this error, so just log it. - c.logger.Error(ctx, "failed to post provisioner job to pubsub", slog.Error(err)) + // Publish provisioner job event outside of transaction. + select { + case c.provisionNotifyCh <- provisionerJob: + default: // channel full, drop the message; provisioner will pick this job up later with its periodic check, though. + c.logger.Warn(ctx, "provisioner job notification queue full, dropping", + slog.F("job_id", provisionerJob.ID), slog.F("prebuild_id", prebuildID.String())) } c.logger.Info(ctx, "prebuild job scheduled", slog.F("transition", transition), From 74f5dd78dcf48c2a8e0e42cd43440a4f2f25ab03 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Fri, 2 May 2025 10:42:45 +0200 Subject: [PATCH 2/6] chore: commentary Signed-off-by: Danny Kopping --- enterprise/coderd/prebuilds/reconcile.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/enterprise/coderd/prebuilds/reconcile.go b/enterprise/coderd/prebuilds/reconcile.go index 959a949250483..b6932790435eb 100644 --- a/enterprise/coderd/prebuilds/reconcile.go +++ b/enterprise/coderd/prebuilds/reconcile.go @@ -103,8 +103,12 @@ func (c *StoreReconciler) Run(ctx context.Context) { c.running.Store(true) // Publish provisioning jobs outside of database transactions. - // PGPubsub tries to acquire a new connection on Publish. A connection is held while a database transaction is active, - // so we can exhaust available connections. + // A connection is held while a database transaction is active; PGPubsub also tries to acquire a new connection on + // Publish, so we can exhaust available connections. + // + // A single worker dequeues from the channel, which should be sufficient. + // If any messages are missed due to congestion or errors, provisionerdserver has a backup polling mechanism which + // will periodically pick up any queued jobs (see poll(time.Duration) in coderd/provisionerdserver/acquirer.go). go func() { for { select { From f6909ff17e7ac41db67a065f88fbb72163d0c7c6 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Fri, 2 May 2025 11:43:16 +0200 Subject: [PATCH 3/6] chore: polish Signed-off-by: Danny Kopping --- enterprise/coderd/prebuilds/reconcile.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/enterprise/coderd/prebuilds/reconcile.go b/enterprise/coderd/prebuilds/reconcile.go index b6932790435eb..3e368550742e7 100644 --- a/enterprise/coderd/prebuilds/reconcile.go +++ b/enterprise/coderd/prebuilds/reconcile.go @@ -44,7 +44,7 @@ type StoreReconciler struct { running atomic.Bool stopped atomic.Bool done chan struct{} - provisionNotifyCh chan *database.ProvisionerJob + provisionNotifyCh chan database.ProvisionerJob } var _ prebuilds.ReconciliationOrchestrator = &StoreReconciler{} @@ -64,7 +64,7 @@ func NewStoreReconciler(store database.Store, clock: clock, registerer: registerer, done: make(chan struct{}, 1), - provisionNotifyCh: make(chan *database.ProvisionerJob, 100), + provisionNotifyCh: make(chan database.ProvisionerJob, 10), } reconciler.metrics = NewMetricsCollector(store, logger, reconciler) @@ -117,11 +117,7 @@ func (c *StoreReconciler) Run(ctx context.Context) { case <-ctx.Done(): return case job := <-c.provisionNotifyCh: - if job == nil { - continue - } - - err := provisionerjobs.PostJob(c.pubsub, *job) + err := provisionerjobs.PostJob(c.pubsub, job) if err != nil { c.logger.Error(ctx, "failed to post provisioner job to pubsub", slog.Error(err)) } @@ -600,9 +596,13 @@ func (c *StoreReconciler) provision( return xerrors.Errorf("provision workspace: %w", err) } + if provisionerJob == nil { + return nil + } + // Publish provisioner job event outside of transaction. select { - case c.provisionNotifyCh <- provisionerJob: + case c.provisionNotifyCh <- *provisionerJob: default: // channel full, drop the message; provisioner will pick this job up later with its periodic check, though. c.logger.Warn(ctx, "provisioner job notification queue full, dropping", slog.F("job_id", provisionerJob.ID), slog.F("prebuild_id", prebuildID.String())) From 1c84fac5616770b4ef5cfee1f4018aba749a56ff Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Fri, 2 May 2025 12:29:21 +0200 Subject: [PATCH 4/6] chore: augment reconciler test to validate that broken publish does not affect behaviour Signed-off-by: Danny Kopping --- coderd/database/dbgen/dbgen.go | 4 +- enterprise/coderd/prebuilds/reconcile_test.go | 195 ++++++++++-------- 2 files changed, 109 insertions(+), 90 deletions(-) diff --git a/coderd/database/dbgen/dbgen.go b/coderd/database/dbgen/dbgen.go index 854c7c2974fe6..c81b8017ef048 100644 --- a/coderd/database/dbgen/dbgen.go +++ b/coderd/database/dbgen/dbgen.go @@ -643,8 +643,8 @@ func ProvisionerJob(t testing.TB, db database.Store, ps pubsub.Pubsub, orig data }) require.NoError(t, err, "insert job") if ps != nil { - err = provisionerjobs.PostJob(ps, job) - require.NoError(t, err, "post job to pubsub") + // Advisory, but not essential since acquirer has a background poller to pick up missed jobs. + _ = provisionerjobs.PostJob(ps, job) } if !orig.StartedAt.Time.IsZero() { job, err = db.AcquireProvisionerJob(genCtx, database.AcquireProvisionerJobParams{ diff --git a/enterprise/coderd/prebuilds/reconcile_test.go b/enterprise/coderd/prebuilds/reconcile_test.go index bc886fc0a8231..6315b82ef901a 100644 --- a/enterprise/coderd/prebuilds/reconcile_test.go +++ b/enterprise/coderd/prebuilds/reconcile_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/xerrors" "github.com/coder/coder/v2/coderd/database/dbtime" "github.com/coder/coder/v2/coderd/util/slice" @@ -303,100 +304,106 @@ func TestPrebuildReconciliation(t *testing.T) { for _, prebuildLatestTransition := range tc.prebuildLatestTransitions { for _, prebuildJobStatus := range tc.prebuildJobStatuses { for _, templateDeleted := range tc.templateDeleted { - t.Run(fmt.Sprintf("%s - %s - %s", tc.name, prebuildLatestTransition, prebuildJobStatus), func(t *testing.T) { - t.Parallel() - t.Cleanup(func() { - if t.Failed() { - t.Logf("failed to run test: %s", tc.name) - t.Logf("templateVersionActive: %t", templateVersionActive) - t.Logf("prebuildLatestTransition: %s", prebuildLatestTransition) - t.Logf("prebuildJobStatus: %s", prebuildJobStatus) + for _, useBrokenPubsub := range []bool{true, false} { + t.Run(fmt.Sprintf("%s - %s - %s - pubsub_broken=%v", tc.name, prebuildLatestTransition, prebuildJobStatus, useBrokenPubsub), func(t *testing.T) { + t.Parallel() + t.Cleanup(func() { + if t.Failed() { + t.Logf("failed to run test: %s", tc.name) + t.Logf("templateVersionActive: %t", templateVersionActive) + t.Logf("prebuildLatestTransition: %s", prebuildLatestTransition) + t.Logf("prebuildJobStatus: %s", prebuildJobStatus) + } + }) + clock := quartz.NewMock(t) + ctx := testutil.Context(t, testutil.WaitShort) + cfg := codersdk.PrebuildsConfig{} + logger := slogtest.Make( + t, &slogtest.Options{IgnoreErrors: true}, + ).Leveled(slog.LevelDebug) + db, pubSub := dbtestutil.NewDB(t) + if useBrokenPubsub { + pubSub = &brokenPublisher{Pubsub: pubSub} } - }) - clock := quartz.NewMock(t) - ctx := testutil.Context(t, testutil.WaitShort) - cfg := codersdk.PrebuildsConfig{} - logger := slogtest.Make( - t, &slogtest.Options{IgnoreErrors: true}, - ).Leveled(slog.LevelDebug) - db, pubSub := dbtestutil.NewDB(t) - controller := prebuilds.NewStoreReconciler(db, pubSub, cfg, logger, quartz.NewMock(t), prometheus.NewRegistry()) - - ownerID := uuid.New() - dbgen.User(t, db, database.User{ - ID: ownerID, - }) - org, template := setupTestDBTemplate(t, db, ownerID, templateDeleted) - templateVersionID := setupTestDBTemplateVersion( - ctx, - t, - clock, - db, - pubSub, - org.ID, - ownerID, - template.ID, - ) - preset := setupTestDBPreset( - t, - db, - templateVersionID, - 1, - uuid.New().String(), - ) - prebuild := setupTestDBPrebuild( - t, - clock, - db, - pubSub, - prebuildLatestTransition, - prebuildJobStatus, - org.ID, - preset, - template.ID, - templateVersionID, - ) - - if !templateVersionActive { - // Create a new template version and mark it as active - // This marks the template version that we care about as inactive - setupTestDBTemplateVersion(ctx, t, clock, db, pubSub, org.ID, ownerID, template.ID) - } - - // Run the reconciliation multiple times to ensure idempotency - // 8 was arbitrary, but large enough to reasonably trust the result - for i := 1; i <= 8; i++ { - require.NoErrorf(t, controller.ReconcileAll(ctx), "failed on iteration %d", i) - - if tc.shouldCreateNewPrebuild != nil { - newPrebuildCount := 0 - workspaces, err := db.GetWorkspacesByTemplateID(ctx, template.ID) - require.NoError(t, err) - for _, workspace := range workspaces { - if workspace.ID != prebuild.ID { - newPrebuildCount++ + + controller := prebuilds.NewStoreReconciler(db, pubSub, cfg, logger, quartz.NewMock(t), prometheus.NewRegistry()) + + ownerID := uuid.New() + dbgen.User(t, db, database.User{ + ID: ownerID, + }) + org, template := setupTestDBTemplate(t, db, ownerID, templateDeleted) + templateVersionID := setupTestDBTemplateVersion( + ctx, + t, + clock, + db, + pubSub, + org.ID, + ownerID, + template.ID, + ) + preset := setupTestDBPreset( + t, + db, + templateVersionID, + 1, + uuid.New().String(), + ) + prebuild := setupTestDBPrebuild( + t, + clock, + db, + pubSub, + prebuildLatestTransition, + prebuildJobStatus, + org.ID, + preset, + template.ID, + templateVersionID, + ) + + if !templateVersionActive { + // Create a new template version and mark it as active + // This marks the template version that we care about as inactive + setupTestDBTemplateVersion(ctx, t, clock, db, pubSub, org.ID, ownerID, template.ID) + } + + // Run the reconciliation multiple times to ensure idempotency + // 8 was arbitrary, but large enough to reasonably trust the result + for i := 1; i <= 8; i++ { + require.NoErrorf(t, controller.ReconcileAll(ctx), "failed on iteration %d", i) + + if tc.shouldCreateNewPrebuild != nil { + newPrebuildCount := 0 + workspaces, err := db.GetWorkspacesByTemplateID(ctx, template.ID) + require.NoError(t, err) + for _, workspace := range workspaces { + if workspace.ID != prebuild.ID { + newPrebuildCount++ + } } + // This test configures a preset that desires one prebuild. + // In cases where new prebuilds should be created, there should be exactly one. + require.Equal(t, *tc.shouldCreateNewPrebuild, newPrebuildCount == 1) } - // This test configures a preset that desires one prebuild. - // In cases where new prebuilds should be created, there should be exactly one. - require.Equal(t, *tc.shouldCreateNewPrebuild, newPrebuildCount == 1) - } - if tc.shouldDeleteOldPrebuild != nil { - builds, err := db.GetWorkspaceBuildsByWorkspaceID(ctx, database.GetWorkspaceBuildsByWorkspaceIDParams{ - WorkspaceID: prebuild.ID, - }) - require.NoError(t, err) - if *tc.shouldDeleteOldPrebuild { - require.Equal(t, 2, len(builds)) - require.Equal(t, database.WorkspaceTransitionDelete, builds[0].Transition) - } else { - require.Equal(t, 1, len(builds)) - require.Equal(t, prebuildLatestTransition, builds[0].Transition) + if tc.shouldDeleteOldPrebuild != nil { + builds, err := db.GetWorkspaceBuildsByWorkspaceID(ctx, database.GetWorkspaceBuildsByWorkspaceIDParams{ + WorkspaceID: prebuild.ID, + }) + require.NoError(t, err) + if *tc.shouldDeleteOldPrebuild { + require.Equal(t, 2, len(builds)) + require.Equal(t, database.WorkspaceTransitionDelete, builds[0].Transition) + } else { + require.Equal(t, 1, len(builds)) + require.Equal(t, prebuildLatestTransition, builds[0].Transition) + } } } - } - }) + }) + } } } } @@ -404,6 +411,18 @@ func TestPrebuildReconciliation(t *testing.T) { } } +// brokenPublisher is used to validate that Publish() calls which always fail do not affect the reconciler's behaviour, +// since the messages published are not essential but merely advisory. +type brokenPublisher struct { + pubsub.Pubsub +} + +func (b *brokenPublisher) Publish(event string, _ []byte) error { + // I'm explicitly _not_ checking for EventJobPosted (coderd/database/provisionerjobs/provisionerjobs.go) since that + // required too much knowledge of the underlying implementation. + return xerrors.Errorf("refusing to publish %q", event) +} + func TestMultiplePresetsPerTemplateVersion(t *testing.T) { t.Parallel() From 42cc3f53397d16fc5d930d9286f7c96a8a8584f7 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Fri, 2 May 2025 13:55:11 +0200 Subject: [PATCH 5/6] chore: make lint Signed-off-by: Danny Kopping --- enterprise/coderd/prebuilds/reconcile_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/enterprise/coderd/prebuilds/reconcile_test.go b/enterprise/coderd/prebuilds/reconcile_test.go index a67a7ad8b94a0..13ecb37d49b08 100644 --- a/enterprise/coderd/prebuilds/reconcile_test.go +++ b/enterprise/coderd/prebuilds/reconcile_test.go @@ -411,15 +411,15 @@ func TestPrebuildReconciliation(t *testing.T) { } } -// brokenPublisher is used to validate that Publish() calls which always fail do not affect the reconciler's behaviour, +// brokenPublisher is used to validate that Publish() calls which always fail do not affect the reconciler's behavior, // since the messages published are not essential but merely advisory. type brokenPublisher struct { pubsub.Pubsub } -func (b *brokenPublisher) Publish(event string, _ []byte) error { +func (*brokenPublisher) Publish(event string, _ []byte) error { // I'm explicitly _not_ checking for EventJobPosted (coderd/database/provisionerjobs/provisionerjobs.go) since that - // required too much knowledge of the underlying implementation. + // requires too much knowledge of the underlying implementation. return xerrors.Errorf("refusing to publish %q", event) } From 9ddad0bebc3121a53de2a183242938c740bae434 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Mon, 5 May 2025 10:24:19 +0200 Subject: [PATCH 6/6] fix: only break pubsub after dbgen setup Signed-off-by: Danny Kopping --- coderd/database/dbgen/dbgen.go | 4 ++-- enterprise/coderd/prebuilds/reconcile_test.go | 19 +++++++++++-------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/coderd/database/dbgen/dbgen.go b/coderd/database/dbgen/dbgen.go index c81b8017ef048..854c7c2974fe6 100644 --- a/coderd/database/dbgen/dbgen.go +++ b/coderd/database/dbgen/dbgen.go @@ -643,8 +643,8 @@ func ProvisionerJob(t testing.TB, db database.Store, ps pubsub.Pubsub, orig data }) require.NoError(t, err, "insert job") if ps != nil { - // Advisory, but not essential since acquirer has a background poller to pick up missed jobs. - _ = provisionerjobs.PostJob(ps, job) + err = provisionerjobs.PostJob(ps, job) + require.NoError(t, err, "post job to pubsub") } if !orig.StartedAt.Time.IsZero() { job, err = db.AcquireProvisionerJob(genCtx, database.AcquireProvisionerJobParams{ diff --git a/enterprise/coderd/prebuilds/reconcile_test.go b/enterprise/coderd/prebuilds/reconcile_test.go index 13ecb37d49b08..a1666134a7965 100644 --- a/enterprise/coderd/prebuilds/reconcile_test.go +++ b/enterprise/coderd/prebuilds/reconcile_test.go @@ -322,11 +322,6 @@ func TestPrebuildReconciliation(t *testing.T) { t, &slogtest.Options{IgnoreErrors: true}, ).Leveled(slog.LevelDebug) db, pubSub := dbtestutil.NewDB(t) - if useBrokenPubsub { - pubSub = &brokenPublisher{Pubsub: pubSub} - } - - controller := prebuilds.NewStoreReconciler(db, pubSub, cfg, logger, quartz.NewMock(t), prometheus.NewRegistry()) ownerID := uuid.New() dbgen.User(t, db, database.User{ @@ -369,6 +364,11 @@ func TestPrebuildReconciliation(t *testing.T) { setupTestDBTemplateVersion(ctx, t, clock, db, pubSub, org.ID, ownerID, template.ID) } + if useBrokenPubsub { + pubSub = &brokenPublisher{Pubsub: pubSub} + } + controller := prebuilds.NewStoreReconciler(db, pubSub, cfg, logger, quartz.NewMock(t), prometheus.NewRegistry()) + // Run the reconciliation multiple times to ensure idempotency // 8 was arbitrary, but large enough to reasonably trust the result for i := 1; i <= 8; i++ { @@ -417,10 +417,13 @@ type brokenPublisher struct { pubsub.Pubsub } +// Publish deliberately fails. +// I'm explicitly _not_ checking for EventJobPosted (coderd/database/provisionerjobs/provisionerjobs.go) since that +// requires too much knowledge of the underlying implementation. func (*brokenPublisher) Publish(event string, _ []byte) error { - // I'm explicitly _not_ checking for EventJobPosted (coderd/database/provisionerjobs/provisionerjobs.go) since that - // requires too much knowledge of the underlying implementation. - return xerrors.Errorf("refusing to publish %q", event) + // Mimick some work being done. + <-time.After(testutil.IntervalFast) + return xerrors.Errorf("failed to publish %q", event) } func TestMultiplePresetsPerTemplateVersion(t *testing.T) {