@@ -20,12 +20,13 @@ import (
20
20
"golang.org/x/xerrors"
21
21
22
22
"cdr.dev/slog"
23
+ "github.com/coder/retry"
24
+
23
25
"github.com/coder/coder/v2/coderd/tracing"
24
26
"github.com/coder/coder/v2/codersdk"
25
27
"github.com/coder/coder/v2/provisionerd/proto"
26
28
"github.com/coder/coder/v2/provisionerd/runner"
27
29
sdkproto "github.com/coder/coder/v2/provisionersdk/proto"
28
- "github.com/coder/retry"
29
30
)
30
31
31
32
// Dialer represents the function to create a daemon client connection.
@@ -290,7 +291,7 @@ func (p *Server) acquireLoop() {
290
291
defer p .wg .Done ()
291
292
defer func () { close (p .acquireDoneCh ) }()
292
293
ctx := p .closeContext
293
- for {
294
+ for retrier := retry . New ( 10 * time . Millisecond , 1 * time . Second ); retrier . Wait ( ctx ); {
294
295
if p .acquireExit () {
295
296
return
296
297
}
@@ -299,7 +300,17 @@ func (p *Server) acquireLoop() {
299
300
p .opts .Logger .Debug (ctx , "shut down before client (re) connected" )
300
301
return
301
302
}
302
- p .acquireAndRunOne (client )
303
+ err := p .acquireAndRunOne (client )
304
+ if err != nil && ctx .Err () == nil { // Only log if context is not done.
305
+ // Short-circuit: if an exit has been requested, return now instead of waiting out the retry delay.
306
+ if p .acquireExit () {
307
+ return
308
+ }
309
+ p .opts .Logger .Warn (ctx , "failed to acquire job, retrying" , slog .F ("delay" , fmt .Sprintf ("%vms" , retrier .Delay .Milliseconds ())), slog .Error (err ))
310
+ } else {
311
+ // Reset the retrier's backoff after a successful acquisition (this branch is also taken when the context is already done).
312
+ retrier .Reset ()
313
+ }
303
314
}
304
315
}
305
316
@@ -318,7 +329,7 @@ func (p *Server) acquireExit() bool {
318
329
return false
319
330
}
320
331
321
- func (p * Server ) acquireAndRunOne (client proto.DRPCProvisionerDaemonClient ) {
332
+ func (p * Server ) acquireAndRunOne (client proto.DRPCProvisionerDaemonClient ) error {
322
333
ctx := p .closeContext
323
334
p .opts .Logger .Debug (ctx , "start of acquireAndRunOne" )
324
335
job , err := p .acquireGraceful (client )
@@ -327,15 +338,15 @@ func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) {
327
338
if errors .Is (err , context .Canceled ) ||
328
339
errors .Is (err , yamux .ErrSessionShutdown ) ||
329
340
errors .Is (err , fasthttputil .ErrInmemoryListenerClosed ) {
330
- return
341
+ return err
331
342
}
332
343
333
344
p .opts .Logger .Warn (ctx , "provisionerd was unable to acquire job" , slog .Error (err ))
334
- return
345
+ return xerrors . Errorf ( "failed to acquire job: %w" , err )
335
346
}
336
347
if job .JobId == "" {
337
348
p .opts .Logger .Debug (ctx , "acquire job successfully canceled" )
338
- return
349
+ return nil
339
350
}
340
351
341
352
if len (job .TraceMetadata ) > 0 {
@@ -390,9 +401,9 @@ func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) {
390
401
Error : fmt .Sprintf ("failed to connect to provisioner: %s" , resp .Error ),
391
402
})
392
403
if err != nil {
393
- p .opts .Logger .Error (ctx , "provisioner job failed" , slog .F ("job_id" , job .JobId ), slog .Error (err ))
404
+ p .opts .Logger .Error (ctx , "failed to report provisioner job failed" , slog .F ("job_id" , job .JobId ), slog .Error (err ))
394
405
}
395
- return
406
+ return xerrors . Errorf ( "failed to report provisioner job failed: %w" , err )
396
407
}
397
408
398
409
p .mutex .Lock ()
@@ -416,6 +427,7 @@ func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) {
416
427
p .mutex .Lock ()
417
428
p .activeJob = nil
418
429
p .mutex .Unlock ()
430
+ return nil
419
431
}
420
432
421
433
// acquireGraceful attempts to acquire a job from the server, handling canceling the acquisition if we gracefully shut
0 commit comments