Skip to content

Commit 5938fdb

Browse files
committed
fixup! fix(provisionerd): only heartbeat when logs aren't being flushed
1 parent bf979ff commit 5938fdb

File tree

3 files changed

+84
-86
lines changed

3 files changed

+84
-86
lines changed

provisionerd/proto/provisionerd.pb.go

Lines changed: 54 additions & 66 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

provisionerd/proto/provisionerd.proto

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,6 @@ message UpdateJobRequest {
109109
repeated provisioner.TemplateVariable template_variables = 4;
110110
repeated provisioner.VariableValue user_variable_values = 5;
111111
bytes readme = 6;
112-
// Heartbeat is true when the request is a heartbeat indicating job
113-
// liveness.
114-
bool heartbeat = 7;
115112
}
116113

117114
message UpdateJobResponse {

provisionerd/runner/runner.go

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ func (r *Runner) Run() {
207207
defer r.stop()
208208

209209
go r.doCleanFinish(ctx)
210-
go r.heartbeat(ctx)
210+
go r.heartbeatRoutine(ctx)
211211
for r.failedJob == nil && r.completedJob == nil {
212212
r.cond.Wait()
213213
}
@@ -311,6 +311,29 @@ func (r *Runner) ForceStop() {
311311
r.cond.Signal()
312312
}
313313

314+
func (r *Runner) sendHeartbeat(ctx context.Context) (*proto.UpdateJobResponse, error) {
315+
ctx, span := r.startTrace(ctx, "updateHeartbeat")
316+
defer span.End()
317+
318+
r.mutex.Lock()
319+
defer r.mutex.Unlock()
320+
if !r.okToSend {
321+
return nil, errUpdateSkipped
322+
}
323+
324+
// Skip sending a heartbeat if we've sent an update recently.
325+
if lastUpdate := r.lastUpdate.Load(); lastUpdate != nil {
326+
if time.Since(*lastUpdate) < r.updateInterval {
327+
span.SetAttributes(attribute.Bool("heartbeat_skipped", true))
328+
return &proto.UpdateJobResponse{}, nil
329+
}
330+
}
331+
332+
return r.update(ctx, &proto.UpdateJobRequest{
333+
JobId: r.job.JobId,
334+
})
335+
}
336+
314337
func (r *Runner) update(ctx context.Context, u *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) {
315338
ctx, span := r.startTrace(ctx, tracing.FuncName())
316339
defer span.End()
@@ -324,7 +347,6 @@ func (r *Runner) update(ctx context.Context, u *proto.UpdateJobRequest) (*proto.
324347
attribute.Int64("template_variables_len", int64(len(u.TemplateVariables))),
325348
attribute.Int64("user_variable_values_len", int64(len(u.UserVariableValues))),
326349
attribute.Int64("readme_len", int64(len(u.Readme))),
327-
attribute.Bool("heartbeat", u.Heartbeat),
328350
)
329351

330352
r.mutex.Lock()
@@ -333,14 +355,6 @@ func (r *Runner) update(ctx context.Context, u *proto.UpdateJobRequest) (*proto.
333355
return nil, errUpdateSkipped
334356
}
335357

336-
// Skip sending a heartbeat if we've sent an update recently.
337-
if lastUpdate := r.lastUpdate.Load(); u.Heartbeat && lastUpdate != nil {
338-
if time.Since(*lastUpdate) < r.updateInterval {
339-
span.SetAttributes(attribute.Bool("heartbeat_skipped", true))
340-
return &proto.UpdateJobResponse{}, nil
341-
}
342-
}
343-
344358
return r.sender.UpdateJob(ctx, u)
345359
}
346360

@@ -505,9 +519,10 @@ func (r *Runner) do(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob)
505519
}
506520
}
507521

508-
// heartbeat periodically sends updates on the job, which keeps coder server from assuming the job
509-
// is stalled, and allows the runner to learn if the job has been canceled by the user.
510-
func (r *Runner) heartbeat(ctx context.Context) {
522+
// heartbeatRoutine periodically sends updates on the job, which keeps coder server
523+
// from assuming the job is stalled, and allows the runner to learn if the job
524+
// has been canceled by the user.
525+
func (r *Runner) heartbeatRoutine(ctx context.Context) {
511526
ctx, span := r.startTrace(ctx, tracing.FuncName())
512527
defer span.End()
513528

@@ -521,10 +536,7 @@ func (r *Runner) heartbeat(ctx context.Context) {
521536
case <-ticker.C:
522537
}
523538

524-
resp, err := r.update(ctx, &proto.UpdateJobRequest{
525-
JobId: r.job.JobId,
526-
Heartbeat: true,
527-
})
539+
resp, err := r.sendHeartbeat(ctx)
528540
if err != nil {
529541
err = r.Fail(ctx, r.failedJobf("send periodic update: %s", err))
530542
if err != nil {
@@ -1109,6 +1121,7 @@ func (r *Runner) failedJobf(format string, args ...interface{}) *proto.FailedJob
11091121
func (r *Runner) startTrace(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
11101122
return r.tracer.Start(ctx, name, append(opts, trace.WithAttributes(
11111123
semconv.ServiceNameKey.String("coderd.provisionerd"),
1124+
attribute.String("job_id", r.job.JobId),
11121125
))...)
11131126
}
11141127

0 commit comments

Comments
 (0)