@@ -207,7 +207,7 @@ func (r *Runner) Run() {
207
207
defer r .stop ()
208
208
209
209
go r .doCleanFinish (ctx )
210
- go r .heartbeat (ctx )
210
+ go r .heartbeatRoutine (ctx )
211
211
for r .failedJob == nil && r .completedJob == nil {
212
212
r .cond .Wait ()
213
213
}
@@ -311,6 +311,29 @@ func (r *Runner) ForceStop() {
311
311
r .cond .Signal ()
312
312
}
313
313
314
+ func (r * Runner ) sendHeartbeat (ctx context.Context ) (* proto.UpdateJobResponse , error ) {
315
+ ctx , span := r .startTrace (ctx , "updateHeartbeat" )
316
+ defer span .End ()
317
+
318
+ r .mutex .Lock ()
319
+ defer r .mutex .Unlock ()
320
+ if ! r .okToSend {
321
+ return nil , errUpdateSkipped
322
+ }
323
+
324
+ // Skip sending a heartbeat if we've sent an update recently.
325
+ if lastUpdate := r .lastUpdate .Load (); lastUpdate != nil {
326
+ if time .Since (* lastUpdate ) < r .updateInterval {
327
+ span .SetAttributes (attribute .Bool ("heartbeat_skipped" , true ))
328
+ return & proto.UpdateJobResponse {}, nil
329
+ }
330
+ }
331
+
332
+ return r .update (ctx , & proto.UpdateJobRequest {
333
+ JobId : r .job .JobId ,
334
+ })
335
+ }
336
+
314
337
func (r * Runner ) update (ctx context.Context , u * proto.UpdateJobRequest ) (* proto.UpdateJobResponse , error ) {
315
338
ctx , span := r .startTrace (ctx , tracing .FuncName ())
316
339
defer span .End ()
@@ -324,7 +347,6 @@ func (r *Runner) update(ctx context.Context, u *proto.UpdateJobRequest) (*proto.
324
347
attribute .Int64 ("template_variables_len" , int64 (len (u .TemplateVariables ))),
325
348
attribute .Int64 ("user_variable_values_len" , int64 (len (u .UserVariableValues ))),
326
349
attribute .Int64 ("readme_len" , int64 (len (u .Readme ))),
327
- attribute .Bool ("heartbeat" , u .Heartbeat ),
328
350
)
329
351
330
352
r .mutex .Lock ()
@@ -333,14 +355,6 @@ func (r *Runner) update(ctx context.Context, u *proto.UpdateJobRequest) (*proto.
333
355
return nil , errUpdateSkipped
334
356
}
335
357
336
- // Skip sending a heartbeat if we've sent an update recently.
337
- if lastUpdate := r .lastUpdate .Load (); u .Heartbeat && lastUpdate != nil {
338
- if time .Since (* lastUpdate ) < r .updateInterval {
339
- span .SetAttributes (attribute .Bool ("heartbeat_skipped" , true ))
340
- return & proto.UpdateJobResponse {}, nil
341
- }
342
- }
343
-
344
358
return r .sender .UpdateJob (ctx , u )
345
359
}
346
360
@@ -505,9 +519,10 @@ func (r *Runner) do(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob)
505
519
}
506
520
}
507
521
508
- // heartbeat periodically sends updates on the job, which keeps coder server from assuming the job
509
- // is stalled, and allows the runner to learn if the job has been canceled by the user.
510
- func (r * Runner ) heartbeat (ctx context.Context ) {
522
+ // heartbeatRoutine periodically sends updates on the job, which keeps coder server
523
+ // from assuming the job is stalled, and allows the runner to learn if the job
524
+ // has been canceled by the user.
525
+ func (r * Runner ) heartbeatRoutine (ctx context.Context ) {
511
526
ctx , span := r .startTrace (ctx , tracing .FuncName ())
512
527
defer span .End ()
513
528
@@ -521,10 +536,7 @@ func (r *Runner) heartbeat(ctx context.Context) {
521
536
case <- ticker .C :
522
537
}
523
538
524
- resp , err := r .update (ctx , & proto.UpdateJobRequest {
525
- JobId : r .job .JobId ,
526
- Heartbeat : true ,
527
- })
539
+ resp , err := r .sendHeartbeat (ctx )
528
540
if err != nil {
529
541
err = r .Fail (ctx , r .failedJobf ("send periodic update: %s" , err ))
530
542
if err != nil {
@@ -1109,6 +1121,7 @@ func (r *Runner) failedJobf(format string, args ...interface{}) *proto.FailedJob
1109
1121
func (r * Runner ) startTrace (ctx context.Context , name string , opts ... trace.SpanStartOption ) (context.Context , trace.Span ) {
1110
1122
return r .tracer .Start (ctx , name , append (opts , trace .WithAttributes (
1111
1123
semconv .ServiceNameKey .String ("coderd.provisionerd" ),
1124
+ attribute .String ("job_id" , r .job .JobId ),
1112
1125
))... )
1113
1126
}
1114
1127
0 commit comments