Skip to content

Commit 375c70d

Browse files
authored
feat: integrate Acquirer for provisioner jobs (coder#9717)
* chore: add Acquirer to provisionerdserver pkg
  Signed-off-by: Spike Curtis <spike@coder.com>
* code review improvements & fixes
  Signed-off-by: Spike Curtis <spike@coder.com>
* feat: integrate Acquirer for provisioner jobs
  Signed-off-by: Spike Curtis <spike@coder.com>
* Fix imports, whitespace
  Signed-off-by: Spike Curtis <spike@coder.com>
* provisionerdserver always closes; remove poll interval from playwright
  Signed-off-by: Spike Curtis <spike@coder.com>
* post jobs outside transactions
  Signed-off-by: Spike Curtis <spike@coder.com>
* graceful shutdown in test
  Signed-off-by: Spike Curtis <spike@coder.com>
* Mark AcquireJob deprecated
  Signed-off-by: Spike Curtis <spike@coder.com>
* Graceful shutdown on all provisionerd tests
  Signed-off-by: Spike Curtis <spike@coder.com>
* Deprecate, not remove CLI flags
  Signed-off-by: Spike Curtis <spike@coder.com>
---------
Signed-off-by: Spike Curtis <spike@coder.com>
1 parent 6cf531b commit 375c70d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1486
-1085
lines changed

cli/server.go

+3-6
Original file line numberDiff line numberDiff line change
@@ -1012,7 +1012,8 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
10121012

10131013
autobuildTicker := time.NewTicker(vals.AutobuildPollInterval.Value())
10141014
defer autobuildTicker.Stop()
1015-
autobuildExecutor := autobuild.NewExecutor(ctx, options.Database, coderAPI.TemplateScheduleStore, logger, autobuildTicker.C)
1015+
autobuildExecutor := autobuild.NewExecutor(
1016+
ctx, options.Database, options.Pubsub, coderAPI.TemplateScheduleStore, logger, autobuildTicker.C)
10161017
autobuildExecutor.Run()
10171018

10181019
hangDetectorTicker := time.NewTicker(vals.JobHangDetectorInterval.Value())
@@ -1378,16 +1379,12 @@ func newProvisionerDaemon(
13781379
connector[string(database.ProvisionerTypeTerraform)] = sdkproto.NewDRPCProvisionerClient(terraformClient)
13791380
}
13801381

1381-
debounce := time.Second
13821382
return provisionerd.New(func(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error) {
13831383
// Open an in-memory connection to coderd. The old debounce on this call
13841384
// was removed when job acquisition moved from polling to the Acquirer.
1385-
return coderAPI.CreateInMemoryProvisionerDaemon(ctx, debounce)
1385+
return coderAPI.CreateInMemoryProvisionerDaemon(ctx)
13861386
}, &provisionerd.Options{
13871387
Logger: logger.Named("provisionerd"),
1388-
JobPollInterval: cfg.Provisioner.DaemonPollInterval.Value(),
1389-
JobPollJitter: cfg.Provisioner.DaemonPollJitter.Value(),
1390-
JobPollDebounce: debounce,
13911388
UpdateInterval: time.Second,
13921389
ForceCancelInterval: cfg.Provisioner.ForceCancelInterval.Value(),
13931390
Connector: connector,

cli/testdata/coder_server_--help.golden

+2-2
Original file line numberDiff line numberDiff line change
@@ -393,10 +393,10 @@ updating, and deleting workspace resources.
393393
Time to force cancel provisioning tasks that are stuck.
394394

395395
--provisioner-daemon-poll-interval duration, $CODER_PROVISIONER_DAEMON_POLL_INTERVAL (default: 1s)
396-
Time to wait before polling for a new job.
396+
Deprecated and ignored.
397397

398398
--provisioner-daemon-poll-jitter duration, $CODER_PROVISIONER_DAEMON_POLL_JITTER (default: 100ms)
399-
Random jitter added to the poll interval.
399+
Deprecated and ignored.
400400

401401
--provisioner-daemon-psk string, $CODER_PROVISIONER_DAEMON_PSK
402402
Pre-shared key to authenticate external provisioner daemons to Coder

cli/testdata/server-config.yaml.golden

+2-2
Original file line numberDiff line numberDiff line change
@@ -348,10 +348,10 @@ provisioning:
348348
# tests.
349349
# (default: false, type: bool)
350350
daemonsEcho: false
351-
# Time to wait before polling for a new job.
351+
# Deprecated and ignored.
352352
# (default: 1s, type: duration)
353353
daemonPollInterval: 1s
354-
# Random jitter added to the poll interval.
354+
# Deprecated and ignored.
355355
# (default: 100ms, type: duration)
356356
daemonPollJitter: 100ms
357357
# Time to force cancel provisioning tasks that are stuck.

coderd/activitybump_internal_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func Test_ActivityBumpWorkspace(t *testing.T) {
134134
TemplateID: template.ID,
135135
Ttl: sql.NullInt64{Valid: true, Int64: int64(tt.workspaceTTL)},
136136
})
137-
job = dbgen.ProvisionerJob(t, db, database.ProvisionerJob{
137+
job = dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{
138138
OrganizationID: org.ID,
139139
CompletedAt: tt.jobCompletedAt,
140140
})
@@ -225,7 +225,7 @@ func Test_ActivityBumpWorkspace(t *testing.T) {
225225
func insertPrevWorkspaceBuild(t *testing.T, db database.Store, orgID, tvID, workspaceID uuid.UUID, transition database.WorkspaceTransition, buildNumber int32) {
226226
t.Helper()
227227

228-
job := dbgen.ProvisionerJob(t, db, database.ProvisionerJob{
228+
job := dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{
229229
OrganizationID: orgID,
230230
})
231231
_ = dbgen.WorkspaceResource(t, db, database.WorkspaceResource{

coderd/autobuild/lifecycle_executor.go

+19-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616
"github.com/coder/coder/v2/coderd/database/db2sdk"
1717
"github.com/coder/coder/v2/coderd/database/dbauthz"
1818
"github.com/coder/coder/v2/coderd/database/dbtime"
19+
"github.com/coder/coder/v2/coderd/database/provisionerjobs"
20+
"github.com/coder/coder/v2/coderd/database/pubsub"
1921
"github.com/coder/coder/v2/coderd/schedule"
2022
"github.com/coder/coder/v2/coderd/schedule/cron"
2123
"github.com/coder/coder/v2/coderd/wsbuilder"
@@ -26,6 +28,7 @@ import (
2628
type Executor struct {
2729
ctx context.Context
2830
db database.Store
31+
ps pubsub.Pubsub
2932
templateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
3033
log slog.Logger
3134
tick <-chan time.Time
@@ -40,11 +43,12 @@ type Stats struct {
4043
}
4144

4245
// NewExecutor returns a new autobuild executor.
43-
func NewExecutor(ctx context.Context, db database.Store, tss *atomic.Pointer[schedule.TemplateScheduleStore], log slog.Logger, tick <-chan time.Time) *Executor {
46+
func NewExecutor(ctx context.Context, db database.Store, ps pubsub.Pubsub, tss *atomic.Pointer[schedule.TemplateScheduleStore], log slog.Logger, tick <-chan time.Time) *Executor {
4447
le := &Executor{
4548
//nolint:gocritic // Autostart has a limited set of permissions.
4649
ctx: dbauthz.AsAutostart(ctx),
4750
db: db,
51+
ps: ps,
4852
templateScheduleStore: tss,
4953
tick: tick,
5054
log: log.Named("autobuild"),
@@ -129,6 +133,7 @@ func (e *Executor) runOnce(t time.Time) Stats {
129133
log := e.log.With(slog.F("workspace_id", wsID))
130134

131135
eg.Go(func() error {
136+
var job *database.ProvisionerJob
132137
err := e.db.InTx(func(tx database.Store) error {
133138
// Re-check eligibility since the first check was outside the
134139
// transaction and the workspace settings may have changed.
@@ -168,7 +173,8 @@ func (e *Executor) runOnce(t time.Time) Stats {
168173
SetLastWorkspaceBuildJobInTx(&latestJob).
169174
Reason(reason)
170175

171-
if _, _, err := builder.Build(e.ctx, tx, nil); err != nil {
176+
_, job, err = builder.Build(e.ctx, tx, nil)
177+
if err != nil {
172178
log.Error(e.ctx, "unable to transition workspace",
173179
slog.F("transition", nextTransition),
174180
slog.Error(err),
@@ -230,6 +236,17 @@ func (e *Executor) runOnce(t time.Time) Stats {
230236
if err != nil {
231237
log.Error(e.ctx, "workspace scheduling failed", slog.Error(err))
232238
}
239+
if job != nil && err == nil {
240+
// Note that we can't refactor such that posting the job happens inside wsbuilder because it's called
241+
// with an outer transaction like this, and we need to make sure the outer transaction commits before
242+
// posting the job. If we post before the transaction commits, provisionerd might try to acquire the
243+
// job, fail, and then sit idle instead of picking up the job.
244+
err = provisionerjobs.PostJob(e.ps, *job)
245+
if err != nil {
246+
// Client probably doesn't care about this error, so just log it.
247+
log.Error(e.ctx, "failed to post provisioner job to pubsub", slog.Error(err))
248+
}
249+
}
233250
return nil
234251
})
235252
}

coderd/batchstats/batcher_internal_test.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/coder/coder/v2/coderd/database/dbgen"
1515
"github.com/coder/coder/v2/coderd/database/dbtestutil"
1616
"github.com/coder/coder/v2/coderd/database/dbtime"
17+
"github.com/coder/coder/v2/coderd/database/pubsub"
1718
"github.com/coder/coder/v2/coderd/rbac"
1819
"github.com/coder/coder/v2/codersdk/agentsdk"
1920
"github.com/coder/coder/v2/cryptorand"
@@ -26,11 +27,11 @@ func TestBatchStats(t *testing.T) {
2627
ctx, cancel := context.WithCancel(context.Background())
2728
t.Cleanup(cancel)
2829
log := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
29-
store, _ := dbtestutil.NewDB(t)
30+
store, ps := dbtestutil.NewDB(t)
3031

3132
// Set up some test dependencies.
32-
deps1 := setupDeps(t, store)
33-
deps2 := setupDeps(t, store)
33+
deps1 := setupDeps(t, store, ps)
34+
deps2 := setupDeps(t, store, ps)
3435
tick := make(chan time.Time)
3536
flushed := make(chan int, 1)
3637

@@ -168,7 +169,7 @@ type deps struct {
168169
// It creates an organization, user, template, workspace, and agent
169170
// along with all the other miscellaneous plumbing required to link
170171
// them together.
171-
func setupDeps(t *testing.T, store database.Store) deps {
172+
func setupDeps(t *testing.T, store database.Store, ps pubsub.Pubsub) deps {
172173
t.Helper()
173174

174175
org := dbgen.Organization(t, store, database.Organization{})
@@ -194,7 +195,7 @@ func setupDeps(t *testing.T, store database.Store) deps {
194195
OrganizationID: org.ID,
195196
LastUsedAt: time.Now().Add(-time.Hour),
196197
})
197-
pj := dbgen.ProvisionerJob(t, store, database.ProvisionerJob{
198+
pj := dbgen.ProvisionerJob(t, store, ps, database.ProvisionerJob{
198199
InitiatorID: user.ID,
199200
OrganizationID: org.ID,
200201
})

coderd/coderd.go

+10-7
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"crypto/tls"
66
"crypto/x509"
7-
"encoding/json"
87
"flag"
98
"fmt"
109
"io"
@@ -366,6 +365,11 @@ func New(options *Options) *API {
366365
UserQuietHoursScheduleStore: options.UserQuietHoursScheduleStore,
367366
Experiments: experiments,
368367
healthCheckGroup: &singleflight.Group[string, *healthcheck.Report]{},
368+
Acquirer: provisionerdserver.NewAcquirer(
369+
ctx,
370+
options.Logger.Named("acquirer"),
371+
options.Database,
372+
options.Pubsub),
369373
}
370374
if options.UpdateCheckOptions != nil {
371375
api.updateChecker = updatecheck.New(
@@ -1016,6 +1020,8 @@ type API struct {
10161020
healthCheckCache atomic.Pointer[healthcheck.Report]
10171021

10181022
statsBatcher *batchstats.Batcher
1023+
1024+
Acquirer *provisionerdserver.Acquirer
10191025
}
10201026

10211027
// Close waits for all WebSocket connections to drain before returning.
@@ -1067,7 +1073,7 @@ func compressHandler(h http.Handler) http.Handler {
10671073

10681074
// CreateInMemoryProvisionerDaemon is an in-memory connection to a provisionerd.
10691075
// Useful when starting coderd and provisionerd in the same process.
1070-
func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce time.Duration) (client proto.DRPCProvisionerDaemonClient, err error) {
1076+
func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context) (client proto.DRPCProvisionerDaemonClient, err error) {
10711077
tracer := api.TracerProvider.Tracer(tracing.TracerName)
10721078
clientSession, serverSession := provisionersdk.MemTransportPipe()
10731079
defer func() {
@@ -1077,11 +1083,8 @@ func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce ti
10771083
}
10781084
}()
10791085

1080-
tags, err := json.Marshal(database.StringMap{
1086+
tags := provisionerdserver.Tags{
10811087
provisionerdserver.TagScope: provisionerdserver.ScopeOrganization,
1082-
})
1083-
if err != nil {
1084-
return nil, xerrors.Errorf("marshal tags: %w", err)
10851088
}
10861089

10871090
mux := drpcmux.New()
@@ -1098,14 +1101,14 @@ func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce ti
10981101
tags,
10991102
api.Database,
11001103
api.Pubsub,
1104+
api.Acquirer,
11011105
api.Telemetry,
11021106
tracer,
11031107
&api.QuotaCommitter,
11041108
&api.Auditor,
11051109
api.TemplateScheduleStore,
11061110
api.UserQuietHoursScheduleStore,
11071111
api.DeploymentValues,
1108-
debounce,
11091112
provisionerdserver.Options{
11101113
OIDCConfig: api.OIDCConfig,
11111114
GitAuthConfigs: api.GitAuthConfigs,

coderd/coderdtest/coderdtest.go

+30-5
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
266266
lifecycleExecutor := autobuild.NewExecutor(
267267
ctx,
268268
options.Database,
269+
options.Pubsub,
269270
&templateScheduleStore,
270271
slogtest.Make(t, nil).Named("autobuild.executor").Leveled(slog.LevelDebug),
271272
options.AutobuildTicker,
@@ -453,6 +454,30 @@ func NewWithAPI(t testing.TB, options *Options) (*codersdk.Client, io.Closer, *c
453454
return client, provisionerCloser, coderAPI
454455
}
455456

457+
// provisionerdCloser wraps a provisioner daemon as an io.Closer whose Close is safe to call multiple times; only the first call shuts the daemon down, later calls return nil.
458+
type provisionerdCloser struct {
459+
mu sync.Mutex
460+
closed bool
461+
d *provisionerd.Server
462+
}
463+
464+
func (c *provisionerdCloser) Close() error {
465+
c.mu.Lock()
466+
defer c.mu.Unlock()
467+
if c.closed {
468+
return nil
469+
}
470+
c.closed = true
471+
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
472+
defer cancel()
473+
shutdownErr := c.d.Shutdown(ctx)
474+
closeErr := c.d.Close()
475+
if shutdownErr != nil {
476+
return shutdownErr
477+
}
478+
return closeErr
479+
}
480+
456481
// NewProvisionerDaemon launches a provisionerd instance configured to work
457482
// well with coderd testing. It registers the "echo" provisioner for
458483
// quick testing.
@@ -482,17 +507,17 @@ func NewProvisionerDaemon(t testing.TB, coderAPI *coderd.API) io.Closer {
482507
assert.NoError(t, err)
483508
}()
484509

485-
closer := provisionerd.New(func(ctx context.Context) (provisionerdproto.DRPCProvisionerDaemonClient, error) {
486-
return coderAPI.CreateInMemoryProvisionerDaemon(ctx, 0)
510+
daemon := provisionerd.New(func(ctx context.Context) (provisionerdproto.DRPCProvisionerDaemonClient, error) {
511+
return coderAPI.CreateInMemoryProvisionerDaemon(ctx)
487512
}, &provisionerd.Options{
488513
Logger: coderAPI.Logger.Named("provisionerd").Leveled(slog.LevelDebug),
489-
JobPollInterval: 50 * time.Millisecond,
490514
UpdateInterval: 250 * time.Millisecond,
491515
ForceCancelInterval: time.Second,
492516
Connector: provisionerd.LocalProvisioners{
493517
string(database.ProvisionerTypeEcho): sdkproto.NewDRPCProvisionerClient(echoClient),
494518
},
495519
})
520+
closer := &provisionerdCloser{d: daemon}
496521
t.Cleanup(func() {
497522
_ = closer.Close()
498523
})
@@ -518,21 +543,21 @@ func NewExternalProvisionerDaemon(t *testing.T, client *codersdk.Client, org uui
518543
assert.NoError(t, err)
519544
}()
520545

521-
closer := provisionerd.New(func(ctx context.Context) (provisionerdproto.DRPCProvisionerDaemonClient, error) {
546+
daemon := provisionerd.New(func(ctx context.Context) (provisionerdproto.DRPCProvisionerDaemonClient, error) {
522547
return client.ServeProvisionerDaemon(ctx, codersdk.ServeProvisionerDaemonRequest{
523548
Organization: org,
524549
Provisioners: []codersdk.ProvisionerType{codersdk.ProvisionerTypeEcho},
525550
Tags: tags,
526551
})
527552
}, &provisionerd.Options{
528553
Logger: slogtest.Make(t, nil).Named("provisionerd").Leveled(slog.LevelDebug),
529-
JobPollInterval: 50 * time.Millisecond,
530554
UpdateInterval: 250 * time.Millisecond,
531555
ForceCancelInterval: time.Second,
532556
Connector: provisionerd.LocalProvisioners{
533557
string(database.ProvisionerTypeEcho): sdkproto.NewDRPCProvisionerClient(echoClient),
534558
},
535559
})
560+
closer := &provisionerdCloser{d: daemon}
536561
t.Cleanup(func() {
537562
_ = closer.Close()
538563
})

0 commit comments

Comments
 (0)