diff --git a/cli/server.go b/cli/server.go
index 3726a17a1399a..adfd87a334ab2 100644
--- a/cli/server.go
+++ b/cli/server.go
@@ -1367,7 +1367,7 @@ func newProvisionerDaemon(
 		JobPollInterval:     cfg.Provisioner.DaemonPollInterval.Value(),
 		JobPollJitter:       cfg.Provisioner.DaemonPollJitter.Value(),
 		JobPollDebounce:     debounce,
-		UpdateInterval:      500 * time.Millisecond,
+		UpdateInterval:      time.Second,
 		ForceCancelInterval: cfg.Provisioner.ForceCancelInterval.Value(),
 		Provisioners:        provisioners,
 		WorkDirectory:       tempDir,
diff --git a/coderd/workspaceapps/apptest/apptest.go b/coderd/workspaceapps/apptest/apptest.go
index 6ef8f31458e30..b809501756ff7 100644
--- a/coderd/workspaceapps/apptest/apptest.go
+++ b/coderd/workspaceapps/apptest/apptest.go
@@ -814,7 +814,7 @@ func Run(t *testing.T, factory DeploymentFactory) {
 
 			// Create workspace.
 			port := appServer(t)
-			workspace, agnt = createWorkspaceWithApps(t, client, user.OrganizationIDs[0], user, proxyTestSubdomainRaw, port)
+			workspace, _ = createWorkspaceWithApps(t, client, user.OrganizationIDs[0], user, proxyTestSubdomainRaw, port)
 
 			// Verify that the apps have the correct sharing levels set.
 			workspaceBuild, err := client.WorkspaceBuild(ctx, workspace.LatestBuild.ID)
diff --git a/provisionerd/provisionerd.go b/provisionerd/provisionerd.go
index 86a8fddfeee78..411d98805823a 100644
--- a/provisionerd/provisionerd.go
+++ b/provisionerd/provisionerd.go
@@ -22,6 +22,7 @@ import (
 	"cdr.dev/slog"
 
 	"github.com/coder/coder/coderd/tracing"
+	"github.com/coder/coder/coderd/util/ptr"
 	"github.com/coder/coder/cryptorand"
 	"github.com/coder/coder/provisionerd/proto"
 	"github.com/coder/coder/provisionerd/runner"
@@ -77,7 +78,7 @@ func New(clientDialer Dialer, opts *Options) *Server {
 		opts.ForceCancelInterval = 10 * time.Minute
 	}
 	if opts.LogBufferInterval == 0 {
-		opts.LogBufferInterval = 50 * time.Millisecond
+		opts.LogBufferInterval = 250 * time.Millisecond
 	}
 	if opts.Filesystem == nil {
 		opts.Filesystem = afero.NewOsFs()
@@ -113,7 +114,7 @@ type Server struct {
 	tracer trace.Tracer
 
 	clientDialer Dialer
-	clientValue  atomic.Value
+	clientValue  atomic.Pointer[proto.DRPCProvisionerDaemonClient]
 
 	// Locked when closing the daemon, shutting down, or starting a new job.
 	mutex sync.Mutex
@@ -194,7 +195,7 @@ func (p *Server) connect(ctx context.Context) {
 			p.mutex.Unlock()
 			break
 		}
-		p.clientValue.Store(client)
+		p.clientValue.Store(ptr.Ref(client))
 		p.mutex.Unlock()
 
 		p.opts.Logger.Debug(context.Background(), "connected")
@@ -260,12 +261,11 @@ func (p *Server) nextInterval() time.Duration {
 }
 
 func (p *Server) client() (proto.DRPCProvisionerDaemonClient, bool) {
-	rawClient := p.clientValue.Load()
-	if rawClient == nil {
+	client := p.clientValue.Load()
+	if client == nil {
 		return nil, false
 	}
-	client, ok := rawClient.(proto.DRPCProvisionerDaemonClient)
-	return client, ok
+	return *client, true
 }
 
 // isRunningJob returns true if a job is running. Caller must hold the mutex.
@@ -417,14 +417,15 @@ func retryable(err error) bool {
 		xerrors.Is(err, context.Canceled)
 }
 
-// clientDoWithRetries runs the function f with a client, and retries with backoff until either the error returned
-// is not retryable() or the context expires.
-func (p *Server) clientDoWithRetries(
-	ctx context.Context, f func(context.Context, proto.DRPCProvisionerDaemonClient) (any, error)) (
-	any, error,
-) {
+// clientDoWithRetries runs the function f with a client, and retries with
+// backoff until either the error returned is not retryable() or the context
+// expires.
+func clientDoWithRetries[T any](ctx context.Context,
+	getClient func() (proto.DRPCProvisionerDaemonClient, bool),
+	f func(context.Context, proto.DRPCProvisionerDaemonClient) (T, error),
+) (ret T, _ error) {
 	for retrier := retry.New(25*time.Millisecond, 5*time.Second); retrier.Wait(ctx); {
-		client, ok := p.client()
+		client, ok := getClient()
 		if !ok {
 			continue
 		}
@@ -434,40 +435,38 @@ func (p *Server) clientDoWithRetries(
 		}
 		return resp, err
 	}
-	return nil, ctx.Err()
+	return ret, ctx.Err()
 }
 
 func (p *Server) CommitQuota(ctx context.Context, in *proto.CommitQuotaRequest) (*proto.CommitQuotaResponse, error) {
-	out, err := p.clientDoWithRetries(ctx, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (any, error) {
+	out, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.CommitQuotaResponse, error) {
 		return client.CommitQuota(ctx, in)
 	})
 	if err != nil {
 		return nil, err
 	}
-	// nolint: forcetypeassert
-	return out.(*proto.CommitQuotaResponse), nil
+	return out, nil
 }
 
 func (p *Server) UpdateJob(ctx context.Context, in *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) {
-	out, err := p.clientDoWithRetries(ctx, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (any, error) {
+	out, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.UpdateJobResponse, error) {
 		return client.UpdateJob(ctx, in)
 	})
 	if err != nil {
 		return nil, err
 	}
-	// nolint: forcetypeassert
-	return out.(*proto.UpdateJobResponse), nil
+	return out, nil
 }
 
 func (p *Server) FailJob(ctx context.Context, in *proto.FailedJob) error {
-	_, err := p.clientDoWithRetries(ctx, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (any, error) {
+	_, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.Empty, error) {
 		return client.FailJob(ctx, in)
 	})
 	return err
 }
 
 func (p *Server) CompleteJob(ctx context.Context, in *proto.CompletedJob) error {
-	_, err := p.clientDoWithRetries(ctx, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (any, error) {
+	_, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.Empty, error) {
 		return client.CompleteJob(ctx, in)
 	})
 	return err
@@ -552,7 +551,7 @@ func (p *Server) closeWithError(err error) error {
 
 	p.opts.Logger.Debug(context.Background(), "closing server with error", slog.Error(err))
 
-	if c, ok := p.clientValue.Load().(proto.DRPCProvisionerDaemonClient); ok {
+	if c, ok := p.client(); ok {
 		_ = c.DRPCConn().Close()
 	}
 
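For context, here is a minimal standalone sketch of the pattern the provisionerd.go change introduces: a type-parameterized retry helper over a client held in an atomic.Pointer, so each caller gets its concrete response type back and the `// nolint: forcetypeassert` assertions can go away. All names (fakeClient, doWithRetries, errTemporary) are hypothetical and the backoff is simplified; this is not code from the PR.

// Standalone sketch (hypothetical names, simplified backoff), assuming Go 1.19+
// for atomic.Pointer and generics.
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// fakeClient stands in for proto.DRPCProvisionerDaemonClient.
type fakeClient struct{}

func (fakeClient) Echo(_ context.Context, s string) (string, error) { return s, nil }

// errTemporary marks errors worth retrying in this sketch.
var errTemporary = errors.New("temporary failure")

var clientValue atomic.Pointer[fakeClient]

// getClient mirrors Server.client(): a nil pointer means "not connected yet".
func getClient() (*fakeClient, bool) {
	c := clientValue.Load()
	return c, c != nil
}

// doWithRetries keeps calling f with the current client until it returns a
// non-retryable result or ctx expires; on expiry it returns the zero value of
// T, matching the `(ret T, _ error)` shape used in the diff.
func doWithRetries[T any](
	ctx context.Context,
	getClient func() (*fakeClient, bool),
	f func(context.Context, *fakeClient) (T, error),
) (ret T, _ error) {
	delay := 25 * time.Millisecond
	for {
		if client, ok := getClient(); ok {
			resp, err := f(ctx, client)
			if !errors.Is(err, errTemporary) {
				return resp, err // success, or an error not worth retrying
			}
		}
		select {
		case <-ctx.Done():
			return ret, ctx.Err()
		case <-time.After(delay):
		}
		if delay *= 2; delay > 5*time.Second {
			delay = 5 * time.Second
		}
	}
}

func main() {
	clientValue.Store(&fakeClient{})
	// out is typed as string here; no type assertion at the call site.
	out, err := doWithRetries(context.Background(), getClient,
		func(ctx context.Context, c *fakeClient) (string, error) {
			return c.Echo(ctx, "hello")
		})
	fmt.Println(out, err)
}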
diff --git a/provisionerd/runner/runner.go b/provisionerd/runner/runner.go
index 6dfe2f9efb27d..95f3fee0de2d1 100644
--- a/provisionerd/runner/runner.go
+++ b/provisionerd/runner/runner.go
@@ -13,11 +13,13 @@ import (
 	"reflect"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/google/uuid"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/spf13/afero"
+	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/codes"
 	semconv "go.opentelemetry.io/otel/semconv/v1.14.0"
 	"go.opentelemetry.io/otel/trace"
@@ -25,6 +27,7 @@ import (
 	"cdr.dev/slog"
 
 	"github.com/coder/coder/coderd/tracing"
+	"github.com/coder/coder/coderd/util/ptr"
 	"github.com/coder/coder/provisionerd/proto"
 	sdkproto "github.com/coder/coder/provisionersdk/proto"
 )
@@ -54,6 +57,7 @@ type Runner struct {
 	filesystem          afero.Fs
 	workDirectory       string
 	provisioner         sdkproto.DRPCProvisionerClient
+	lastUpdate          atomic.Pointer[time.Time]
 	updateInterval      time.Duration
 	forceCancelInterval time.Duration
 	logBufferInterval   time.Duration
@@ -203,7 +207,7 @@ func (r *Runner) Run() {
 	defer r.stop()
 
 	go r.doCleanFinish(ctx)
-	go r.heartbeat(ctx)
+	go r.heartbeatRoutine(ctx)
 	for r.failedJob == nil && r.completedJob == nil {
 		r.cond.Wait()
 	}
@@ -307,15 +311,51 @@ func (r *Runner) ForceStop() {
 	r.cond.Signal()
 }
 
+func (r *Runner) sendHeartbeat(ctx context.Context) (*proto.UpdateJobResponse, error) {
+	ctx, span := r.startTrace(ctx, "updateHeartbeat")
+	defer span.End()
+
+	r.mutex.Lock()
+	if !r.okToSend {
+		r.mutex.Unlock()
+		return nil, errUpdateSkipped
+	}
+	r.mutex.Unlock()
+
+	// Skip sending a heartbeat if we've sent an update recently.
+	if lastUpdate := r.lastUpdate.Load(); lastUpdate != nil {
+		if time.Since(*lastUpdate) < r.updateInterval {
+			span.SetAttributes(attribute.Bool("heartbeat_skipped", true))
+			return &proto.UpdateJobResponse{}, nil
+		}
+	}
+
+	return r.update(ctx, &proto.UpdateJobRequest{
+		JobId: r.job.JobId,
+	})
+}
+
 func (r *Runner) update(ctx context.Context, u *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) {
 	ctx, span := r.startTrace(ctx, tracing.FuncName())
 	defer span.End()
+	defer func() {
+		r.lastUpdate.Store(ptr.Ref(time.Now()))
+	}()
+
+	span.SetAttributes(
+		attribute.Int64("logs_len", int64(len(u.Logs))),
+		attribute.Int64("parameter_schemas_len", int64(len(u.ParameterSchemas))),
+		attribute.Int64("template_variables_len", int64(len(u.TemplateVariables))),
+		attribute.Int64("user_variable_values_len", int64(len(u.UserVariableValues))),
+		attribute.Int64("readme_len", int64(len(u.Readme))),
+	)
 
 	r.mutex.Lock()
 	defer r.mutex.Unlock()
 	if !r.okToSend {
 		return nil, errUpdateSkipped
 	}
+
 	return r.sender.UpdateJob(ctx, u)
 }
 
@@ -480,9 +520,13 @@ func (r *Runner) do(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob)
 	}
 }
 
-// heartbeat periodically sends updates on the job, which keeps coder server from assuming the job
-// is stalled, and allows the runner to learn if the job has been canceled by the user.
-func (r *Runner) heartbeat(ctx context.Context) {
+// heartbeatRoutine periodically sends updates on the job, which keeps coder server
+// from assuming the job is stalled, and allows the runner to learn if the job
+// has been canceled by the user.
+func (r *Runner) heartbeatRoutine(ctx context.Context) {
+	ctx, span := r.startTrace(ctx, tracing.FuncName())
+	defer span.End()
+
 	ticker := time.NewTicker(r.updateInterval)
 	defer ticker.Stop()
 
@@ -493,9 +537,7 @@ func (r *Runner) heartbeat(ctx context.Context) {
 		case <-ticker.C:
 		}
 
-		resp, err := r.update(ctx, &proto.UpdateJobRequest{
-			JobId: r.job.JobId,
-		})
+		resp, err := r.sendHeartbeat(ctx)
 		if err != nil {
 			err = r.Fail(ctx, r.failedJobf("send periodic update: %s", err))
 			if err != nil {
@@ -504,6 +546,7 @@ func (r *Runner) heartbeat(ctx context.Context) {
 			return
 		}
 		if !resp.Canceled {
+			ticker.Reset(r.updateInterval)
 			continue
 		}
 		r.logger.Info(ctx, "attempting graceful cancelation")
@@ -1079,6 +1122,7 @@ func (r *Runner) failedJobf(format string, args ...interface{}) *proto.FailedJob
 
 func (r *Runner) startTrace(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
 	return r.tracer.Start(ctx, name, append(opts, trace.WithAttributes(
 		semconv.ServiceNameKey.String("coderd.provisionerd"),
+		attribute.String("job_id", r.job.JobId),
 	))...)
 }
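For context, here is a minimal standalone sketch of the heartbeat change in runner.go: every update records its timestamp in an atomic.Pointer[time.Time], and the heartbeat loop skips its tick when an update already went out within updateInterval, so chatty jobs no longer double the UpdateJob traffic. The names (runner, update, sendHeartbeat) and timings are illustrative only, not code from the PR.

// Standalone sketch (hypothetical names), assuming Go 1.19+ for atomic.Pointer.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type runner struct {
	lastUpdate     atomic.Pointer[time.Time]
	updateInterval time.Duration
}

// update stands in for Runner.update: it records when the last send happened.
func (r *runner) update(payload string) {
	defer func() {
		now := time.Now()
		r.lastUpdate.Store(&now)
	}()
	fmt.Println("update sent:", payload)
}

// sendHeartbeat mirrors Runner.sendHeartbeat: skip when an update was recent.
func (r *runner) sendHeartbeat() {
	if last := r.lastUpdate.Load(); last != nil && time.Since(*last) < r.updateInterval {
		fmt.Println("heartbeat skipped: an update was sent recently")
		return
	}
	r.update("heartbeat")
}

func main() {
	r := &runner{updateInterval: 200 * time.Millisecond}

	// Simulate a job that streams a log line halfway through the first tick.
	go func() {
		time.Sleep(100 * time.Millisecond)
		r.update("log line")
	}()

	ticker := time.NewTicker(r.updateInterval)
	defer ticker.Stop()
	for i := 0; i < 3; i++ {
		<-ticker.C
		r.sendHeartbeat() // the first tick is likely skipped
	}
}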