Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions coderd/database/dbgen/dbgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,8 +643,8 @@ func ProvisionerJob(t testing.TB, db database.Store, ps pubsub.Pubsub, orig data
})
require.NoError(t, err, "insert job")
if ps != nil {
err = provisionerjobs.PostJob(ps, job)
require.NoError(t, err, "post job to pubsub")
// Advisory, but not essential since acquirer has a background poller to pick up missed jobs.
_ = provisionerjobs.PostJob(ps, job)
}
if !orig.StartedAt.Time.IsZero() {
job, err = db.AcquireProvisionerJob(genCtx, database.AcquireProvisionerJobParams{
Expand Down
61 changes: 46 additions & 15 deletions enterprise/coderd/prebuilds/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ type StoreReconciler struct {
registerer prometheus.Registerer
metrics *MetricsCollector

cancelFn context.CancelCauseFunc
running atomic.Bool
stopped atomic.Bool
done chan struct{}
cancelFn context.CancelCauseFunc
running atomic.Bool
stopped atomic.Bool
done chan struct{}
provisionNotifyCh chan database.ProvisionerJob
}

var _ prebuilds.ReconciliationOrchestrator = &StoreReconciler{}
Expand All @@ -56,13 +57,14 @@ func NewStoreReconciler(store database.Store,
registerer prometheus.Registerer,
) *StoreReconciler {
reconciler := &StoreReconciler{
store: store,
pubsub: ps,
logger: logger,
cfg: cfg,
clock: clock,
registerer: registerer,
done: make(chan struct{}, 1),
store: store,
pubsub: ps,
logger: logger,
cfg: cfg,
clock: clock,
registerer: registerer,
done: make(chan struct{}, 1),
provisionNotifyCh: make(chan database.ProvisionerJob, 10),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thumb-suck; I want to protect against temporary network/database blips but also don't want to accumulate too many messages.

}

reconciler.metrics = NewMetricsCollector(store, logger, reconciler)
Expand Down Expand Up @@ -100,6 +102,29 @@ func (c *StoreReconciler) Run(ctx context.Context) {
// NOTE: without this atomic bool, Stop might race with Run for the c.cancelFn above.
c.running.Store(true)

// Publish provisioning jobs outside of database transactions.
// A connection is held while a database transaction is active; PGPubsub also tries to acquire a new connection on
// Publish, so we can exhaust available connections.
//
// A single worker dequeues from the channel, which should be sufficient.
// If any messages are missed due to congestion or errors, provisionerdserver has a backup polling mechanism which
// will periodically pick up any queued jobs (see poll(time.Duration) in coderd/provisionerdserver/acquirer.go).
go func() {
for {
select {
case <-c.done:
return
case <-ctx.Done():
return
case job := <-c.provisionNotifyCh:
err := provisionerjobs.PostJob(c.pubsub, job)
if err != nil {
c.logger.Error(ctx, "failed to post provisioner job to pubsub", slog.Error(err))
}
}
}
}()

for {
select {
// TODO: implement pubsub listener to allow reconciling a specific template imperatively once it has been changed,
Expand Down Expand Up @@ -576,10 +601,16 @@ func (c *StoreReconciler) provision(
return xerrors.Errorf("provision workspace: %w", err)
}

err = provisionerjobs.PostJob(c.pubsub, *provisionerJob)
if err != nil {
// Client probably doesn't care about this error, so just log it.
c.logger.Error(ctx, "failed to post provisioner job to pubsub", slog.Error(err))
if provisionerJob == nil {
return nil
}

// Publish provisioner job event outside of transaction.
select {
case c.provisionNotifyCh <- *provisionerJob:
default: // channel full, drop the message; provisioner will pick this job up later with its periodic check, though.
c.logger.Warn(ctx, "provisioner job notification queue full, dropping",
slog.F("job_id", provisionerJob.ID), slog.F("prebuild_id", prebuildID.String()))
}

c.logger.Info(ctx, "prebuild job scheduled", slog.F("transition", transition),
Expand Down
195 changes: 107 additions & 88 deletions enterprise/coderd/prebuilds/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"
"golang.org/x/xerrors"

"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/util/slice"
Expand Down Expand Up @@ -303,107 +304,125 @@ func TestPrebuildReconciliation(t *testing.T) {
for _, prebuildLatestTransition := range tc.prebuildLatestTransitions {
for _, prebuildJobStatus := range tc.prebuildJobStatuses {
for _, templateDeleted := range tc.templateDeleted {
t.Run(fmt.Sprintf("%s - %s - %s", tc.name, prebuildLatestTransition, prebuildJobStatus), func(t *testing.T) {
t.Parallel()
t.Cleanup(func() {
if t.Failed() {
t.Logf("failed to run test: %s", tc.name)
t.Logf("templateVersionActive: %t", templateVersionActive)
t.Logf("prebuildLatestTransition: %s", prebuildLatestTransition)
t.Logf("prebuildJobStatus: %s", prebuildJobStatus)
for _, useBrokenPubsub := range []bool{true, false} {
t.Run(fmt.Sprintf("%s - %s - %s - pubsub_broken=%v", tc.name, prebuildLatestTransition, prebuildJobStatus, useBrokenPubsub), func(t *testing.T) {
t.Parallel()
t.Cleanup(func() {
if t.Failed() {
t.Logf("failed to run test: %s", tc.name)
t.Logf("templateVersionActive: %t", templateVersionActive)
t.Logf("prebuildLatestTransition: %s", prebuildLatestTransition)
t.Logf("prebuildJobStatus: %s", prebuildJobStatus)
}
})
clock := quartz.NewMock(t)
ctx := testutil.Context(t, testutil.WaitShort)
cfg := codersdk.PrebuildsConfig{}
logger := slogtest.Make(
t, &slogtest.Options{IgnoreErrors: true},
).Leveled(slog.LevelDebug)
db, pubSub := dbtestutil.NewDB(t)
if useBrokenPubsub {
pubSub = &brokenPublisher{Pubsub: pubSub}
}
})
clock := quartz.NewMock(t)
ctx := testutil.Context(t, testutil.WaitShort)
cfg := codersdk.PrebuildsConfig{}
logger := slogtest.Make(
t, &slogtest.Options{IgnoreErrors: true},
).Leveled(slog.LevelDebug)
db, pubSub := dbtestutil.NewDB(t)
controller := prebuilds.NewStoreReconciler(db, pubSub, cfg, logger, quartz.NewMock(t), prometheus.NewRegistry())

ownerID := uuid.New()
dbgen.User(t, db, database.User{
ID: ownerID,
})
org, template := setupTestDBTemplate(t, db, ownerID, templateDeleted)
templateVersionID := setupTestDBTemplateVersion(
ctx,
t,
clock,
db,
pubSub,
org.ID,
ownerID,
template.ID,
)
preset := setupTestDBPreset(
t,
db,
templateVersionID,
1,
uuid.New().String(),
)
prebuild := setupTestDBPrebuild(
t,
clock,
db,
pubSub,
prebuildLatestTransition,
prebuildJobStatus,
org.ID,
preset,
template.ID,
templateVersionID,
)

if !templateVersionActive {
// Create a new template version and mark it as active
// This marks the template version that we care about as inactive
setupTestDBTemplateVersion(ctx, t, clock, db, pubSub, org.ID, ownerID, template.ID)
}

// Run the reconciliation multiple times to ensure idempotency
// 8 was arbitrary, but large enough to reasonably trust the result
for i := 1; i <= 8; i++ {
require.NoErrorf(t, controller.ReconcileAll(ctx), "failed on iteration %d", i)

if tc.shouldCreateNewPrebuild != nil {
newPrebuildCount := 0
workspaces, err := db.GetWorkspacesByTemplateID(ctx, template.ID)
require.NoError(t, err)
for _, workspace := range workspaces {
if workspace.ID != prebuild.ID {
newPrebuildCount++

controller := prebuilds.NewStoreReconciler(db, pubSub, cfg, logger, quartz.NewMock(t), prometheus.NewRegistry())

ownerID := uuid.New()
dbgen.User(t, db, database.User{
ID: ownerID,
})
org, template := setupTestDBTemplate(t, db, ownerID, templateDeleted)
templateVersionID := setupTestDBTemplateVersion(
ctx,
t,
clock,
db,
pubSub,
org.ID,
ownerID,
template.ID,
)
preset := setupTestDBPreset(
t,
db,
templateVersionID,
1,
uuid.New().String(),
)
prebuild := setupTestDBPrebuild(
t,
clock,
db,
pubSub,
prebuildLatestTransition,
prebuildJobStatus,
org.ID,
preset,
template.ID,
templateVersionID,
)

if !templateVersionActive {
// Create a new template version and mark it as active
// This marks the template version that we care about as inactive
setupTestDBTemplateVersion(ctx, t, clock, db, pubSub, org.ID, ownerID, template.ID)
}

// Run the reconciliation multiple times to ensure idempotency
// 8 was arbitrary, but large enough to reasonably trust the result
for i := 1; i <= 8; i++ {
require.NoErrorf(t, controller.ReconcileAll(ctx), "failed on iteration %d", i)

if tc.shouldCreateNewPrebuild != nil {
newPrebuildCount := 0
workspaces, err := db.GetWorkspacesByTemplateID(ctx, template.ID)
require.NoError(t, err)
for _, workspace := range workspaces {
if workspace.ID != prebuild.ID {
newPrebuildCount++
}
}
// This test configures a preset that desires one prebuild.
// In cases where new prebuilds should be created, there should be exactly one.
require.Equal(t, *tc.shouldCreateNewPrebuild, newPrebuildCount == 1)
}
// This test configures a preset that desires one prebuild.
// In cases where new prebuilds should be created, there should be exactly one.
require.Equal(t, *tc.shouldCreateNewPrebuild, newPrebuildCount == 1)
}

if tc.shouldDeleteOldPrebuild != nil {
builds, err := db.GetWorkspaceBuildsByWorkspaceID(ctx, database.GetWorkspaceBuildsByWorkspaceIDParams{
WorkspaceID: prebuild.ID,
})
require.NoError(t, err)
if *tc.shouldDeleteOldPrebuild {
require.Equal(t, 2, len(builds))
require.Equal(t, database.WorkspaceTransitionDelete, builds[0].Transition)
} else {
require.Equal(t, 1, len(builds))
require.Equal(t, prebuildLatestTransition, builds[0].Transition)
if tc.shouldDeleteOldPrebuild != nil {
builds, err := db.GetWorkspaceBuildsByWorkspaceID(ctx, database.GetWorkspaceBuildsByWorkspaceIDParams{
WorkspaceID: prebuild.ID,
})
require.NoError(t, err)
if *tc.shouldDeleteOldPrebuild {
require.Equal(t, 2, len(builds))
require.Equal(t, database.WorkspaceTransitionDelete, builds[0].Transition)
} else {
require.Equal(t, 1, len(builds))
require.Equal(t, prebuildLatestTransition, builds[0].Transition)
}
}
}
}
})
})
}
}
}
}
}
}
}

// brokenPublisher is used to validate that Publish() calls which always fail do not affect the reconciler's behavior,
// since the messages published are not essential but merely advisory.
type brokenPublisher struct {
pubsub.Pubsub
}

func (*brokenPublisher) Publish(event string, _ []byte) error {
// I'm explicitly _not_ checking for EventJobPosted (coderd/database/provisionerjobs/provisionerjobs.go) since that
// requires too much knowledge of the underlying implementation.
return xerrors.Errorf("refusing to publish %q", event)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we fail slowly instead?

e.g. <-time.After(testutil.IntervalFast)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 9ddad0b

}

func TestMultiplePresetsPerTemplateVersion(t *testing.T) {
t.Parallel()

Expand Down
Loading