
Commit 085330a

fix(provisionerd): only heartbeat when logs aren't being flushed (#7110)
1 parent f5a8a27 commit 085330a

File tree

4 files changed, +76 -33 lines

cli/server.go

Lines changed: 1 addition & 1 deletion
@@ -1214,7 +1214,7 @@ func newProvisionerDaemon(
 		JobPollInterval:     cfg.Provisioner.DaemonPollInterval.Value(),
 		JobPollJitter:       cfg.Provisioner.DaemonPollJitter.Value(),
 		JobPollDebounce:     debounce,
-		UpdateInterval:      500 * time.Millisecond,
+		UpdateInterval:      time.Second,
 		ForceCancelInterval: cfg.Provisioner.ForceCancelInterval.Value(),
 		Provisioners:        provisioners,
 		WorkDirectory:       tempDir,

coderd/workspaceapps/apptest/apptest.go

Lines changed: 1 addition & 1 deletion
@@ -814,7 +814,7 @@ func Run(t *testing.T, factory DeploymentFactory) {
 
 		// Create workspace.
 		port := appServer(t)
-		workspace, agnt = createWorkspaceWithApps(t, client, user.OrganizationIDs[0], user, proxyTestSubdomainRaw, port)
+		workspace, _ = createWorkspaceWithApps(t, client, user.OrganizationIDs[0], user, proxyTestSubdomainRaw, port)
 
 		// Verify that the apps have the correct sharing levels set.
 		workspaceBuild, err := client.WorkspaceBuild(ctx, workspace.LatestBuild.ID)

provisionerd/provisionerd.go

Lines changed: 23 additions & 24 deletions
@@ -22,6 +22,7 @@ import (
 
 	"cdr.dev/slog"
 	"github.com/coder/coder/coderd/tracing"
+	"github.com/coder/coder/coderd/util/ptr"
 	"github.com/coder/coder/cryptorand"
 	"github.com/coder/coder/provisionerd/proto"
 	"github.com/coder/coder/provisionerd/runner"
@@ -77,7 +78,7 @@ func New(clientDialer Dialer, opts *Options) *Server {
 		opts.ForceCancelInterval = 10 * time.Minute
 	}
 	if opts.LogBufferInterval == 0 {
-		opts.LogBufferInterval = 50 * time.Millisecond
+		opts.LogBufferInterval = 250 * time.Millisecond
 	}
 	if opts.Filesystem == nil {
 		opts.Filesystem = afero.NewOsFs()
@@ -113,7 +114,7 @@ type Server struct {
 	tracer trace.Tracer
 
 	clientDialer Dialer
-	clientValue  atomic.Value
+	clientValue  atomic.Pointer[proto.DRPCProvisionerDaemonClient]
 
 	// Locked when closing the daemon, shutting down, or starting a new job.
 	mutex sync.Mutex
@@ -194,7 +195,7 @@ func (p *Server) connect(ctx context.Context) {
 			p.mutex.Unlock()
 			break
 		}
-		p.clientValue.Store(client)
+		p.clientValue.Store(ptr.Ref(client))
 		p.mutex.Unlock()
 
 		p.opts.Logger.Debug(context.Background(), "connected")
@@ -260,12 +261,11 @@ func (p *Server) nextInterval() time.Duration {
 }
 
 func (p *Server) client() (proto.DRPCProvisionerDaemonClient, bool) {
-	rawClient := p.clientValue.Load()
-	if rawClient == nil {
+	client := p.clientValue.Load()
+	if client == nil {
 		return nil, false
 	}
-	client, ok := rawClient.(proto.DRPCProvisionerDaemonClient)
-	return client, ok
+	return *client, true
 }
 
 // isRunningJob returns true if a job is running. Caller must hold the mutex.
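Swapping `atomic.Value` for the generic `atomic.Pointer[T]` (added in Go 1.19) makes the stored type part of the field declaration: `Load` returns a typed pointer instead of an `any` that needs asserting, and nil cleanly means "not connected yet". Because `DRPCProvisionerDaemonClient` is an interface, the diff stores a pointer to it via `ptr.Ref` and dereferences on load. A minimal standalone sketch of the difference, using a stand-in `Client` type rather than the real DRPC client:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Client stands in for the proto.DRPCProvisionerDaemonClient interface.
type Client struct{ addr string }

func main() {
	// Before: atomic.Value stores an any, so every Load needs a type
	// assertion that the compiler cannot check.
	var v atomic.Value
	v.Store(Client{addr: "old"})
	c, ok := v.Load().(Client)
	fmt.Println(c, ok)

	// After: atomic.Pointer[Client] is typed. Load returns *Client
	// directly, and nil simply means "no client connected yet".
	var p atomic.Pointer[Client]
	if p.Load() == nil {
		fmt.Println("not connected yet")
	}
	p.Store(&Client{addr: "new"})
	fmt.Println(*p.Load())
}
```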
@@ -417,14 +417,15 @@ func retryable(err error) bool {
 		xerrors.Is(err, context.Canceled)
 }
 
-// clientDoWithRetries runs the function f with a client, and retries with backoff until either the error returned
-// is not retryable() or the context expires.
-func (p *Server) clientDoWithRetries(
-	ctx context.Context, f func(context.Context, proto.DRPCProvisionerDaemonClient) (any, error)) (
-	any, error,
-) {
+// clientDoWithRetries runs the function f with a client, and retries with
+// backoff until either the error returned is not retryable() or the context
+// expires.
+func clientDoWithRetries[T any](ctx context.Context,
+	getClient func() (proto.DRPCProvisionerDaemonClient, bool),
+	f func(context.Context, proto.DRPCProvisionerDaemonClient) (T, error),
+) (ret T, _ error) {
 	for retrier := retry.New(25*time.Millisecond, 5*time.Second); retrier.Wait(ctx); {
-		client, ok := p.client()
+		client, ok := getClient()
 		if !ok {
 			continue
 		}
@@ -434,40 +435,38 @@ func (p *Server) clientDoWithRetries(
 		}
 		return resp, err
 	}
-	return nil, ctx.Err()
+	return ret, ctx.Err()
 }
 
 func (p *Server) CommitQuota(ctx context.Context, in *proto.CommitQuotaRequest) (*proto.CommitQuotaResponse, error) {
-	out, err := p.clientDoWithRetries(ctx, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (any, error) {
+	out, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.CommitQuotaResponse, error) {
 		return client.CommitQuota(ctx, in)
 	})
 	if err != nil {
 		return nil, err
 	}
-	// nolint: forcetypeassert
-	return out.(*proto.CommitQuotaResponse), nil
+	return out, nil
 }
 
 func (p *Server) UpdateJob(ctx context.Context, in *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) {
-	out, err := p.clientDoWithRetries(ctx, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (any, error) {
+	out, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.UpdateJobResponse, error) {
 		return client.UpdateJob(ctx, in)
 	})
 	if err != nil {
 		return nil, err
 	}
-	// nolint: forcetypeassert
-	return out.(*proto.UpdateJobResponse), nil
+	return out, nil
}
 
 func (p *Server) FailJob(ctx context.Context, in *proto.FailedJob) error {
-	_, err := p.clientDoWithRetries(ctx, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (any, error) {
+	_, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.Empty, error) {
 		return client.FailJob(ctx, in)
 	})
 	return err
 }
 
 func (p *Server) CompleteJob(ctx context.Context, in *proto.CompletedJob) error {
-	_, err := p.clientDoWithRetries(ctx, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (any, error) {
+	_, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.Empty, error) {
 		return client.CompleteJob(ctx, in)
 	})
 	return err
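Making the retry helper a free generic function instead of a method returning `any` lets the compiler infer the response type from the callback, which is what eliminates the `// nolint: forcetypeassert` escapes at every call site. Below is a self-contained sketch of the same shape; the `doWithRetries` name and the hand-rolled exponential backoff are illustrative stand-ins for the real helper, which also consults `retryable(err)` and fetches a client via `getClient`:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// doWithRetries retries f with exponential backoff until it succeeds
// or ctx expires. T is inferred from f, so each call site gets a typed
// result back with no assertion.
func doWithRetries[T any](ctx context.Context, f func(context.Context) (T, error)) (ret T, _ error) {
	for delay := 25 * time.Millisecond; ; delay *= 2 {
		resp, err := f(ctx)
		if err == nil {
			return resp, nil
		}
		select {
		case <-ctx.Done():
			return ret, ctx.Err() // ret stays T's zero value
		case <-time.After(delay):
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	attempts := 0
	out, err := doWithRetries(ctx, func(context.Context) (string, error) {
		attempts++
		if attempts < 3 {
			return "", errors.New("transient")
		}
		return "ok", nil
	})
	fmt.Println(out, err, attempts) // ok <nil> 3
}
```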
@@ -552,7 +551,7 @@ func (p *Server) closeWithError(err error) error {
 
 	p.opts.Logger.Debug(context.Background(), "closing server with error", slog.Error(err))
 
-	if c, ok := p.clientValue.Load().(proto.DRPCProvisionerDaemonClient); ok {
+	if c, ok := p.client(); ok {
 		_ = c.DRPCConn().Close()
 	}

provisionerd/runner/runner.go

Lines changed: 51 additions & 7 deletions
@@ -13,18 +13,21 @@ import (
 	"reflect"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/google/uuid"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/spf13/afero"
+	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/codes"
 	semconv "go.opentelemetry.io/otel/semconv/v1.14.0"
 	"go.opentelemetry.io/otel/trace"
 	"golang.org/x/xerrors"
 
 	"cdr.dev/slog"
 	"github.com/coder/coder/coderd/tracing"
+	"github.com/coder/coder/coderd/util/ptr"
 	"github.com/coder/coder/provisionerd/proto"
 	sdkproto "github.com/coder/coder/provisionersdk/proto"
 )
@@ -54,6 +57,7 @@ type Runner struct {
 	filesystem          afero.Fs
 	workDirectory       string
 	provisioner         sdkproto.DRPCProvisionerClient
+	lastUpdate          atomic.Pointer[time.Time]
 	updateInterval      time.Duration
 	forceCancelInterval time.Duration
 	logBufferInterval   time.Duration
@@ -203,7 +207,7 @@ func (r *Runner) Run() {
 	defer r.stop()
 
 	go r.doCleanFinish(ctx)
-	go r.heartbeat(ctx)
+	go r.heartbeatRoutine(ctx)
 	for r.failedJob == nil && r.completedJob == nil {
 		r.cond.Wait()
 	}
@@ -307,15 +311,51 @@ func (r *Runner) ForceStop() {
 	r.cond.Signal()
 }
 
+func (r *Runner) sendHeartbeat(ctx context.Context) (*proto.UpdateJobResponse, error) {
+	ctx, span := r.startTrace(ctx, "updateHeartbeat")
+	defer span.End()
+
+	r.mutex.Lock()
+	if !r.okToSend {
+		r.mutex.Unlock()
+		return nil, errUpdateSkipped
+	}
+	r.mutex.Unlock()
+
+	// Skip sending a heartbeat if we've sent an update recently.
+	if lastUpdate := r.lastUpdate.Load(); lastUpdate != nil {
+		if time.Since(*lastUpdate) < r.updateInterval {
+			span.SetAttributes(attribute.Bool("heartbeat_skipped", true))
+			return &proto.UpdateJobResponse{}, nil
+		}
+	}
+
+	return r.update(ctx, &proto.UpdateJobRequest{
+		JobId: r.job.JobId,
+	})
+}
+
 func (r *Runner) update(ctx context.Context, u *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) {
 	ctx, span := r.startTrace(ctx, tracing.FuncName())
 	defer span.End()
+	defer func() {
+		r.lastUpdate.Store(ptr.Ref(time.Now()))
+	}()
+
+	span.SetAttributes(
+		attribute.Int64("logs_len", int64(len(u.Logs))),
+		attribute.Int64("parameter_schemas_len", int64(len(u.ParameterSchemas))),
+		attribute.Int64("template_variables_len", int64(len(u.TemplateVariables))),
+		attribute.Int64("user_variable_values_len", int64(len(u.UserVariableValues))),
+		attribute.Int64("readme_len", int64(len(u.Readme))),
+	)
 
 	r.mutex.Lock()
 	defer r.mutex.Unlock()
 	if !r.okToSend {
 		return nil, errUpdateSkipped
 	}
+
 	return r.sender.UpdateJob(ctx, u)
 }
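This hunk is the fix named in the commit title: every outbound update, including buffered log flushes, now stamps `lastUpdate` on the way out, and the heartbeat path short-circuits whenever any update went out within the last `updateInterval`, so heartbeats stop piling on top of active log streaming. A simplified, runnable sketch of the debounce; the `runner` type and console prints here are illustrative stand-ins for the real RPCs:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type runner struct {
	lastUpdate     atomic.Pointer[time.Time]
	updateInterval time.Duration
}

// update stands in for Runner.update: every outbound job update,
// including buffered log flushes, stamps lastUpdate on the way out.
func (r *runner) update(payload string) {
	defer func() {
		now := time.Now()
		r.lastUpdate.Store(&now)
	}()
	fmt.Println("sent update:", payload)
}

// sendHeartbeat skips the RPC entirely when any update already went
// out within the last updateInterval.
func (r *runner) sendHeartbeat() {
	if last := r.lastUpdate.Load(); last != nil && time.Since(*last) < r.updateInterval {
		fmt.Println("heartbeat skipped: job was updated recently")
		return
	}
	r.update("heartbeat")
}

func main() {
	r := &runner{updateInterval: time.Second}
	r.update("buffered logs") // stamps lastUpdate
	r.sendHeartbeat()         // skipped: an update just went out
	time.Sleep(1100 * time.Millisecond)
	r.sendHeartbeat() // sent: a full interval has elapsed
}
```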
@@ -480,9 +520,13 @@ func (r *Runner) do(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob)
 	}
 }
 
-// heartbeat periodically sends updates on the job, which keeps coder server from assuming the job
-// is stalled, and allows the runner to learn if the job has been canceled by the user.
-func (r *Runner) heartbeat(ctx context.Context) {
+// heartbeatRoutine periodically sends updates on the job, which keeps coder server
+// from assuming the job is stalled, and allows the runner to learn if the job
+// has been canceled by the user.
+func (r *Runner) heartbeatRoutine(ctx context.Context) {
+	ctx, span := r.startTrace(ctx, tracing.FuncName())
+	defer span.End()
+
 	ticker := time.NewTicker(r.updateInterval)
 	defer ticker.Stop()
 
@@ -493,9 +537,7 @@ func (r *Runner) heartbeat(ctx context.Context) {
 		case <-ticker.C:
 		}
 
-		resp, err := r.update(ctx, &proto.UpdateJobRequest{
-			JobId: r.job.JobId,
-		})
+		resp, err := r.sendHeartbeat(ctx)
 		if err != nil {
 			err = r.Fail(ctx, r.failedJobf("send periodic update: %s", err))
 			if err != nil {
@@ -504,6 +546,7 @@
 			return
 		}
 		if !resp.Canceled {
+			ticker.Reset(r.updateInterval)
 			continue
 		}
 		r.logger.Info(ctx, "attempting graceful cancelation")
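The `ticker.Reset` on the non-canceled path is a subtle companion change: it restarts the period from the moment each heartbeat round trip completes, so successive heartbeats stay a full `updateInterval` apart even when the `UpdateJob` RPC itself is slow. A small runnable demo of that spacing, with a sleep standing in for the RPC:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	interval := 300 * time.Millisecond
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	start := time.Now()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		time.Sleep(100 * time.Millisecond) // stand-in for the UpdateJob RPC
		fmt.Printf("heartbeat done at %v\n", time.Since(start).Round(10*time.Millisecond))
		// Restart the period from now: successive heartbeats land a full
		// interval after the previous one finishes (~400ms apart here).
		// Without this Reset they would fire on the original 300ms grid.
		ticker.Reset(interval)
	}
}
```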
@@ -1079,6 +1122,7 @@ func (r *Runner) failedJobf(format string, args ...interface{}) *proto.FailedJob
 func (r *Runner) startTrace(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
 	return r.tracer.Start(ctx, name, append(opts, trace.WithAttributes(
 		semconv.ServiceNameKey.String("coderd.provisionerd"),
+		attribute.String("job_id", r.job.JobId),
 	))...)
 }
