fix(provisionerd): only heartbeat when logs aren't being flushed #7110

Merged · 3 commits · Apr 13, 2023
2 changes: 1 addition & 1 deletion cli/server.go
@@ -1367,7 +1367,7 @@ func newProvisionerDaemon(
JobPollInterval: cfg.Provisioner.DaemonPollInterval.Value(),
JobPollJitter: cfg.Provisioner.DaemonPollJitter.Value(),
JobPollDebounce: debounce,
-UpdateInterval: 500 * time.Millisecond,
+UpdateInterval: time.Second,
ForceCancelInterval: cfg.Provisioner.ForceCancelInterval.Value(),
Provisioners: provisioners,
WorkDirectory: tempDir,
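The interval change above halves heartbeat traffic: per the PR title, heartbeats are now only sent when logs aren't being flushed, so a slower `UpdateInterval` costs little. A minimal sketch of the two knobs this PR tunes, assuming only the `provisionerd.Options` fields visible in this diff (not a complete daemon setup):

```go
package main

import (
	"time"

	"github.com/coder/coder/provisionerd"
)

func main() {
	// New defaults after this PR: heartbeat at most once per second, and
	// buffer logs for 250ms (up from 50ms) before each flush. Since every
	// flush is itself a job update, heartbeats stay quiet while logs flow.
	opts := &provisionerd.Options{
		UpdateInterval:    time.Second,
		LogBufferInterval: 250 * time.Millisecond,
	}
	_ = opts // a real daemon also needs a Dialer, Provisioners, etc. (omitted)
}
```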
2 changes: 1 addition & 1 deletion coderd/workspaceapps/apptest/apptest.go
@@ -814,7 +814,7 @@ func Run(t *testing.T, factory DeploymentFactory) {

// Create workspace.
port := appServer(t)
-workspace, agnt = createWorkspaceWithApps(t, client, user.OrganizationIDs[0], user, proxyTestSubdomainRaw, port)
+workspace, _ = createWorkspaceWithApps(t, client, user.OrganizationIDs[0], user, proxyTestSubdomainRaw, port)

// Verify that the apps have the correct sharing levels set.
workspaceBuild, err := client.WorkspaceBuild(ctx, workspace.LatestBuild.ID)
47 changes: 23 additions & 24 deletions provisionerd/provisionerd.go
@@ -22,6 +22,7 @@ import (

"cdr.dev/slog"
"github.com/coder/coder/coderd/tracing"
"github.com/coder/coder/coderd/util/ptr"
"github.com/coder/coder/cryptorand"
"github.com/coder/coder/provisionerd/proto"
"github.com/coder/coder/provisionerd/runner"
@@ -77,7 +78,7 @@ func New(clientDialer Dialer, opts *Options) *Server {
opts.ForceCancelInterval = 10 * time.Minute
}
if opts.LogBufferInterval == 0 {
-opts.LogBufferInterval = 50 * time.Millisecond
+opts.LogBufferInterval = 250 * time.Millisecond
}
if opts.Filesystem == nil {
opts.Filesystem = afero.NewOsFs()
@@ -113,7 +114,7 @@ type Server struct {
tracer trace.Tracer

clientDialer Dialer
-clientValue atomic.Value
+clientValue atomic.Pointer[proto.DRPCProvisionerDaemonClient]

// Locked when closing the daemon, shutting down, or starting a new job.
mutex sync.Mutex
@@ -194,7 +195,7 @@ func (p *Server) connect(ctx context.Context) {
p.mutex.Unlock()
break
}
-p.clientValue.Store(client)
+p.clientValue.Store(ptr.Ref(client))
p.mutex.Unlock()

p.opts.Logger.Debug(context.Background(), "connected")
@@ -260,12 +261,11 @@ func (p *Server) nextInterval() time.Duration {
}

func (p *Server) client() (proto.DRPCProvisionerDaemonClient, bool) {
-rawClient := p.clientValue.Load()
-if rawClient == nil {
+client := p.clientValue.Load()
+if client == nil {
return nil, false
}
-client, ok := rawClient.(proto.DRPCProvisionerDaemonClient)
-return client, ok
+return *client, true
}

// isRunningJob returns true if a job is running. Caller must hold the mutex.
@@ -417,14 +417,15 @@ func retryable(err error) bool {
xerrors.Is(err, context.Canceled)
}

-// clientDoWithRetries runs the function f with a client, and retries with backoff until either the error returned
-// is not retryable() or the context expires.
-func (p *Server) clientDoWithRetries(
-ctx context.Context, f func(context.Context, proto.DRPCProvisionerDaemonClient) (any, error)) (
-any, error,
-) {
+// clientDoWithRetries runs the function f with a client, and retries with
+// backoff until either the error returned is not retryable() or the context
+// expires.
+func clientDoWithRetries[T any](ctx context.Context,
+getClient func() (proto.DRPCProvisionerDaemonClient, bool),
+f func(context.Context, proto.DRPCProvisionerDaemonClient) (T, error),
+) (ret T, _ error) {
for retrier := retry.New(25*time.Millisecond, 5*time.Second); retrier.Wait(ctx); {
-client, ok := p.client()
+client, ok := getClient()
if !ok {
continue
}
@@ -434,40 +435,38 @@ func (p *Server) clientDoWithRetries(
}
return resp, err
}
-return nil, ctx.Err()
+return ret, ctx.Err()
}

func (p *Server) CommitQuota(ctx context.Context, in *proto.CommitQuotaRequest) (*proto.CommitQuotaResponse, error) {
-out, err := p.clientDoWithRetries(ctx, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (any, error) {
+out, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.CommitQuotaResponse, error) {
return client.CommitQuota(ctx, in)
})
if err != nil {
return nil, err
}
-// nolint: forcetypeassert
-return out.(*proto.CommitQuotaResponse), nil
+return out, nil
}

func (p *Server) UpdateJob(ctx context.Context, in *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) {
-out, err := p.clientDoWithRetries(ctx, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (any, error) {
+out, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.UpdateJobResponse, error) {
return client.UpdateJob(ctx, in)
})
if err != nil {
return nil, err
}
-// nolint: forcetypeassert
-return out.(*proto.UpdateJobResponse), nil
+return out, nil
}

func (p *Server) FailJob(ctx context.Context, in *proto.FailedJob) error {
-_, err := p.clientDoWithRetries(ctx, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (any, error) {
+_, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.Empty, error) {
return client.FailJob(ctx, in)
})
return err
}

func (p *Server) CompleteJob(ctx context.Context, in *proto.CompletedJob) error {
-_, err := p.clientDoWithRetries(ctx, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (any, error) {
+_, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.Empty, error) {
return client.CompleteJob(ctx, in)
})
return err
@@ -552,7 +551,7 @@ func (p *Server) closeWithError(err error) error {

p.opts.Logger.Debug(context.Background(), "closing server with error", slog.Error(err))

-if c, ok := p.clientValue.Load().(proto.DRPCProvisionerDaemonClient); ok {
+if c, ok := p.client(); ok {
_ = c.DRPCConn().Close()
}

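The refactor above is worth noting: `clientDoWithRetries` becomes a free generic function, so each caller gets a typed response back and the `out.(*proto.XxxResponse)` assertions (and their `nolint: forcetypeassert` directives) disappear. The companion change from `atomic.Value` to `atomic.Pointer[...]` serves the same goal — type mismatches are caught at compile time rather than via runtime assertions. A self-contained sketch of the generics pattern, with illustrative names rather than the real provisionerd API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errTransient = errors.New("transient")

// doWithRetries retries f until it succeeds, fails with a non-retryable
// error, or ctx expires. Thanks to the type parameter, callers receive a
// typed result instead of `any`.
func doWithRetries[T any](ctx context.Context, f func(context.Context) (T, error)) (ret T, _ error) {
	for {
		resp, err := f(ctx)
		if !errors.Is(err, errTransient) {
			return resp, err // success or permanent failure
		}
		select {
		case <-ctx.Done():
			return ret, ctx.Err() // ret is T's zero value
		case <-time.After(25 * time.Millisecond): // fixed backoff for brevity
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	attempts := 0
	out, err := doWithRetries(ctx, func(context.Context) (string, error) {
		attempts++
		if attempts < 3 {
			return "", errTransient
		}
		return "ok", nil
	})
	fmt.Println(out, err, attempts) // ok <nil> 3
}
```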
58 changes: 51 additions & 7 deletions provisionerd/runner/runner.go
@@ -13,18 +13,21 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/afero"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.14.0"
"go.opentelemetry.io/otel/trace"
"golang.org/x/xerrors"

"cdr.dev/slog"
"github.com/coder/coder/coderd/tracing"
"github.com/coder/coder/coderd/util/ptr"
"github.com/coder/coder/provisionerd/proto"
sdkproto "github.com/coder/coder/provisionersdk/proto"
)
@@ -54,6 +57,7 @@ type Runner struct {
filesystem afero.Fs
workDirectory string
provisioner sdkproto.DRPCProvisionerClient
+lastUpdate atomic.Pointer[time.Time]
updateInterval time.Duration
forceCancelInterval time.Duration
logBufferInterval time.Duration
@@ -203,7 +207,7 @@ func (r *Runner) Run() {
defer r.stop()

go r.doCleanFinish(ctx)
-go r.heartbeat(ctx)
+go r.heartbeatRoutine(ctx)
for r.failedJob == nil && r.completedJob == nil {
r.cond.Wait()
}
@@ -307,15 +311,51 @@ func (r *Runner) ForceStop() {
r.cond.Signal()
}

+func (r *Runner) sendHeartbeat(ctx context.Context) (*proto.UpdateJobResponse, error) {
+ctx, span := r.startTrace(ctx, "updateHeartbeat")
+defer span.End()
+
+r.mutex.Lock()
+if !r.okToSend {
+r.mutex.Unlock()
+return nil, errUpdateSkipped
+}
+r.mutex.Unlock()
+
+// Skip sending a heartbeat if we've sent an update recently.
+if lastUpdate := r.lastUpdate.Load(); lastUpdate != nil {
+if time.Since(*lastUpdate) < r.updateInterval {
+span.SetAttributes(attribute.Bool("heartbeat_skipped", true))
+return &proto.UpdateJobResponse{}, nil
+}
+}
+
+return r.update(ctx, &proto.UpdateJobRequest{
+JobId: r.job.JobId,
+})
+}

func (r *Runner) update(ctx context.Context, u *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) {
ctx, span := r.startTrace(ctx, tracing.FuncName())
defer span.End()
+defer func() {
+r.lastUpdate.Store(ptr.Ref(time.Now()))
+}()
+
+span.SetAttributes(
+attribute.Int64("logs_len", int64(len(u.Logs))),
+attribute.Int64("parameter_schemas_len", int64(len(u.ParameterSchemas))),
+attribute.Int64("template_variables_len", int64(len(u.TemplateVariables))),
+attribute.Int64("user_variable_values_len", int64(len(u.UserVariableValues))),
+attribute.Int64("readme_len", int64(len(u.Readme))),
+)

r.mutex.Lock()
defer r.mutex.Unlock()
if !r.okToSend {
return nil, errUpdateSkipped
}

return r.sender.UpdateJob(ctx, u)
}

@@ -480,9 +520,13 @@ func (r *Runner) do(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob)
}
}

-// heartbeat periodically sends updates on the job, which keeps coder server from assuming the job
-// is stalled, and allows the runner to learn if the job has been canceled by the user.
-func (r *Runner) heartbeat(ctx context.Context) {
+// heartbeatRoutine periodically sends updates on the job, which keeps coder server
+// from assuming the job is stalled, and allows the runner to learn if the job
+// has been canceled by the user.
+func (r *Runner) heartbeatRoutine(ctx context.Context) {
+ctx, span := r.startTrace(ctx, tracing.FuncName())
+defer span.End()

ticker := time.NewTicker(r.updateInterval)
defer ticker.Stop()

@@ -493,9 +537,7 @@ func (r *Runner) heartbeat(ctx context.Context) {
case <-ticker.C:
}

-resp, err := r.update(ctx, &proto.UpdateJobRequest{
-JobId: r.job.JobId,
-})
+resp, err := r.sendHeartbeat(ctx)
if err != nil {
err = r.Fail(ctx, r.failedJobf("send periodic update: %s", err))
if err != nil {
Expand All @@ -504,6 +546,7 @@ func (r *Runner) heartbeat(ctx context.Context) {
return
}
if !resp.Canceled {
+ticker.Reset(r.updateInterval)
continue
}
r.logger.Info(ctx, "attempting graceful cancelation")
@@ -1079,6 +1122,7 @@ func (r *Runner) failedJobf(format string, args ...interface{}) *proto.FailedJob
func (r *Runner) startTrace(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
return r.tracer.Start(ctx, name, append(opts, trace.WithAttributes(
semconv.ServiceNameKey.String("coderd.provisionerd"),
attribute.String("job_id", r.job.JobId),
))...)
}

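The core of the fix: `Runner.update` now records a timestamp on every update (log flushes go through it), and the heartbeat loop consults that timestamp, skipping its own `UpdateJob` call if anything was sent within the last `updateInterval`. A minimal runnable model of that gating under illustrative names (`&now` stands in for coder's `ptr.Ref` helper):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type runner struct {
	lastUpdate     atomic.Pointer[time.Time]
	updateInterval time.Duration
}

// update models Runner.update: every job update (including log flushes)
// records when the server last heard from us.
func (r *runner) update() {
	now := time.Now()
	r.lastUpdate.Store(&now)
}

// sendHeartbeat models Runner.sendHeartbeat: it reports whether a
// heartbeat was actually sent, or skipped because updates are flowing.
func (r *runner) sendHeartbeat() bool {
	if last := r.lastUpdate.Load(); last != nil && time.Since(*last) < r.updateInterval {
		return false // a log flush just updated the job; no heartbeat needed
	}
	r.update()
	return true
}

func main() {
	r := &runner{updateInterval: time.Second}

	r.update()                     // simulate a log flush
	fmt.Println(r.sendHeartbeat()) // false: update was recent, heartbeat skipped

	time.Sleep(1100 * time.Millisecond)
	fmt.Println(r.sendHeartbeat()) // true: job went quiet, heartbeat keeps it alive
}
```

The `ticker.Reset(r.updateInterval)` added in the loop above complements this from the other side: after each heartbeat attempt the timer restarts, so a full quiet interval must elapse before the next one fires.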