Skip to content

Commit 44d12aa

Browse files
committed
Refactoring reconciliation loop into control & logic, adding initial (incomplete) tests
Signed-off-by: Danny Kopping <danny@coder.com>
1 parent 159b3ae commit 44d12aa

File tree

4 files changed

+417
-193
lines changed

4 files changed

+417
-193
lines changed

coderd/database/lock.go

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@ const (
1313
LockIDNotificationsReportGenerator
1414
LockIDCryptoKeyRotation
1515
LockIDReconcileTemplatePrebuilds
16+
LockIDDeterminePrebuildsState
1617
)
1718

1819
// GenLockID generates a unique and consistent lock ID from a given string.

enterprise/coderd/prebuilds/controller.go

Lines changed: 67 additions & 193 deletions
Original file line number | Diff line number | Diff line change
@@ -6,18 +6,15 @@ import (
66
"database/sql"
77
"encoding/base32"
88
"fmt"
9-
"math"
10-
mrand "math/rand"
119
"strings"
1210
"sync/atomic"
1311
"time"
1412

15-
"github.com/coder/coder/v2/coderd/util/slice"
1613
"github.com/hashicorp/go-multierror"
1714

18-
"golang.org/x/exp/slices"
19-
2015
"github.com/coder/coder/v2/coderd/audit"
16+
"github.com/coder/coder/v2/coderd/database"
17+
"github.com/coder/coder/v2/coderd/database/dbauthz"
2118
"github.com/coder/coder/v2/coderd/database/dbtime"
2219
"github.com/coder/coder/v2/coderd/database/provisionerjobs"
2320
"github.com/coder/coder/v2/coderd/database/pubsub"
@@ -31,9 +28,6 @@ import (
3128
"github.com/google/uuid"
3229
"golang.org/x/sync/errgroup"
3330
"golang.org/x/xerrors"
34-
35-
"github.com/coder/coder/v2/coderd/database"
36-
"github.com/coder/coder/v2/coderd/database/dbauthz"
3731
)
3832

3933
type Controller struct {
@@ -134,39 +128,41 @@ func (c *Controller) reconcile(ctx context.Context, templateID *uuid.UUID) {
134128
return nil
135129
}
136130

137-
defer logger.Debug(ctx, "acquired top-level prebuilds reconciliation lock", slog.F("acquire_wait_secs", fmt.Sprintf("%.4f", time.Since(start).Seconds())))
138-
139-
innerCtx, cancel := context.WithTimeout(ctx, time.Second*30)
140-
defer cancel()
131+
logger.Debug(ctx, "acquired top-level prebuilds reconciliation lock", slog.F("acquire_wait_secs", fmt.Sprintf("%.4f", time.Since(start).Seconds())))
141132

142133
var id uuid.NullUUID
143134
if templateID != nil {
144135
id.UUID = *templateID
136+
id.Valid = true
145137
}
146138

147-
presetsWithPrebuilds, err := db.GetTemplatePresetsWithPrebuilds(ctx, id)
148-
if len(presetsWithPrebuilds) == 0 {
149-
logger.Debug(innerCtx, "no templates found with prebuilds configured")
150-
return nil
151-
}
152-
153-
runningPrebuilds, err := db.GetRunningPrebuilds(ctx)
139+
state, err := c.determineState(ctx, db, id)
154140
if err != nil {
155-
return xerrors.Errorf("failed to get running prebuilds: %w", err)
141+
return xerrors.Errorf("determine current state: %w", err)
156142
}
157-
158-
prebuildsInProgress, err := db.GetPrebuildsInProgress(ctx)
159-
if err != nil {
160-
return xerrors.Errorf("failed to get prebuilds in progress: %w", err)
143+
if len(state.presets) == 0 {
144+
logger.Debug(ctx, "no templates found with prebuilds configured")
145+
return nil
161146
}
162147

163148
// TODO: bounded concurrency? probably not but consider
164149
var eg errgroup.Group
165-
for _, preset := range presetsWithPrebuilds {
150+
for _, preset := range state.presets {
151+
ps, err := state.filterByPreset(preset.PresetID)
152+
if err != nil {
153+
logger.Warn(ctx, "failed to find preset state", slog.Error(err), slog.F("preset_id", preset.PresetID.String()))
154+
continue
155+
}
156+
157+
if !preset.UsingActiveVersion && len(ps.running) == 0 && len(ps.inProgress) == 0 {
158+
logger.Debug(ctx, "skipping reconciliation for preset; inactive, no running prebuilds, and no in-progress operationss",
159+
slog.F("preset_id", preset.PresetID.String()))
160+
continue
161+
}
162+
166163
eg.Go(func() error {
167164
// Pass outer context.
168-
// TODO: name these better to avoid the comment.
169-
err := c.reconcilePrebuildsForPreset(ctx, preset, runningPrebuilds, prebuildsInProgress)
165+
err := c.reconcilePrebuildsForPreset(ctx, ps)
170166
if err != nil {
171167
logger.Error(ctx, "failed to reconcile prebuilds for preset", slog.Error(err), slog.F("preset_id", preset.PresetID))
172168
}
@@ -186,186 +182,64 @@ func (c *Controller) reconcile(ctx context.Context, templateID *uuid.UUID) {
186182
}
187183
}
188184

189-
type reconciliationActions struct {
190-
deleteIDs []uuid.UUID
191-
createIDs []uuid.UUID
192-
193-
actual int32 // Running prebuilds for active version.
194-
desired int32 // Active template version's desired instances as defined in preset.
195-
eligible int32 // Prebuilds which can be claimed.
196-
outdated int32 // Prebuilds which no longer match the active template version.
197-
extraneous int32 // Extra running prebuilds for active version (somehow).
198-
starting, stopping, deleting int32 // Prebuilds currently being provisioned up or down.
199-
}
200-
201-
// calculateActions MUST be called within the context of a transaction (TODO: isolation)
202-
// with an advisory lock to prevent TOCTOU races.
203-
func (c *Controller) calculateActions(ctx context.Context, preset database.GetTemplatePresetsWithPrebuildsRow, running []database.GetRunningPrebuildsRow, inProgress []database.GetPrebuildsInProgressRow) (*reconciliationActions, error) {
204-
// TODO: align workspace states with how we represent them on the FE and the CLI
205-
// right now there's some slight differences which can lead to additional prebuilds being created
206-
207-
// TODO: add mechanism to prevent prebuilds being reconciled from being claimable by users; i.e. if a prebuild is
208-
// about to be deleted, it should not be deleted if it has been claimed - beware of TOCTOU races!
209-
210-
var (
211-
actual int32 // Running prebuilds for active version.
212-
desired int32 // Active template version's desired instances as defined in preset.
213-
eligible int32 // Prebuilds which can be claimed.
214-
outdated int32 // Prebuilds which no longer match the active template version.
215-
extraneous int32 // Extra running prebuilds for active version (somehow).
216-
starting, stopping, deleting int32 // Prebuilds currently being provisioned up or down.
217-
)
218-
219-
if preset.UsingActiveVersion {
220-
actual = int32(len(running))
221-
desired = preset.DesiredInstances
185+
// determineState determines the current state of prebuilds & the presets which define them.
186+
// It runs its queries inside its own read-only, repeatable-read transaction while holding an advisory lock.
187+
func (c *Controller) determineState(ctx context.Context, store database.Store, id uuid.NullUUID) (*reconciliationState, error) {
188+
if err := ctx.Err(); err != nil {
189+
return nil, err
222190
}
223191

224-
for _, prebuild := range running {
225-
if preset.UsingActiveVersion {
226-
if prebuild.Eligible {
227-
eligible++
228-
}
192+
var state reconciliationState
229193

230-
extraneous = int32(math.Max(float64(actual-preset.DesiredInstances), 0))
231-
}
194+
err := store.InTx(func(db database.Store) error {
195+
start := time.Now()
232196

233-
if prebuild.TemplateVersionID == preset.TemplateVersionID && !preset.UsingActiveVersion {
234-
outdated++
197+
// TODO: give up after some time waiting on this?
198+
err := db.AcquireLock(ctx, database.LockIDDeterminePrebuildsState)
199+
if err != nil {
200+
return xerrors.Errorf("failed to acquire state determination lock: %w", err)
235201
}
236-
}
237202

238-
for _, progress := range inProgress {
239-
switch progress.Transition {
240-
case database.WorkspaceTransitionStart:
241-
starting++
242-
case database.WorkspaceTransitionStop:
243-
stopping++
244-
case database.WorkspaceTransitionDelete:
245-
deleting++
246-
default:
247-
c.logger.Warn(ctx, "unknown transition found in prebuilds in progress result", slog.F("transition", progress.Transition))
248-
}
249-
}
203+
c.logger.Debug(ctx, "acquired state determination lock", slog.F("acquire_wait_secs", fmt.Sprintf("%.4f", time.Since(start).Seconds())))
250204

251-
var (
252-
toCreate = int(math.Max(0, float64(
253-
desired- // The number specified in the preset
254-
(actual+starting)- // The current number of prebuilds (or builds in-flight)
255-
stopping), // The number of prebuilds currently being stopped (should be 0)
256-
))
257-
toDelete = int(math.Max(0, float64(
258-
outdated- // The number of prebuilds running above the desired count for active version
259-
deleting), // The number of prebuilds currently being deleted
260-
))
261-
262-
actions = &reconciliationActions{
263-
actual: actual,
264-
desired: desired,
265-
eligible: eligible,
266-
outdated: outdated,
267-
extraneous: extraneous,
268-
starting: starting,
269-
stopping: stopping,
270-
deleting: deleting,
205+
presetsWithPrebuilds, err := db.GetTemplatePresetsWithPrebuilds(ctx, id)
206+
if len(presetsWithPrebuilds) == 0 {
207+
return nil
271208
}
272-
)
273-
274-
// Bail early to avoid scheduling new prebuilds while operations are in progress.
275-
if (toCreate+toDelete) > 0 && (starting+stopping+deleting) > 0 {
276-
c.logger.Warn(ctx, "prebuild operations in progress, skipping reconciliation",
277-
slog.F("template_id", preset.TemplateID.String()), slog.F("starting", starting),
278-
slog.F("stopping", stopping), slog.F("deleting", deleting),
279-
slog.F("wanted_to_create", toCreate), slog.F("wanted_to_delete", toDelete))
280-
return actions, nil
281-
}
282-
283-
// It's possible that an operator could stop/start prebuilds which interfere with the reconciliation loop, so
284-
// we check if there are somehow more prebuilds than we expect, and then pick random victims to be deleted.
285-
if extraneous > 0 {
286-
// Sort running IDs randomly so we can pick random victims.
287-
slices.SortFunc(running, func(_, _ database.GetRunningPrebuildsRow) int {
288-
if mrand.Float64() > 0.5 {
289-
return -1
290-
}
291-
292-
return 1
293-
})
294209

295-
var victims []uuid.UUID
296-
for i := 0; i < int(extraneous); i++ {
297-
if i >= len(running) {
298-
// This should never happen.
299-
c.logger.Warn(ctx, "unexpected reconciliation state; extraneous count exceeds running prebuilds count!",
300-
slog.F("running_count", len(running)),
301-
slog.F("extraneous", extraneous))
302-
continue
303-
}
304-
305-
victims = append(victims, running[i].WorkspaceID)
210+
allRunningPrebuilds, err := db.GetRunningPrebuilds(ctx)
211+
if err != nil {
212+
return xerrors.Errorf("failed to get running prebuilds: %w", err)
306213
}
307214

308-
actions.deleteIDs = append(actions.deleteIDs, victims...)
309-
310-
c.logger.Warn(ctx, "found extra prebuilds running, picking random victim(s)",
311-
slog.F("template_id", preset.TemplateID.String()), slog.F("desired", desired), slog.F("actual", actual), slog.F("extra", extraneous),
312-
slog.F("victims", victims))
313-
314-
// Prevent the rest of the reconciliation from completing
315-
return actions, nil
316-
}
317-
318-
// If the template has become deleted or deprecated since the last reconciliation, we need to ensure we
319-
// scale those prebuilds down to zero.
320-
if preset.Deleted || preset.Deprecated {
321-
toCreate = 0
322-
toDelete = int(actual + outdated)
323-
}
324-
325-
for i := 0; i < toCreate; i++ {
326-
actions.createIDs = append(actions.createIDs, uuid.New())
327-
}
328-
329-
if toDelete > 0 && len(running) != toDelete {
330-
c.logger.Warn(ctx, "mismatch between running prebuilds and expected deletion count!",
331-
slog.F("template_id", preset.TemplateID.String()), slog.F("running", len(running)), slog.F("to_delete", toDelete))
332-
}
333-
334-
// TODO: implement lookup to not perform same action on workspace multiple times in $period
335-
// i.e. a workspace cannot be deleted for some reason, which continually makes it eligible for deletion
336-
for i := 0; i < toDelete; i++ {
337-
if i >= len(running) {
338-
// Above warning will have already addressed this.
339-
continue
215+
allPrebuildsInProgress, err := db.GetPrebuildsInProgress(ctx)
216+
if err != nil {
217+
return xerrors.Errorf("failed to get prebuilds in progress: %w", err)
340218
}
341219

342-
actions.deleteIDs = append(actions.deleteIDs, running[i].WorkspaceID)
343-
}
220+
state = newReconciliationState(presetsWithPrebuilds, allRunningPrebuilds, allPrebuildsInProgress)
221+
return nil
222+
}, &database.TxOptions{
223+
Isolation: sql.LevelRepeatableRead, // This mirrors the MVCC snapshotting Postgres does when using CTEs
224+
ReadOnly: true,
225+
TxIdentifier: "prebuilds_state_determination",
226+
})
344227

345-
return actions, nil
228+
return &state, err
346229
}
347230

348-
func (c *Controller) reconcilePrebuildsForPreset(ctx context.Context, preset database.GetTemplatePresetsWithPrebuildsRow,
349-
allRunning []database.GetRunningPrebuildsRow, allInProgress []database.GetPrebuildsInProgressRow,
350-
) error {
351-
logger := c.logger.With(slog.F("template_id", preset.TemplateID.String()))
231+
func (c *Controller) reconcilePrebuildsForPreset(ctx context.Context, ps *presetState) error {
232+
if ps == nil {
233+
return xerrors.Errorf("unexpected nil preset state")
234+
}
352235

353-
var lastErr multierror.Error
354-
vlogger := logger.With(slog.F("template_version_id", preset.TemplateVersionID), slog.F("preset_id", preset.PresetID))
236+
logger := c.logger.With(slog.F("template_id", ps.preset.TemplateID.String()))
355237

356-
running := slice.Filter(allRunning, func(prebuild database.GetRunningPrebuildsRow) bool {
357-
if !prebuild.DesiredPresetID.Valid && !prebuild.CurrentPresetID.Valid {
358-
return false
359-
}
360-
return prebuild.CurrentPresetID.UUID == preset.PresetID &&
361-
prebuild.TemplateVersionID == preset.TemplateVersionID // Not strictly necessary since presets are 1:1 with template versions, but no harm in being extra safe.
362-
})
363-
364-
inProgress := slice.Filter(allInProgress, func(prebuild database.GetPrebuildsInProgressRow) bool {
365-
return prebuild.TemplateVersionID == preset.TemplateVersionID
366-
})
238+
var lastErr multierror.Error
239+
vlogger := logger.With(slog.F("template_version_id", ps.preset.TemplateVersionID), slog.F("preset_id", ps.preset.PresetID))
367240

368-
actions, err := c.calculateActions(ctx, preset, running, inProgress)
241+
// TODO: move log lines up from calculateActions.
242+
actions, err := ps.calculateActions()
369243
if err != nil {
370244
vlogger.Error(ctx, "failed to calculate reconciliation actions", slog.Error(err))
371245
return xerrors.Errorf("failed to calculate reconciliation actions: %w", err)
@@ -380,12 +254,12 @@ func (c *Controller) reconcilePrebuildsForPreset(ctx context.Context, preset dat
380254
})
381255

382256
levelFn := vlogger.Debug
383-
if len(actions.createIDs) > 0 || len(actions.deleteIDs) > 0 {
257+
if actions.create > 0 || len(actions.deleteIDs) > 0 {
384258
// Only log with info level when there's a change that needs to be effected.
385259
levelFn = vlogger.Info
386260
}
387261
levelFn(ctx, "template prebuild state retrieved",
388-
slog.F("to_create", len(actions.createIDs)), slog.F("to_delete", len(actions.deleteIDs)),
262+
slog.F("to_create", actions.create), slog.F("to_delete", len(actions.deleteIDs)),
389263
slog.F("desired", actions.desired), slog.F("actual", actions.actual),
390264
slog.F("outdated", actions.outdated), slog.F("extraneous", actions.extraneous),
391265
slog.F("starting", actions.starting), slog.F("stopping", actions.stopping),
@@ -396,15 +270,15 @@ func (c *Controller) reconcilePrebuildsForPreset(ctx context.Context, preset dat
396270
// TODO: max per reconciliation iteration?
397271

398272
// TODO: i've removed the surrounding tx, but if we restore it then we need to pass down the store to these funcs.
399-
for _, id := range actions.createIDs {
400-
if err := c.createPrebuild(ownerCtx, id, preset.TemplateID, preset.PresetID); err != nil {
273+
for range actions.create {
274+
if err := c.createPrebuild(ownerCtx, uuid.New(), ps.preset.TemplateID, ps.preset.PresetID); err != nil {
401275
vlogger.Error(ctx, "failed to create prebuild", slog.Error(err))
402276
lastErr.Errors = append(lastErr.Errors, err)
403277
}
404278
}
405279

406280
for _, id := range actions.deleteIDs {
407-
if err := c.deletePrebuild(ownerCtx, id, preset.TemplateID, preset.PresetID); err != nil {
281+
if err := c.deletePrebuild(ownerCtx, id, ps.preset.TemplateID, ps.preset.PresetID); err != nil {
408282
vlogger.Error(ctx, "failed to delete prebuild", slog.Error(err))
409283
lastErr.Errors = append(lastErr.Errors, err)
410284
}

0 commit comments

Comments
 (0)