Skip to content

fix: move pubsub publishing out of database transactions to avoid conn exhaustion #17648

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions coderd/database/dbgen/dbgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,8 +643,8 @@ func ProvisionerJob(t testing.TB, db database.Store, ps pubsub.Pubsub, orig data
})
require.NoError(t, err, "insert job")
if ps != nil {
err = provisionerjobs.PostJob(ps, job)
require.NoError(t, err, "post job to pubsub")
// Advisory, but not essential since acquirer has a background poller to pick up missed jobs.
_ = provisionerjobs.PostJob(ps, job)
}
if !orig.StartedAt.Time.IsZero() {
job, err = db.AcquireProvisionerJob(genCtx, database.AcquireProvisionerJobParams{
Expand Down
61 changes: 46 additions & 15 deletions enterprise/coderd/prebuilds/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ type StoreReconciler struct {
registerer prometheus.Registerer
metrics *MetricsCollector

cancelFn context.CancelCauseFunc
running atomic.Bool
stopped atomic.Bool
done chan struct{}
cancelFn context.CancelCauseFunc
running atomic.Bool
stopped atomic.Bool
done chan struct{}
provisionNotifyCh chan database.ProvisionerJob
}

var _ prebuilds.ReconciliationOrchestrator = &StoreReconciler{}
Expand All @@ -56,13 +57,14 @@ func NewStoreReconciler(store database.Store,
registerer prometheus.Registerer,
) *StoreReconciler {
reconciler := &StoreReconciler{
store: store,
pubsub: ps,
logger: logger,
cfg: cfg,
clock: clock,
registerer: registerer,
done: make(chan struct{}, 1),
store: store,
pubsub: ps,
logger: logger,
cfg: cfg,
clock: clock,
registerer: registerer,
done: make(chan struct{}, 1),
provisionNotifyCh: make(chan database.ProvisionerJob, 10),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thumb-suck; I want to protect against temporary network/database blips but also don't want to accumulate too many messages.

}

reconciler.metrics = NewMetricsCollector(store, logger, reconciler)
Expand Down Expand Up @@ -100,6 +102,29 @@ func (c *StoreReconciler) Run(ctx context.Context) {
// NOTE: without this atomic bool, Stop might race with Run for the c.cancelFn above.
c.running.Store(true)

// Publish provisioning jobs outside of database transactions.
// A connection is held while a database transaction is active; PGPubsub also tries to acquire a new connection on
// Publish, so we can exhaust available connections.
//
// A single worker dequeues from the channel, which should be sufficient.
// If any messages are missed due to congestion or errors, provisionerdserver has a backup polling mechanism which
// will periodically pick up any queued jobs (see poll(time.Duration) in coderd/provisionerdserver/acquirer.go).
go func() {
for {
select {
case <-c.done:
return
case <-ctx.Done():
return
case job := <-c.provisionNotifyCh:
err := provisionerjobs.PostJob(c.pubsub, job)
if err != nil {
c.logger.Error(ctx, "failed to post provisioner job to pubsub", slog.Error(err))
}
}
}
}()

for {
select {
// TODO: implement pubsub listener to allow reconciling a specific template imperatively once it has been changed,
Expand Down Expand Up @@ -576,10 +601,16 @@ func (c *StoreReconciler) provision(
return xerrors.Errorf("provision workspace: %w", err)
}

err = provisionerjobs.PostJob(c.pubsub, *provisionerJob)
if err != nil {
// Client probably doesn't care about this error, so just log it.
c.logger.Error(ctx, "failed to post provisioner job to pubsub", slog.Error(err))
if provisionerJob == nil {
return nil
}

// Publish provisioner job event outside of transaction.
select {
case c.provisionNotifyCh <- *provisionerJob:
default: // channel full, drop the message; provisioner will pick this job up later with its periodic check, though.
c.logger.Warn(ctx, "provisioner job notification queue full, dropping",
slog.F("job_id", provisionerJob.ID), slog.F("prebuild_id", prebuildID.String()))
}

c.logger.Info(ctx, "prebuild job scheduled", slog.F("transition", transition),
Expand Down
195 changes: 107 additions & 88 deletions enterprise/coderd/prebuilds/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"
"golang.org/x/xerrors"

"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/util/slice"
Expand Down Expand Up @@ -303,107 +304,125 @@ func TestPrebuildReconciliation(t *testing.T) {
for _, prebuildLatestTransition := range tc.prebuildLatestTransitions {
for _, prebuildJobStatus := range tc.prebuildJobStatuses {
for _, templateDeleted := range tc.templateDeleted {
t.Run(fmt.Sprintf("%s - %s - %s", tc.name, prebuildLatestTransition, prebuildJobStatus), func(t *testing.T) {
t.Parallel()
t.Cleanup(func() {
if t.Failed() {
t.Logf("failed to run test: %s", tc.name)
t.Logf("templateVersionActive: %t", templateVersionActive)
t.Logf("prebuildLatestTransition: %s", prebuildLatestTransition)
t.Logf("prebuildJobStatus: %s", prebuildJobStatus)
for _, useBrokenPubsub := range []bool{true, false} {
t.Run(fmt.Sprintf("%s - %s - %s - pubsub_broken=%v", tc.name, prebuildLatestTransition, prebuildJobStatus, useBrokenPubsub), func(t *testing.T) {
t.Parallel()
t.Cleanup(func() {
if t.Failed() {
t.Logf("failed to run test: %s", tc.name)
t.Logf("templateVersionActive: %t", templateVersionActive)
t.Logf("prebuildLatestTransition: %s", prebuildLatestTransition)
t.Logf("prebuildJobStatus: %s", prebuildJobStatus)
}
})
clock := quartz.NewMock(t)
ctx := testutil.Context(t, testutil.WaitShort)
cfg := codersdk.PrebuildsConfig{}
logger := slogtest.Make(
t, &slogtest.Options{IgnoreErrors: true},
).Leveled(slog.LevelDebug)
db, pubSub := dbtestutil.NewDB(t)
if useBrokenPubsub {
pubSub = &brokenPublisher{Pubsub: pubSub}
}
})
clock := quartz.NewMock(t)
ctx := testutil.Context(t, testutil.WaitShort)
cfg := codersdk.PrebuildsConfig{}
logger := slogtest.Make(
t, &slogtest.Options{IgnoreErrors: true},
).Leveled(slog.LevelDebug)
db, pubSub := dbtestutil.NewDB(t)
controller := prebuilds.NewStoreReconciler(db, pubSub, cfg, logger, quartz.NewMock(t), prometheus.NewRegistry())

ownerID := uuid.New()
dbgen.User(t, db, database.User{
ID: ownerID,
})
org, template := setupTestDBTemplate(t, db, ownerID, templateDeleted)
templateVersionID := setupTestDBTemplateVersion(
ctx,
t,
clock,
db,
pubSub,
org.ID,
ownerID,
template.ID,
)
preset := setupTestDBPreset(
t,
db,
templateVersionID,
1,
uuid.New().String(),
)
prebuild := setupTestDBPrebuild(
t,
clock,
db,
pubSub,
prebuildLatestTransition,
prebuildJobStatus,
org.ID,
preset,
template.ID,
templateVersionID,
)

if !templateVersionActive {
// Create a new template version and mark it as active
// This marks the template version that we care about as inactive
setupTestDBTemplateVersion(ctx, t, clock, db, pubSub, org.ID, ownerID, template.ID)
}

// Run the reconciliation multiple times to ensure idempotency
// 8 was arbitrary, but large enough to reasonably trust the result
for i := 1; i <= 8; i++ {
require.NoErrorf(t, controller.ReconcileAll(ctx), "failed on iteration %d", i)

if tc.shouldCreateNewPrebuild != nil {
newPrebuildCount := 0
workspaces, err := db.GetWorkspacesByTemplateID(ctx, template.ID)
require.NoError(t, err)
for _, workspace := range workspaces {
if workspace.ID != prebuild.ID {
newPrebuildCount++

controller := prebuilds.NewStoreReconciler(db, pubSub, cfg, logger, quartz.NewMock(t), prometheus.NewRegistry())

ownerID := uuid.New()
dbgen.User(t, db, database.User{
ID: ownerID,
})
org, template := setupTestDBTemplate(t, db, ownerID, templateDeleted)
templateVersionID := setupTestDBTemplateVersion(
ctx,
t,
clock,
db,
pubSub,
org.ID,
ownerID,
template.ID,
)
preset := setupTestDBPreset(
t,
db,
templateVersionID,
1,
uuid.New().String(),
)
prebuild := setupTestDBPrebuild(
t,
clock,
db,
pubSub,
prebuildLatestTransition,
prebuildJobStatus,
org.ID,
preset,
template.ID,
templateVersionID,
)

if !templateVersionActive {
// Create a new template version and mark it as active
// This marks the template version that we care about as inactive
setupTestDBTemplateVersion(ctx, t, clock, db, pubSub, org.ID, ownerID, template.ID)
}

// Run the reconciliation multiple times to ensure idempotency
// 8 was arbitrary, but large enough to reasonably trust the result
for i := 1; i <= 8; i++ {
require.NoErrorf(t, controller.ReconcileAll(ctx), "failed on iteration %d", i)

if tc.shouldCreateNewPrebuild != nil {
newPrebuildCount := 0
workspaces, err := db.GetWorkspacesByTemplateID(ctx, template.ID)
require.NoError(t, err)
for _, workspace := range workspaces {
if workspace.ID != prebuild.ID {
newPrebuildCount++
}
}
// This test configures a preset that desires one prebuild.
// In cases where new prebuilds should be created, there should be exactly one.
require.Equal(t, *tc.shouldCreateNewPrebuild, newPrebuildCount == 1)
}
// This test configures a preset that desires one prebuild.
// In cases where new prebuilds should be created, there should be exactly one.
require.Equal(t, *tc.shouldCreateNewPrebuild, newPrebuildCount == 1)
}

if tc.shouldDeleteOldPrebuild != nil {
builds, err := db.GetWorkspaceBuildsByWorkspaceID(ctx, database.GetWorkspaceBuildsByWorkspaceIDParams{
WorkspaceID: prebuild.ID,
})
require.NoError(t, err)
if *tc.shouldDeleteOldPrebuild {
require.Equal(t, 2, len(builds))
require.Equal(t, database.WorkspaceTransitionDelete, builds[0].Transition)
} else {
require.Equal(t, 1, len(builds))
require.Equal(t, prebuildLatestTransition, builds[0].Transition)
if tc.shouldDeleteOldPrebuild != nil {
builds, err := db.GetWorkspaceBuildsByWorkspaceID(ctx, database.GetWorkspaceBuildsByWorkspaceIDParams{
WorkspaceID: prebuild.ID,
})
require.NoError(t, err)
if *tc.shouldDeleteOldPrebuild {
require.Equal(t, 2, len(builds))
require.Equal(t, database.WorkspaceTransitionDelete, builds[0].Transition)
} else {
require.Equal(t, 1, len(builds))
require.Equal(t, prebuildLatestTransition, builds[0].Transition)
}
}
}
}
})
})
}
}
}
}
}
}
}

// brokenPublisher is used to validate that Publish() calls which always fail do not affect the reconciler's behavior,
// since the messages published are not essential but merely advisory.
type brokenPublisher struct {
pubsub.Pubsub
}

func (*brokenPublisher) Publish(event string, _ []byte) error {
// I'm explicitly _not_ checking for EventJobPosted (coderd/database/provisionerjobs/provisionerjobs.go) since that
// requires too much knowledge of the underlying implementation.
return xerrors.Errorf("refusing to publish %q", event)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we fail slowly instead?

e.g. <-time.After(testutil.IntervalFast)

}

func TestMultiplePresetsPerTemplateVersion(t *testing.T) {
t.Parallel()

Expand Down
Loading