@@ -22,6 +22,7 @@ import (
22
22
23
23
"cdr.dev/slog"
24
24
"github.com/coder/coder/coderd/tracing"
25
+ "github.com/coder/coder/coderd/util/ptr"
25
26
"github.com/coder/coder/cryptorand"
26
27
"github.com/coder/coder/provisionerd/proto"
27
28
"github.com/coder/coder/provisionerd/runner"
@@ -77,7 +78,7 @@ func New(clientDialer Dialer, opts *Options) *Server {
77
78
opts .ForceCancelInterval = 10 * time .Minute
78
79
}
79
80
if opts .LogBufferInterval == 0 {
80
- opts .LogBufferInterval = 50 * time .Millisecond
81
+ opts .LogBufferInterval = 250 * time .Millisecond
81
82
}
82
83
if opts .Filesystem == nil {
83
84
opts .Filesystem = afero .NewOsFs ()
@@ -113,7 +114,7 @@ type Server struct {
113
114
tracer trace.Tracer
114
115
115
116
clientDialer Dialer
116
- clientValue atomic.Value
117
+ clientValue atomic.Pointer [proto. DRPCProvisionerDaemonClient ]
117
118
118
119
// Locked when closing the daemon, shutting down, or starting a new job.
119
120
mutex sync.Mutex
@@ -194,7 +195,7 @@ func (p *Server) connect(ctx context.Context) {
194
195
p .mutex .Unlock ()
195
196
break
196
197
}
197
- p .clientValue .Store (client )
198
+ p .clientValue .Store (ptr . Ref ( client ) )
198
199
p .mutex .Unlock ()
199
200
200
201
p .opts .Logger .Debug (context .Background (), "connected" )
@@ -260,12 +261,11 @@ func (p *Server) nextInterval() time.Duration {
260
261
}
261
262
262
263
func (p * Server ) client () (proto.DRPCProvisionerDaemonClient , bool ) {
263
- rawClient := p .clientValue .Load ()
264
- if rawClient == nil {
264
+ client := p .clientValue .Load ()
265
+ if client == nil {
265
266
return nil , false
266
267
}
267
- client , ok := rawClient .(proto.DRPCProvisionerDaemonClient )
268
- return client , ok
268
+ return * client , true
269
269
}
270
270
271
271
// isRunningJob returns true if a job is running. Caller must hold the mutex.
@@ -417,14 +417,15 @@ func retryable(err error) bool {
417
417
xerrors .Is (err , context .Canceled )
418
418
}
419
419
420
- // clientDoWithRetries runs the function f with a client, and retries with backoff until either the error returned
421
- // is not retryable() or the context expires.
422
- func (p * Server ) clientDoWithRetries (
423
- ctx context.Context , f func (context.Context , proto.DRPCProvisionerDaemonClient ) (any , error )) (
424
- any , error ,
425
- ) {
420
+ // clientDoWithRetries runs the function f with a client, and retries with
421
+ // backoff until either the error returned is not retryable() or the context
422
+ // expires.
423
+ func clientDoWithRetries [T any ](ctx context.Context ,
424
+ getClient func () (proto.DRPCProvisionerDaemonClient , bool ),
425
+ f func (context.Context , proto.DRPCProvisionerDaemonClient ) (T , error ),
426
+ ) (ret T , _ error ) {
426
427
for retrier := retry .New (25 * time .Millisecond , 5 * time .Second ); retrier .Wait (ctx ); {
427
- client , ok := p . client ()
428
+ client , ok := getClient ()
428
429
if ! ok {
429
430
continue
430
431
}
@@ -434,40 +435,38 @@ func (p *Server) clientDoWithRetries(
434
435
}
435
436
return resp , err
436
437
}
437
- return nil , ctx .Err ()
438
+ return ret , ctx .Err ()
438
439
}
439
440
440
441
func (p * Server ) CommitQuota (ctx context.Context , in * proto.CommitQuotaRequest ) (* proto.CommitQuotaResponse , error ) {
441
- out , err := p . clientDoWithRetries (ctx , func (ctx context.Context , client proto.DRPCProvisionerDaemonClient ) (any , error ) {
442
+ out , err := clientDoWithRetries (ctx , p . client , func (ctx context.Context , client proto.DRPCProvisionerDaemonClient ) (* proto. CommitQuotaResponse , error ) {
442
443
return client .CommitQuota (ctx , in )
443
444
})
444
445
if err != nil {
445
446
return nil , err
446
447
}
447
- // nolint: forcetypeassert
448
- return out .(* proto.CommitQuotaResponse ), nil
448
+ return out , nil
449
449
}
450
450
451
451
func (p * Server ) UpdateJob (ctx context.Context , in * proto.UpdateJobRequest ) (* proto.UpdateJobResponse , error ) {
452
- out , err := p . clientDoWithRetries (ctx , func (ctx context.Context , client proto.DRPCProvisionerDaemonClient ) (any , error ) {
452
+ out , err := clientDoWithRetries (ctx , p . client , func (ctx context.Context , client proto.DRPCProvisionerDaemonClient ) (* proto. UpdateJobResponse , error ) {
453
453
return client .UpdateJob (ctx , in )
454
454
})
455
455
if err != nil {
456
456
return nil , err
457
457
}
458
- // nolint: forcetypeassert
459
- return out .(* proto.UpdateJobResponse ), nil
458
+ return out , nil
460
459
}
461
460
462
461
func (p * Server ) FailJob (ctx context.Context , in * proto.FailedJob ) error {
463
- _ , err := p . clientDoWithRetries (ctx , func (ctx context.Context , client proto.DRPCProvisionerDaemonClient ) (any , error ) {
462
+ _ , err := clientDoWithRetries (ctx , p . client , func (ctx context.Context , client proto.DRPCProvisionerDaemonClient ) (* proto. Empty , error ) {
464
463
return client .FailJob (ctx , in )
465
464
})
466
465
return err
467
466
}
468
467
469
468
func (p * Server ) CompleteJob (ctx context.Context , in * proto.CompletedJob ) error {
470
- _ , err := p . clientDoWithRetries (ctx , func (ctx context.Context , client proto.DRPCProvisionerDaemonClient ) (any , error ) {
469
+ _ , err := clientDoWithRetries (ctx , p . client , func (ctx context.Context , client proto.DRPCProvisionerDaemonClient ) (* proto. Empty , error ) {
471
470
return client .CompleteJob (ctx , in )
472
471
})
473
472
return err
@@ -552,7 +551,7 @@ func (p *Server) closeWithError(err error) error {
552
551
553
552
p .opts .Logger .Debug (context .Background (), "closing server with error" , slog .Error (err ))
554
553
555
- if c , ok := p .clientValue . Load ().(proto. DRPCProvisionerDaemonClient ); ok {
554
+ if c , ok := p .client ( ); ok {
556
555
_ = c .DRPCConn ().Close ()
557
556
}
558
557
0 commit comments