@@ -20,12 +20,13 @@ import (
20
20
"golang.org/x/xerrors"
21
21
22
22
"cdr.dev/slog"
23
+ "github.com/coder/retry"
24
+
23
25
"github.com/coder/coder/v2/coderd/tracing"
24
26
"github.com/coder/coder/v2/codersdk"
25
27
"github.com/coder/coder/v2/provisionerd/proto"
26
28
"github.com/coder/coder/v2/provisionerd/runner"
27
29
sdkproto "github.com/coder/coder/v2/provisionersdk/proto"
28
- "github.com/coder/retry"
29
30
)
30
31
31
32
// Dialer represents the function to create a daemon client connection.
@@ -290,7 +291,7 @@ func (p *Server) acquireLoop() {
290
291
defer p .wg .Done ()
291
292
defer func () { close (p .acquireDoneCh ) }()
292
293
ctx := p .closeContext
293
- for {
294
+ for retrier := retry . New ( 10 * time . Millisecond , 1 * time . Second ); retrier . Wait ( ctx ); {
294
295
if p .acquireExit () {
295
296
return
296
297
}
@@ -299,7 +300,10 @@ func (p *Server) acquireLoop() {
299
300
p .opts .Logger .Debug (ctx , "shut down before client (re) connected" )
300
301
return
301
302
}
302
- p .acquireAndRunOne (client )
303
+ err := p .acquireAndRunOne (client )
304
+ if err != nil && ctx .Err () == nil { // Only log if context is not done.
305
+ p .opts .Logger .Debug (ctx , "retrying to acquire job" , slog .F ("retry_in_ms" , retrier .Delay .Milliseconds ()), slog .Error (err ))
306
+ }
303
307
}
304
308
}
305
309
@@ -318,7 +322,7 @@ func (p *Server) acquireExit() bool {
318
322
return false
319
323
}
320
324
321
- func (p * Server ) acquireAndRunOne (client proto.DRPCProvisionerDaemonClient ) {
325
+ func (p * Server ) acquireAndRunOne (client proto.DRPCProvisionerDaemonClient ) error {
322
326
ctx := p .closeContext
323
327
p .opts .Logger .Debug (ctx , "start of acquireAndRunOne" )
324
328
job , err := p .acquireGraceful (client )
@@ -327,15 +331,15 @@ func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) {
327
331
if errors .Is (err , context .Canceled ) ||
328
332
errors .Is (err , yamux .ErrSessionShutdown ) ||
329
333
errors .Is (err , fasthttputil .ErrInmemoryListenerClosed ) {
330
- return
334
+ return err
331
335
}
332
336
333
337
p .opts .Logger .Warn (ctx , "provisionerd was unable to acquire job" , slog .Error (err ))
334
- return
338
+ return xerrors . Errorf ( "failed to acquire job: %w" , err )
335
339
}
336
340
if job .JobId == "" {
337
341
p .opts .Logger .Debug (ctx , "acquire job successfully canceled" )
338
- return
342
+ return xerrors . New ( "canceled" )
339
343
}
340
344
341
345
if len (job .TraceMetadata ) > 0 {
@@ -392,7 +396,7 @@ func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) {
392
396
if err != nil {
393
397
p .opts .Logger .Error (ctx , "provisioner job failed" , slog .F ("job_id" , job .JobId ), slog .Error (err ))
394
398
}
395
- return
399
+ return xerrors . Errorf ( "provisioner job failed: %w" , err )
396
400
}
397
401
398
402
p .mutex .Lock ()
@@ -416,6 +420,7 @@ func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) {
416
420
p .mutex .Lock ()
417
421
p .activeJob = nil
418
422
p .mutex .Unlock ()
423
+ return nil
419
424
}
420
425
421
426
// acquireGraceful attempts to acquire a job from the server, handling canceling the acquisition if we gracefully shut
0 commit comments