Skip to content

Commit 23d0e3c

Browse files
committed
feat: integrate Acquirer for provisioner jobs
Signed-off-by: Spike Curtis <spike@coder.com>
1 parent 1ceac51 commit 23d0e3c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1488
-1170
lines changed

cli/server.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,7 +1012,8 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
10121012

10131013
autobuildTicker := time.NewTicker(vals.AutobuildPollInterval.Value())
10141014
defer autobuildTicker.Stop()
1015-
autobuildExecutor := autobuild.NewExecutor(ctx, options.Database, coderAPI.TemplateScheduleStore, logger, autobuildTicker.C)
1015+
autobuildExecutor := autobuild.NewExecutor(
1016+
ctx, options.Database, options.Pubsub, coderAPI.TemplateScheduleStore, logger, autobuildTicker.C)
10161017
autobuildExecutor.Run()
10171018

10181019
hangDetectorTicker := time.NewTicker(vals.JobHangDetectorInterval.Value())
@@ -1378,16 +1379,12 @@ func newProvisionerDaemon(
13781379
connector[string(database.ProvisionerTypeTerraform)] = sdkproto.NewDRPCProvisionerClient(terraformClient)
13791380
}
13801381

1381-
debounce := time.Second
13821382
return provisionerd.New(func(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error) {
13831383
// This debounces calls to listen every second. Read the comment
13841384
// in provisionerdserver.go to learn more!
1385-
return coderAPI.CreateInMemoryProvisionerDaemon(ctx, debounce)
1385+
return coderAPI.CreateInMemoryProvisionerDaemon(ctx)
13861386
}, &provisionerd.Options{
13871387
Logger: logger.Named("provisionerd"),
1388-
JobPollInterval: cfg.Provisioner.DaemonPollInterval.Value(),
1389-
JobPollJitter: cfg.Provisioner.DaemonPollJitter.Value(),
1390-
JobPollDebounce: debounce,
13911388
UpdateInterval: time.Second,
13921389
ForceCancelInterval: cfg.Provisioner.ForceCancelInterval.Value(),
13931390
Connector: connector,

cli/testdata/coder_server_--help.golden

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -392,12 +392,6 @@ updating, and deleting workspace resources.
392392
--provisioner-force-cancel-interval duration, $CODER_PROVISIONER_FORCE_CANCEL_INTERVAL (default: 10m0s)
393393
Time to force cancel provisioning tasks that are stuck.
394394

395-
--provisioner-daemon-poll-interval duration, $CODER_PROVISIONER_DAEMON_POLL_INTERVAL (default: 1s)
396-
Time to wait before polling for a new job.
397-
398-
--provisioner-daemon-poll-jitter duration, $CODER_PROVISIONER_DAEMON_POLL_JITTER (default: 100ms)
399-
Random jitter added to the poll interval.
400-
401395
--provisioner-daemon-psk string, $CODER_PROVISIONER_DAEMON_PSK
402396
Pre-shared key to authenticate external provisioner daemons to Coder
403397
server.

cli/testdata/server-config.yaml.golden

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -353,12 +353,6 @@ provisioning:
353353
# tests.
354354
# (default: false, type: bool)
355355
daemonsEcho: false
356-
# Time to wait before polling for a new job.
357-
# (default: 1s, type: duration)
358-
daemonPollInterval: 1s
359-
# Random jitter added to the poll interval.
360-
# (default: 100ms, type: duration)
361-
daemonPollJitter: 100ms
362356
# Time to force cancel provisioning tasks that are stuck.
363357
# (default: 10m0s, type: duration)
364358
forceCancelInterval: 10m0s

coderd/apidoc/docs.go

Lines changed: 0 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/apidoc/swagger.json

Lines changed: 0 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/autobuild/lifecycle_executor.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/coder/coder/v2/coderd/database/db2sdk"
1717
"github.com/coder/coder/v2/coderd/database/dbauthz"
1818
"github.com/coder/coder/v2/coderd/database/dbtime"
19+
"github.com/coder/coder/v2/coderd/database/pubsub"
1920
"github.com/coder/coder/v2/coderd/schedule"
2021
"github.com/coder/coder/v2/coderd/schedule/cron"
2122
"github.com/coder/coder/v2/coderd/wsbuilder"
@@ -26,6 +27,7 @@ import (
2627
type Executor struct {
2728
ctx context.Context
2829
db database.Store
30+
ps pubsub.Pubsub
2931
templateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
3032
log slog.Logger
3133
tick <-chan time.Time
@@ -40,11 +42,12 @@ type Stats struct {
4042
}
4143

4244
// New returns a new wsactions executor.
43-
func NewExecutor(ctx context.Context, db database.Store, tss *atomic.Pointer[schedule.TemplateScheduleStore], log slog.Logger, tick <-chan time.Time) *Executor {
45+
func NewExecutor(ctx context.Context, db database.Store, ps pubsub.Pubsub, tss *atomic.Pointer[schedule.TemplateScheduleStore], log slog.Logger, tick <-chan time.Time) *Executor {
4446
le := &Executor{
4547
//nolint:gocritic // Autostart has a limited set of permissions.
4648
ctx: dbauthz.AsAutostart(ctx),
4749
db: db,
50+
ps: ps,
4851
templateScheduleStore: tss,
4952
tick: tick,
5053
log: log.Named("autobuild"),
@@ -168,7 +171,7 @@ func (e *Executor) runOnce(t time.Time) Stats {
168171
SetLastWorkspaceBuildJobInTx(&latestJob).
169172
Reason(reason)
170173

171-
if _, _, err := builder.Build(e.ctx, tx, nil); err != nil {
174+
if _, _, err := builder.Build(e.ctx, log, tx, e.ps, nil); err != nil {
172175
log.Error(e.ctx, "unable to transition workspace",
173176
slog.F("transition", nextTransition),
174177
slog.Error(err),

coderd/batchstats/batcher_internal_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"testing"
66
"time"
77

8+
"github.com/coder/coder/v2/coderd/database/pubsub"
9+
810
"github.com/stretchr/testify/require"
911

1012
"cdr.dev/slog"
@@ -26,11 +28,11 @@ func TestBatchStats(t *testing.T) {
2628
ctx, cancel := context.WithCancel(context.Background())
2729
t.Cleanup(cancel)
2830
log := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
29-
store, _ := dbtestutil.NewDB(t)
31+
store, ps := dbtestutil.NewDB(t)
3032

3133
// Set up some test dependencies.
32-
deps1 := setupDeps(t, store)
33-
deps2 := setupDeps(t, store)
34+
deps1 := setupDeps(t, store, ps)
35+
deps2 := setupDeps(t, store, ps)
3436
tick := make(chan time.Time)
3537
flushed := make(chan int, 1)
3638

@@ -168,7 +170,7 @@ type deps struct {
168170
// It creates an organization, user, template, workspace, and agent
169171
// along with all the other miscellaneous plumbing required to link
170172
// them together.
171-
func setupDeps(t *testing.T, store database.Store) deps {
173+
func setupDeps(t *testing.T, store database.Store, ps pubsub.Pubsub) deps {
172174
t.Helper()
173175

174176
org := dbgen.Organization(t, store, database.Organization{})
@@ -194,7 +196,7 @@ func setupDeps(t *testing.T, store database.Store) deps {
194196
OrganizationID: org.ID,
195197
LastUsedAt: time.Now().Add(-time.Hour),
196198
})
197-
pj := dbgen.ProvisionerJob(t, store, database.ProvisionerJob{
199+
pj := dbgen.ProvisionerJob(t, store, ps, database.ProvisionerJob{
198200
InitiatorID: user.ID,
199201
OrganizationID: org.ID,
200202
})

coderd/coderd.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"crypto/tls"
66
"crypto/x509"
7-
"encoding/json"
87
"flag"
98
"fmt"
109
"io"
@@ -366,6 +365,11 @@ func New(options *Options) *API {
366365
UserQuietHoursScheduleStore: options.UserQuietHoursScheduleStore,
367366
Experiments: experiments,
368367
healthCheckGroup: &singleflight.Group[string, *healthcheck.Report]{},
368+
Acquirer: provisionerdserver.NewAcquirer(
369+
ctx,
370+
options.Logger.Named("acquirer"),
371+
options.Database,
372+
options.Pubsub),
369373
}
370374
if options.UpdateCheckOptions != nil {
371375
api.updateChecker = updatecheck.New(
@@ -1016,6 +1020,8 @@ type API struct {
10161020
healthCheckCache atomic.Pointer[healthcheck.Report]
10171021

10181022
statsBatcher *batchstats.Batcher
1023+
1024+
Acquirer *provisionerdserver.Acquirer
10191025
}
10201026

10211027
// Close waits for all WebSocket connections to drain before returning.
@@ -1067,7 +1073,7 @@ func compressHandler(h http.Handler) http.Handler {
10671073

10681074
// CreateInMemoryProvisionerDaemon is an in-memory connection to a provisionerd.
10691075
// Useful when starting coderd and provisionerd in the same process.
1070-
func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce time.Duration) (client proto.DRPCProvisionerDaemonClient, err error) {
1076+
func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context) (client proto.DRPCProvisionerDaemonClient, err error) {
10711077
tracer := api.TracerProvider.Tracer(tracing.TracerName)
10721078
clientSession, serverSession := provisionersdk.MemTransportPipe()
10731079
defer func() {
@@ -1077,11 +1083,8 @@ func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce ti
10771083
}
10781084
}()
10791085

1080-
tags, err := json.Marshal(database.StringMap{
1086+
tags := provisionerdserver.Tags{
10811087
provisionerdserver.TagScope: provisionerdserver.ScopeOrganization,
1082-
})
1083-
if err != nil {
1084-
return nil, xerrors.Errorf("marshal tags: %w", err)
10851088
}
10861089

10871090
mux := drpcmux.New()
@@ -1098,14 +1101,14 @@ func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce ti
10981101
tags,
10991102
api.Database,
11001103
api.Pubsub,
1104+
api.Acquirer,
11011105
api.Telemetry,
11021106
tracer,
11031107
&api.QuotaCommitter,
11041108
&api.Auditor,
11051109
api.TemplateScheduleStore,
11061110
api.UserQuietHoursScheduleStore,
11071111
api.DeploymentValues,
1108-
debounce,
11091112
provisionerdserver.Options{
11101113
OIDCConfig: api.OIDCConfig,
11111114
GitAuthConfigs: api.GitAuthConfigs,

coderd/coderdtest/coderdtest.go

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
266266
lifecycleExecutor := autobuild.NewExecutor(
267267
ctx,
268268
options.Database,
269+
options.Pubsub,
269270
&templateScheduleStore,
270271
slogtest.Make(t, nil).Named("autobuild.executor").Leveled(slog.LevelDebug),
271272
options.AutobuildTicker,
@@ -453,6 +454,30 @@ func NewWithAPI(t testing.TB, options *Options) (*codersdk.Client, io.Closer, *c
453454
return client, provisionerCloser, coderAPI
454455
}
455456

457+
// provisionerdCloser wraps a provisioner daemon as an io.Closer that can be called multiple times
458+
type provisionerdCloser struct {
459+
mu sync.Mutex
460+
closed bool
461+
d *provisionerd.Server
462+
}
463+
464+
func (c *provisionerdCloser) Close() error {
465+
c.mu.Lock()
466+
defer c.mu.Unlock()
467+
if c.closed {
468+
return nil
469+
}
470+
c.closed = true
471+
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
472+
defer cancel()
473+
shutdownErr := c.d.Shutdown(ctx)
474+
closeErr := c.d.Close()
475+
if shutdownErr != nil {
476+
return shutdownErr
477+
}
478+
return closeErr
479+
}
480+
456481
// NewProvisionerDaemon launches a provisionerd instance configured to work
457482
// well with coderd testing. It registers the "echo" provisioner for
458483
// quick testing.
@@ -477,17 +502,17 @@ func NewProvisionerDaemon(t testing.TB, coderAPI *coderd.API) io.Closer {
477502
assert.NoError(t, err)
478503
}()
479504

480-
closer := provisionerd.New(func(ctx context.Context) (provisionerdproto.DRPCProvisionerDaemonClient, error) {
481-
return coderAPI.CreateInMemoryProvisionerDaemon(ctx, 0)
505+
daemon := provisionerd.New(func(ctx context.Context) (provisionerdproto.DRPCProvisionerDaemonClient, error) {
506+
return coderAPI.CreateInMemoryProvisionerDaemon(ctx)
482507
}, &provisionerd.Options{
483508
Logger: coderAPI.Logger.Named("provisionerd").Leveled(slog.LevelDebug),
484-
JobPollInterval: 50 * time.Millisecond,
485509
UpdateInterval: 250 * time.Millisecond,
486510
ForceCancelInterval: time.Second,
487511
Connector: provisionerd.LocalProvisioners{
488512
string(database.ProvisionerTypeEcho): sdkproto.NewDRPCProvisionerClient(echoClient),
489513
},
490514
})
515+
closer := &provisionerdCloser{d: daemon}
491516
t.Cleanup(func() {
492517
_ = closer.Close()
493518
})
@@ -513,21 +538,21 @@ func NewExternalProvisionerDaemon(t *testing.T, client *codersdk.Client, org uui
513538
assert.NoError(t, err)
514539
}()
515540

516-
closer := provisionerd.New(func(ctx context.Context) (provisionerdproto.DRPCProvisionerDaemonClient, error) {
541+
daemon := provisionerd.New(func(ctx context.Context) (provisionerdproto.DRPCProvisionerDaemonClient, error) {
517542
return client.ServeProvisionerDaemon(ctx, codersdk.ServeProvisionerDaemonRequest{
518543
Organization: org,
519544
Provisioners: []codersdk.ProvisionerType{codersdk.ProvisionerTypeEcho},
520545
Tags: tags,
521546
})
522547
}, &provisionerd.Options{
523548
Logger: slogtest.Make(t, nil).Named("provisionerd").Leveled(slog.LevelDebug),
524-
JobPollInterval: 50 * time.Millisecond,
525549
UpdateInterval: 250 * time.Millisecond,
526550
ForceCancelInterval: time.Second,
527551
Connector: provisionerd.LocalProvisioners{
528552
string(database.ProvisionerTypeEcho): sdkproto.NewDRPCProvisionerClient(echoClient),
529553
},
530554
})
555+
closer := &provisionerdCloser{d: daemon}
531556
t.Cleanup(func() {
532557
_ = closer.Close()
533558
})

0 commit comments

Comments
 (0)