@@ -68,8 +68,9 @@ func New(clientDialer Dialer, opts *Options) *Server {
68
68
clientDialer : clientDialer ,
69
69
opts : opts ,
70
70
71
- closeCancel : ctxCancel ,
72
- closed : make (chan struct {}),
71
+ closeContext : ctx ,
72
+ closeCancel : ctxCancel ,
73
+ closed : make (chan struct {}),
73
74
74
75
shutdown : make (chan struct {}),
75
76
@@ -90,10 +91,11 @@ type Server struct {
90
91
client proto.DRPCProvisionerDaemonClient
91
92
92
93
// Locked when closing the daemon.
93
- closeMutex sync.Mutex
94
- closeCancel context.CancelFunc
95
- closed chan struct {}
96
- closeError error
94
+ closeMutex sync.Mutex
95
+ closeContext context.Context
96
+ closeCancel context.CancelFunc
97
+ closed chan struct {}
98
+ closeError error
97
99
98
100
shutdownMutex sync.Mutex
99
101
shutdown chan struct {}
@@ -244,6 +246,9 @@ func (p *Server) runJob(ctx context.Context, job *proto.AcquiredJob) {
244
246
resp , err := p .client .UpdateJob (ctx , & proto.UpdateJobRequest {
245
247
JobId : job .JobId ,
246
248
})
249
+ if errors .Is (err , yamux .ErrSessionShutdown ) {
250
+ continue
251
+ }
247
252
if err != nil {
248
253
p .failActiveJobf ("send periodic update: %s" , err )
249
254
return
@@ -493,7 +498,7 @@ func (p *Server) runTemplateImport(ctx, shutdown context.Context, provisioner sd
493
498
return
494
499
}
495
500
496
- _ , err = p . client . CompleteJob ( ctx , & proto.CompletedJob {
501
+ p . completeJob ( & proto.CompletedJob {
497
502
JobId : job .JobId ,
498
503
Type : & proto.CompletedJob_TemplateImport_ {
499
504
TemplateImport : & proto.CompletedJob_TemplateImport {
@@ -502,10 +507,6 @@ func (p *Server) runTemplateImport(ctx, shutdown context.Context, provisioner sd
502
507
},
503
508
},
504
509
})
505
- if err != nil {
506
- p .failActiveJobf ("complete job: %s" , err )
507
- return
508
- }
509
510
}
510
511
511
512
// Parses parameter schemas from source.
@@ -729,15 +730,7 @@ func (p *Server) runWorkspaceBuild(ctx, shutdown context.Context, provisioner sd
729
730
return
730
731
}
731
732
732
- p .opts .Logger .Info (context .Background (), "provision successful; marking job as complete" ,
733
- slog .F ("resource_count" , len (msgType .Complete .Resources )),
734
- slog .F ("resources" , msgType .Complete .Resources ),
735
- slog .F ("state_length" , len (msgType .Complete .State )),
736
- )
737
-
738
- // Complete job may need to be async if we disconnected...
739
- // When we reconnect we can flush any of these cached values.
740
- _ , err = p .client .CompleteJob (ctx , & proto.CompletedJob {
733
+ p .completeJob (& proto.CompletedJob {
741
734
JobId : job .JobId ,
742
735
Type : & proto.CompletedJob_WorkspaceBuild_ {
743
736
WorkspaceBuild : & proto.CompletedJob_WorkspaceBuild {
@@ -746,11 +739,12 @@ func (p *Server) runWorkspaceBuild(ctx, shutdown context.Context, provisioner sd
746
739
},
747
740
},
748
741
})
749
- if err != nil {
750
- p .failActiveJobf ("complete job: %s" , err )
751
- return
752
- }
753
- // Return so we stop looping!
742
+ p .opts .Logger .Info (context .Background (), "provision successful; marked job as complete" ,
743
+ slog .F ("resource_count" , len (msgType .Complete .Resources )),
744
+ slog .F ("resources" , msgType .Complete .Resources ),
745
+ slog .F ("state_length" , len (msgType .Complete .State )),
746
+ )
747
+ // Stop looping!
754
748
return
755
749
default :
756
750
p .failActiveJobf ("invalid message type %T received from provisioner" , msg .Type )
@@ -759,6 +753,19 @@ func (p *Server) runWorkspaceBuild(ctx, shutdown context.Context, provisioner sd
759
753
}
760
754
}
761
755
756
+ func (p * Server ) completeJob (job * proto.CompletedJob ) {
757
+ for retrier := retry .New (25 * time .Millisecond , 5 * time .Second ); retrier .Wait (p .closeContext ); {
758
+ // Complete job may need to be async if we disconnected...
759
+ // When we reconnect we can flush any of these cached values.
760
+ _ , err := p .client .CompleteJob (p .closeContext , job )
761
+ if err != nil {
762
+ p .opts .Logger .Warn (p .closeContext , "failed to complete job" , slog .Error (err ))
763
+ continue
764
+ }
765
+ break
766
+ }
767
+ }
768
+
762
769
func (p * Server ) failActiveJobf (format string , args ... interface {}) {
763
770
p .failActiveJob (& proto.FailedJob {
764
771
Error : fmt .Sprintf (format , args ... ),
@@ -786,12 +793,18 @@ func (p *Server) failActiveJob(failedJob *proto.FailedJob) {
786
793
slog .F ("job_id" , p .jobID ),
787
794
)
788
795
failedJob .JobId = p .jobID
789
- _ , err := p .client .FailJob (context .Background (), failedJob )
790
- if err != nil {
791
- p .opts .Logger .Warn (context .Background (), "failed to notify of error; job is no longer running" , slog .Error (err ))
796
+ for retrier := retry .New (25 * time .Millisecond , 5 * time .Second ); retrier .Wait (p .closeContext ); {
797
+ _ , err := p .client .FailJob (p .closeContext , failedJob )
798
+ if err != nil {
799
+ if p .isClosed () {
800
+ return
801
+ }
802
+ p .opts .Logger .Warn (context .Background (), "failed to notify of error; job is no longer running" , slog .Error (err ))
803
+ continue
804
+ }
805
+ p .opts .Logger .Debug (context .Background (), "marked running job as failed" )
792
806
return
793
807
}
794
- p .opts .Logger .Debug (context .Background (), "marked running job as failed" )
795
808
}
796
809
797
810
// isClosed returns whether the API is closed or not.
0 commit comments