feat: add debouncing to provisionerd rpc calls #5198

Merged 2 commits on Dec 1, 2022
1 change: 1 addition & 0 deletions .gitignore
@@ -14,6 +14,7 @@ vendor
.eslintcache
yarn-error.log
gotests.coverage
gotests.xml
.idea
.gitpod.yml
.DS_Store
12 changes: 12 additions & 0 deletions cli/deployment/config.go
@@ -388,6 +388,18 @@ func newConfig() *codersdk.DeploymentConfig {
Flag: "provisioner-daemons",
Default: 3,
},
DaemonPollInterval: &codersdk.DeploymentConfigField[time.Duration]{
Name: "Poll Interval",
Usage: "Time to wait before polling for a new job.",
Flag: "provisioner-daemon-poll-interval",
Default: time.Second,
},
DaemonPollJitter: &codersdk.DeploymentConfigField[time.Duration]{
Name: "Poll Jitter",
Usage: "Random jitter added to the poll interval.",
Flag: "provisioner-daemon-poll-jitter",
Default: 100 * time.Millisecond,
},
ForceCancelInterval: &codersdk.DeploymentConfigField[time.Duration]{
Name: "Force Cancel Interval",
Usage: "Time to force cancel provisioning tasks that are stuck.",
33 changes: 19 additions & 14 deletions cli/deployment/config_test.go
@@ -2,6 +2,7 @@ package deployment_test

import (
"testing"
"time"

"github.com/spf13/pflag"
"github.com/stretchr/testify/require"
@@ -25,20 +26,22 @@ func TestConfig(t *testing.T) {
}{{
Name: "Deployment",
Env: map[string]string{
"CODER_ADDRESS": "0.0.0.0:8443",
"CODER_ACCESS_URL": "https://dev.coder.com",
"CODER_PG_CONNECTION_URL": "some-url",
"CODER_PPROF_ADDRESS": "something",
"CODER_PPROF_ENABLE": "true",
"CODER_PROMETHEUS_ADDRESS": "hello-world",
"CODER_PROMETHEUS_ENABLE": "true",
"CODER_PROVISIONER_DAEMONS": "5",
"CODER_SECURE_AUTH_COOKIE": "true",
"CODER_SSH_KEYGEN_ALGORITHM": "potato",
"CODER_TELEMETRY": "false",
"CODER_TELEMETRY_TRACE": "false",
"CODER_WILDCARD_ACCESS_URL": "something-wildcard.com",
"CODER_UPDATE_CHECK": "false",
"CODER_ADDRESS": "0.0.0.0:8443",
"CODER_ACCESS_URL": "https://dev.coder.com",
"CODER_PG_CONNECTION_URL": "some-url",
"CODER_PPROF_ADDRESS": "something",
"CODER_PPROF_ENABLE": "true",
"CODER_PROMETHEUS_ADDRESS": "hello-world",
"CODER_PROMETHEUS_ENABLE": "true",
"CODER_PROVISIONER_DAEMONS": "5",
"CODER_PROVISIONER_DAEMON_POLL_INTERVAL": "5s",
"CODER_PROVISIONER_DAEMON_POLL_JITTER": "1s",
"CODER_SECURE_AUTH_COOKIE": "true",
"CODER_SSH_KEYGEN_ALGORITHM": "potato",
"CODER_TELEMETRY": "false",
"CODER_TELEMETRY_TRACE": "false",
"CODER_WILDCARD_ACCESS_URL": "something-wildcard.com",
"CODER_UPDATE_CHECK": "false",
},
Valid: func(config *codersdk.DeploymentConfig) {
require.Equal(t, config.Address.Value, "0.0.0.0:8443")
@@ -49,6 +52,8 @@ func TestConfig(t *testing.T) {
require.Equal(t, config.Prometheus.Address.Value, "hello-world")
require.Equal(t, config.Prometheus.Enable.Value, true)
require.Equal(t, config.Provisioner.Daemons.Value, 5)
require.Equal(t, config.Provisioner.DaemonPollInterval.Value, 5*time.Second)
require.Equal(t, config.Provisioner.DaemonPollJitter.Value, 1*time.Second)
require.Equal(t, config.SecureAuthCookie.Value, true)
require.Equal(t, config.SSHKeygenAlgorithm.Value, "potato")
require.Equal(t, config.Telemetry.Enable.Value, false)
2 changes: 2 additions & 0 deletions cli/root_test.go
@@ -22,6 +22,8 @@ import (
"github.com/coder/coder/testutil"
)

// To update the golden files:
// make update-golden-files
var updateGoldenFiles = flag.Bool("update", false, "update .golden files")

//nolint:tparallel,paralleltest // These test sets env vars.
7 changes: 5 additions & 2 deletions cli/server.go
@@ -970,13 +970,16 @@ func newProvisionerDaemon(
}()
provisioners[string(database.ProvisionerTypeEcho)] = sdkproto.NewDRPCProvisionerClient(echoClient)
}
debounce := time.Second
return provisionerd.New(func(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error) {
// This debounces calls to listen every second. Read the comment
// in provisionerdserver.go to learn more!
return coderAPI.CreateInMemoryProvisionerDaemon(ctx, time.Second)
return coderAPI.CreateInMemoryProvisionerDaemon(ctx, debounce)
}, &provisionerd.Options{
Logger: logger,
PollInterval: 500 * time.Millisecond,
JobPollInterval: cfg.Provisioner.DaemonPollInterval.Value,
JobPollJitter: cfg.Provisioner.DaemonPollJitter.Value,
JobPollDebounce: debounce,
UpdateInterval: 500 * time.Millisecond,
ForceCancelInterval: cfg.Provisioner.ForceCancelInterval.Value,
Provisioners: provisioners,
9 changes: 9 additions & 0 deletions cli/testdata/coder_server_--help.golden
@@ -128,6 +128,15 @@ Flags:
--prometheus-enable Serve prometheus metrics on the address
defined by prometheus address.
Consumes $CODER_PROMETHEUS_ENABLE
--provisioner-daemon-poll-interval duration Time to wait before polling for a new
job.
Consumes
$CODER_PROVISIONER_DAEMON_POLL_INTERVAL
(default 1s)
--provisioner-daemon-poll-jitter duration Random jitter added to the poll interval.
Consumes
$CODER_PROVISIONER_DAEMON_POLL_JITTER
(default 100ms)
--provisioner-daemons int Number of provisioner daemons to create
on start. If builds are stuck in queued
state for a long time, consider
4 changes: 2 additions & 2 deletions coderd/coderd.go
@@ -643,8 +643,8 @@ func compressHandler(h http.Handler) http.Handler {
return cmp.Handler(h)
}

// CreateInMemoryProvisionerDaemon is an in-memory connection to a provisionerd. Useful when starting coderd and provisionerd
// in the same process.
// CreateInMemoryProvisionerDaemon is an in-memory connection to a provisionerd.
// Useful when starting coderd and provisionerd in the same process.
func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce time.Duration) (client proto.DRPCProvisionerDaemonClient, err error) {
clientSession, serverSession := provisionersdk.MemTransportPipe()
defer func() {
4 changes: 2 additions & 2 deletions coderd/coderdtest/coderdtest.go
@@ -340,7 +340,7 @@ func NewProvisionerDaemon(t *testing.T, coderAPI *coderd.API) io.Closer {
}, &provisionerd.Options{
Filesystem: fs,
Logger: slogtest.Make(t, nil).Named("provisionerd").Leveled(slog.LevelDebug),
PollInterval: 50 * time.Millisecond,
JobPollInterval: 50 * time.Millisecond,
UpdateInterval: 250 * time.Millisecond,
ForceCancelInterval: time.Second,
Provisioners: provisionerd.Provisioners{
@@ -375,7 +375,7 @@ func NewExternalProvisionerDaemon(t *testing.T, client *codersdk.Client, org uui
}, &provisionerd.Options{
Filesystem: fs,
Logger: slogtest.Make(t, nil).Named("provisionerd").Leveled(slog.LevelDebug),
PollInterval: 50 * time.Millisecond,
JobPollInterval: 50 * time.Millisecond,
UpdateInterval: 250 * time.Millisecond,
ForceCancelInterval: time.Second,
Provisioners: provisionerd.Provisioners{
2 changes: 2 additions & 0 deletions codersdk/deploymentconfig.go
@@ -135,6 +135,8 @@ type GitAuthConfig struct {

type ProvisionerConfig struct {
Daemons *DeploymentConfigField[int] `json:"daemons" typescript:",notnull"`
DaemonPollInterval *DeploymentConfigField[time.Duration] `json:"daemon_poll_interval" typescript:",notnull"`
DaemonPollJitter *DeploymentConfigField[time.Duration] `json:"daemon_poll_jitter" typescript:",notnull"`
ForceCancelInterval *DeploymentConfigField[time.Duration] `json:"force_cancel_interval" typescript:",notnull"`
}

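For illustration only (not part of this PR's diff): once a deployment config has been loaded, the two new fields are read the same way as the existing ones, which is how cli/server.go wires them into the daemon options above. A minimal sketch, assuming a populated *codersdk.DeploymentConfig; the helper name is hypothetical:

package example

import (
	"time"

	"github.com/coder/coder/codersdk"
)

// provisionerPollSettings is a hypothetical helper, not part of this PR.
// It reads the two fields added above from an already-populated config,
// mirroring how cli/server.go consumes them.
func provisionerPollSettings(cfg *codersdk.DeploymentConfig) (interval, jitter time.Duration) {
	interval = cfg.Provisioner.DaemonPollInterval.Value // --provisioner-daemon-poll-interval, default 1s
	jitter = cfg.Provisioner.DaemonPollJitter.Value     // --provisioner-daemon-poll-jitter, default 100ms
	return interval, jitter
}
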
27 changes: 17 additions & 10 deletions enterprise/cli/provisionerdaemons.go
@@ -7,6 +7,9 @@ import (
"os/signal"
"time"

"github.com/spf13/cobra"
"golang.org/x/xerrors"

"cdr.dev/slog"
"cdr.dev/slog/sloggers/sloghuman"
agpl "github.com/coder/coder/cli"
@@ -20,9 +23,6 @@ import (
provisionerdproto "github.com/coder/coder/provisionerd/proto"
"github.com/coder/coder/provisionersdk"
"github.com/coder/coder/provisionersdk/proto"

"github.com/spf13/cobra"
"golang.org/x/xerrors"
)

func provisionerDaemons() *cobra.Command {
@@ -37,8 +37,10 @@ func provisionerDaemons() *cobra.Command {

func provisionerDaemonStart() *cobra.Command {
var (
cacheDir string
rawTags []string
cacheDir string
rawTags []string
pollInterval time.Duration
pollJitter time.Duration
)
cmd := &cobra.Command{
Use: "start",
@@ -111,11 +113,12 @@ func provisionerDaemonStart() *cobra.Command {
codersdk.ProvisionerTypeTerraform,
}, tags)
}, &provisionerd.Options{
Logger: logger,
PollInterval: 500 * time.Millisecond,
UpdateInterval: 500 * time.Millisecond,
Provisioners: provisioners,
WorkDirectory: tempDir,
Logger: logger,
JobPollInterval: pollInterval,
JobPollJitter: pollJitter,
UpdateInterval: 500 * time.Millisecond,
Provisioners: provisioners,
WorkDirectory: tempDir,
})

var exitErr error
@@ -150,6 +153,10 @@ func provisionerDaemonStart() *cobra.Command {
"Specify a directory to cache provisioner job files.")
cliflag.StringArrayVarP(cmd.Flags(), &rawTags, "tag", "t", "CODER_PROVISIONERD_TAGS", []string{},
"Specify a list of tags to target provisioner jobs.")
cliflag.DurationVarP(cmd.Flags(), &pollInterval, "poll-interval", "", "CODER_PROVISIONERD_POLL_INTERVAL", time.Second,
"Specify the interval for which the provisioner daemon should poll for jobs.")
cliflag.DurationVarP(cmd.Flags(), &pollJitter, "poll-jitter", "", "CODER_PROVISIONERD_POLL_JITTER", 100*time.Millisecond,
"Random jitter added to the poll interval.")

return cmd
}
1 change: 0 additions & 1 deletion enterprise/coderd/provisionerdaemons.go
@@ -18,7 +18,6 @@ import (
"storj.io/drpc/drpcserver"

"cdr.dev/slog"

"github.com/coder/coder/coderd"
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/httpapi"
55 changes: 45 additions & 10 deletions provisionerd/provisionerd.go
@@ -23,6 +23,7 @@ import (

"cdr.dev/slog"
"github.com/coder/coder/coderd/tracing"
"github.com/coder/coder/cryptorand"
"github.com/coder/coder/provisionerd/proto"
"github.com/coder/coder/provisionerd/runner"
sdkproto "github.com/coder/coder/provisionersdk/proto"
@@ -52,7 +53,9 @@ type Options struct {
ForceCancelInterval time.Duration
UpdateInterval time.Duration
LogBufferInterval time.Duration
PollInterval time.Duration
JobPollInterval time.Duration
JobPollJitter time.Duration
JobPollDebounce time.Duration
Provisioners Provisioners
WorkDirectory string
}
@@ -62,8 +65,11 @@ func New(clientDialer Dialer, opts *Options) *Server {
if opts == nil {
opts = &Options{}
}
if opts.PollInterval == 0 {
opts.PollInterval = 5 * time.Second
if opts.JobPollInterval == 0 {
opts.JobPollInterval = 5 * time.Second
}
if opts.JobPollJitter == 0 {
opts.JobPollJitter = time.Second
}
if opts.UpdateInterval == 0 {
opts.UpdateInterval = 5 * time.Second
@@ -207,8 +213,8 @@ func (p *Server) connect(ctx context.Context) {
if p.isClosed() {
return
}
ticker := time.NewTicker(p.opts.PollInterval)
defer ticker.Stop()
timer := time.NewTimer(p.opts.JobPollInterval)

Member: Wouldn't it be better to only apply this delay if it failed to acquire a job? This means we will acquire them as quickly as possible without delays if there are a lot of jobs queued, but during periods of low jobs we will only attempt to acquire a job every 5 seconds.

Contributor Author: I guess in theory yeah, but it's currently architected in a way where this is still ticking while the provisioner is running a job, so it'd require a bit of a refactor for this to work.

Member: Couldn't you just do something like:

for {
	ok := acquireJob()
	if !ok {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(5 * time.Second):
		}
	}
}

or something? time.After only leaks if it hasn't fired yet, so since the interval is small and the only time this for loop will exit is if the app is shutting down, it seems fine here.

defer timer.Stop()
for {
client, ok := p.client()
if !ok {
@@ -219,13 +225,23 @@ func (p *Server) connect(ctx context.Context) {
return
case <-client.DRPCConn().Closed():
return
case <-ticker.C:
case <-timer.C:
p.acquireJob(ctx)
timer.Reset(p.nextInterval())
}
}
}()
}

func (p *Server) nextInterval() time.Duration {
r, err := cryptorand.Float64()
if err != nil {
panic("get random float:" + err.Error())
}

return p.opts.JobPollInterval + time.Duration(float64(p.opts.JobPollJitter)*r)
}

func (p *Server) client() (proto.DRPCProvisionerDaemonClient, bool) {
rawClient := p.clientValue.Load()
if rawClient == nil {
@@ -248,6 +264,11 @@ func (p *Server) isRunningJob() bool {
}
}

var (
lastAcquire time.Time
lastAcquireMutex sync.RWMutex
)

// Locks a job in the database, and runs it!
func (p *Server) acquireJob(ctx context.Context) {
p.mutex.Lock()
@@ -263,6 +284,18 @@ func (p *Server) acquireJob(ctx context.Context) {
return
}

// This prevents loads of provisioner daemons from consistently sending
// requests when no jobs are available.
//
// The debounce only occurs when no job is returned, so if loads of jobs are
// added at once, they will start after at most this duration.
lastAcquireMutex.RLock()
if !lastAcquire.IsZero() && time.Since(lastAcquire) < p.opts.JobPollDebounce {
lastAcquireMutex.RUnlock()
return
}
lastAcquireMutex.RUnlock()

Comment on lines +292 to +297

Member: Should disable this debounce if running in tests.

Contributor Author: It's only enabled in coder server.


var err error
client, ok := p.client()
if !ok {
@@ -271,17 +304,19 @@ func (p *Server) acquireJob(ctx context.Context) {

job, err := client.AcquireJob(ctx, &proto.Empty{})
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
if errors.Is(err, yamux.ErrSessionShutdown) {
if errors.Is(err, context.Canceled) ||
errors.Is(err, yamux.ErrSessionShutdown) ||
errors.Is(err, fasthttputil.ErrInmemoryListenerClosed) {
return
}

p.opts.Logger.Warn(ctx, "acquire job", slog.Error(err))
return
}
if job.JobId == "" {
lastAcquireMutex.Lock()
lastAcquire = time.Now()
lastAcquireMutex.Unlock()
return
}

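To make the new behavior easier to follow outside the diff, here is a minimal, self-contained sketch of the loop provisionerd moves to in this PR: the next poll fires after the poll interval plus a random fraction of the jitter, and a debounce skips further AcquireJob calls for a while after an empty response. All names and structure below are simplified stand-ins; only the interval/jitter/debounce semantics come from the changes above (the real code keeps the debounce timestamp in a package-level variable guarded by a mutex and uses coder's cryptorand rather than math/rand).

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"
)

// poller is a simplified stand-in for provisionerd.Server.
type poller struct {
	interval  time.Duration // JobPollInterval (default 1s)
	jitter    time.Duration // JobPollJitter (default 100ms)
	debounce  time.Duration // JobPollDebounce (1s for the in-memory daemon)
	lastEmpty time.Time     // last time acquire returned no job
}

// nextInterval mirrors Server.nextInterval: the base interval plus a random
// fraction of the jitter.
func (p *poller) nextInterval() time.Duration {
	return p.interval + time.Duration(rand.Float64()*float64(p.jitter))
}

// acquire stands in for client.AcquireJob. It returns false when no job was
// handed out, and skips the call entirely while the debounce is active.
func (p *poller) acquire(ctx context.Context) bool {
	if !p.lastEmpty.IsZero() && time.Since(p.lastEmpty) < p.debounce {
		return false // debounced: the last poll came back empty very recently
	}
	_ = ctx // the real call is an RPC; pretend it returned no job
	p.lastEmpty = time.Now()
	return false
}

// run is the polling loop: a timer instead of a ticker, reset with a fresh
// jittered delay after every attempt.
func (p *poller) run(ctx context.Context) {
	timer := time.NewTimer(p.interval)
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			p.acquire(ctx)
			timer.Reset(p.nextInterval())
		}
	}
}

func main() {
	p := &poller{interval: time.Second, jitter: 100 * time.Millisecond, debounce: time.Second}
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	p.run(ctx)
	fmt.Println("polling stopped")
}

With the defaults added in this PR (1s interval, 100ms jitter), each daemon's polls land at a random point roughly 1.0–1.1s apart, which spreads a fleet of daemons' AcquireJob calls out instead of having them fire in lockstep.
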
10 changes: 5 additions & 5 deletions provisionerd/provisionerd_test.go
@@ -1053,11 +1053,11 @@ func createTar(t *testing.T, files map[string]string) []byte {
// Creates a provisionerd implementation with the provided dialer and provisioners.
func createProvisionerd(t *testing.T, dialer provisionerd.Dialer, provisioners provisionerd.Provisioners) *provisionerd.Server {
server := provisionerd.New(dialer, &provisionerd.Options{
Logger: slogtest.Make(t, nil).Named("provisionerd").Leveled(slog.LevelDebug),
PollInterval: 50 * time.Millisecond,
UpdateInterval: 50 * time.Millisecond,
Provisioners: provisioners,
WorkDirectory: t.TempDir(),
Logger: slogtest.Make(t, nil).Named("provisionerd").Leveled(slog.LevelDebug),
JobPollInterval: 50 * time.Millisecond,
UpdateInterval: 50 * time.Millisecond,
Provisioners: provisioners,
WorkDirectory: t.TempDir(),
})
t.Cleanup(func() {
_ = server.Close()
2 changes: 2 additions & 0 deletions site/src/api/typesGenerated.ts
@@ -527,6 +527,8 @@ export interface PrometheusConfig {
// From codersdk/deploymentconfig.go
export interface ProvisionerConfig {
readonly daemons: DeploymentConfigField<number>
readonly daemon_poll_interval: DeploymentConfigField<number>
readonly daemon_poll_jitter: DeploymentConfigField<number>
readonly force_cancel_interval: DeploymentConfigField<number>
}
