@@ -51,9 +51,8 @@ func New(clientDialer Dialer, opts *Options) io.Closer {
 		clientDialer: clientDialer,
 		opts:         opts,
 
-		closeContext: ctx,
-		closeCancel:  ctxCancel,
-		closed:       make(chan struct{}),
+		closeCancel: ctxCancel,
+		closed:      make(chan struct{}),
 
 		jobRunning: make(chan struct{}),
 	}
@@ -71,23 +70,21 @@ type provisionerDaemon struct {
 	client       proto.DRPCProvisionerDaemonClient
 	updateStream proto.DRPCProvisionerDaemon_UpdateJobClient
 
-	closeContext context.Context
-	closeCancel  context.CancelFunc
-	closed       chan struct{}
-	closeMutex   sync.Mutex
-	closeError   error
+	// Locked when closing the daemon.
+	closeMutex  sync.Mutex
+	closeCancel context.CancelFunc
+	closed      chan struct{}
+	closeError  error
 
-	jobID      string
+	// Locked when acquiring or canceling a job.
 	jobMutex   sync.Mutex
+	jobID      string
 	jobRunning chan struct{}
 	jobCancel  context.CancelFunc
 }
 
 // Connect establishes a connection to coderd.
 func (p *provisionerDaemon) connect(ctx context.Context) {
-	p.jobMutex.Lock()
-	defer p.jobMutex.Unlock()
-
 	var err error
 	// An exponential back-off occurs when the connection is failing to dial.
 	// This is to prevent server spam in case of a coderd outage.
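
The field reordering above follows a common Go convention: group each set of fields directly beneath the mutex that guards them, with a comment naming what the lock protects. A minimal sketch of the shape, with illustrative names rather than the daemon's own:

	package sketch

	import (
		"context"
		"sync"
	)

	type worker struct {
		// Locked when closing the worker.
		closeMu  sync.Mutex
		cancel   context.CancelFunc
		closed   chan struct{}
		closeErr error

		// Locked when starting or canceling work.
		workMu   sync.Mutex
		workID   string
		workDone chan struct{}
	}

Grouping this way makes the locking discipline auditable at a glance.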
@@ -102,6 +99,9 @@ func (p *provisionerDaemon) connect(ctx context.Context) {
 		}
 		p.updateStream, err = p.client.UpdateJob(ctx)
 		if err != nil {
+			if errors.Is(err, context.Canceled) {
+				return
+			}
 			p.opts.Logger.Warn(context.Background(), "create update job stream", slog.Error(err))
 			continue
 		}
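
The added `errors.Is(err, context.Canceled)` guard separates a deliberate shutdown from a genuine stream failure: when the daemon's own context is canceled, the connect loop now exits quietly instead of logging a warning and retrying. A sketch of the pattern, assuming a hypothetical `dial` helper:

	package sketch

	import (
		"context"
		"errors"
		"log"
	)

	func connectLoop(ctx context.Context, dial func(context.Context) error) {
		for {
			if err := dial(ctx); err != nil {
				// A canceled context means the caller asked us to stop;
				// treat it as a clean exit, not a dial failure.
				if errors.Is(err, context.Canceled) {
					return
				}
				log.Printf("dial failed, retrying: %v", err)
				continue
			}
			return // connected
		}
	}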
@@ -126,12 +126,6 @@ func (p *provisionerDaemon) connect(ctx context.Context) {
 			// has been interrupted. This works well, because logs need
 			// to buffer if a job is running in the background.
 			p.opts.Logger.Debug(context.Background(), "update stream ended", slog.Error(p.updateStream.Context().Err()))
-			// Make sure we're not closing here!
-			p.closeMutex.Lock()
-			defer p.closeMutex.Unlock()
-			if p.isClosed() {
-				return
-			}
 			p.connect(ctx)
 		}
 	}()
@@ -168,6 +162,9 @@ func (p *provisionerDaemon) isRunningJob() bool {
 func (p *provisionerDaemon) acquireJob(ctx context.Context) {
 	p.jobMutex.Lock()
 	defer p.jobMutex.Unlock()
+	if p.isClosed() {
+		return
+	}
 	if p.isRunningJob() {
 		p.opts.Logger.Debug(context.Background(), "skipping acquire; job is already running")
 		return
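
Checking `isClosed()` first, under `jobMutex`, serializes shutdown against acquisition on a single lock: once the daemon is marked closed, no new job can start, which is what lets the next hunk drop the extra `closeMutex` acquisition that used to sit mid-function. Extending the `worker` sketch from earlier (illustrative, not the daemon's code):

	func (w *worker) acquire() {
		w.workMu.Lock()
		defer w.workMu.Unlock()
		// Check shutdown under the same lock that guards work state,
		// so a concurrent close cannot interleave with acquisition.
		select {
		case <-w.closed:
			return
		default:
		}
		// ... safe to start new work here ...
	}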
@@ -184,15 +181,10 @@ func (p *provisionerDaemon) acquireJob(ctx context.Context) {
 		p.opts.Logger.Warn(context.Background(), "acquire job", slog.Error(err))
 		return
 	}
-	if p.isClosed() {
-		return
-	}
 	if job.JobId == "" {
 		p.opts.Logger.Debug(context.Background(), "no jobs available")
 		return
 	}
-	p.closeMutex.Lock()
-	defer p.closeMutex.Unlock()
 	ctx, p.jobCancel = context.WithCancel(ctx)
 	p.jobRunning = make(chan struct{})
 	p.jobID = job.JobId
@@ -222,31 +214,40 @@ func (p *provisionerDaemon) runJob(ctx context.Context, job *proto.AcquiredJob)
 				JobId: job.JobId,
 			})
 			if err != nil {
-				go p.cancelActiveJob(fmt.Sprintf("send periodic update: %s", err))
+				go p.cancelActiveJobf("send periodic update: %s", err)
 				return
 			}
 		}
 	}()
 	defer func() {
 		// Cleanup the work directory after execution.
-		err := os.RemoveAll(p.opts.WorkDirectory)
-		if err != nil {
-			go p.cancelActiveJob(fmt.Sprintf("remove all from %q directory: %s", p.opts.WorkDirectory, err))
-			return
+		for attempt := 0; attempt < 5; attempt++ {
+			err := os.RemoveAll(p.opts.WorkDirectory)
+			if err != nil {
+				// On Windows, open files cannot be removed.
+				// When the provisioner daemon is shutting down,
+				// it may take a few milliseconds for processes to exit.
+				// See: https://github.com/golang/go/issues/50510
+				p.opts.Logger.Debug(ctx, "failed to clean work directory; trying again", slog.Error(err))
+				time.Sleep(250 * time.Millisecond)
+				continue
+			}
+			p.opts.Logger.Debug(ctx, "cleaned up work directory", slog.Error(err))
+			break
 		}
-		p.opts.Logger.Debug(ctx, "cleaned up work directory")
+
 		close(p.jobRunning)
 	}()
 	// It's safe to cast this ProvisionerType. This data is coming directly from coderd.
 	provisioner, hasProvisioner := p.opts.Provisioners[job.Provisioner]
 	if !hasProvisioner {
-		go p.cancelActiveJob(fmt.Sprintf("provisioner %q not registered", job.Provisioner))
+		go p.cancelActiveJobf("provisioner %q not registered", job.Provisioner)
 		return
 	}
 
 	err := os.MkdirAll(p.opts.WorkDirectory, 0700)
 	if err != nil {
-		go p.cancelActiveJob(fmt.Sprintf("create work directory %q: %s", p.opts.WorkDirectory, err))
+		go p.cancelActiveJobf("create work directory %q: %s", p.opts.WorkDirectory, err)
 		return
 	}
 
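
The retry loop added to the cleanup defer works around a Windows quirk: files still held open by an exiting provisioner process cannot be deleted, so a single `os.RemoveAll` can fail transiently during shutdown (golang/go#50510, linked in the diff). A standalone sketch of the same bounded retry, with illustrative names:

	package sketch

	import (
		"log"
		"os"
		"time"
	)

	// removeAllWithRetry retries os.RemoveAll a few times, sleeping
	// briefly between attempts while child processes release their files.
	func removeAllWithRetry(dir string) error {
		var err error
		for attempt := 0; attempt < 5; attempt++ {
			if err = os.RemoveAll(dir); err == nil {
				return nil
			}
			log.Printf("clean %q failed (attempt %d): %v", dir, attempt+1, err)
			time.Sleep(250 * time.Millisecond)
		}
		return err
	}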
@@ -258,13 +259,13 @@ func (p *provisionerDaemon) runJob(ctx context.Context, job *proto.AcquiredJob)
 			break
 		}
 		if err != nil {
-			go p.cancelActiveJob(fmt.Sprintf("read project source archive: %s", err))
+			go p.cancelActiveJobf("read project source archive: %s", err)
 			return
 		}
 		// #nosec
 		path := filepath.Join(p.opts.WorkDirectory, header.Name)
 		if !strings.HasPrefix(path, filepath.Clean(p.opts.WorkDirectory)) {
-			go p.cancelActiveJob("tar attempts to target relative upper directory")
+			go p.cancelActiveJobf("tar attempts to target relative upper directory")
 			return
 		}
 		mode := header.FileInfo().Mode()
@@ -275,14 +276,14 @@ func (p *provisionerDaemon) runJob(ctx context.Context, job *proto.AcquiredJob)
 		case tar.TypeDir:
 			err = os.MkdirAll(path, mode)
 			if err != nil {
-				go p.cancelActiveJob(fmt.Sprintf("mkdir %q: %s", path, err))
+				go p.cancelActiveJobf("mkdir %q: %s", path, err)
 				return
 			}
 			p.opts.Logger.Debug(context.Background(), "extracted directory", slog.F("path", path))
 		case tar.TypeReg:
 			file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, mode)
 			if err != nil {
-				go p.cancelActiveJob(fmt.Sprintf("create file %q (mode %s): %s", path, mode, err))
+				go p.cancelActiveJobf("create file %q (mode %s): %s", path, mode, err)
 				return
 			}
 			// Max file size of 10MB.
@@ -291,12 +292,13 @@ func (p *provisionerDaemon) runJob(ctx context.Context, job *proto.AcquiredJob)
 				err = nil
 			}
 			if err != nil {
-				go p.cancelActiveJob(fmt.Sprintf("copy file %q: %s", path, err))
+				_ = file.Close()
+				go p.cancelActiveJobf("copy file %q: %s", path, err)
 				return
 			}
 			err = file.Close()
 			if err != nil {
-				go p.cancelActiveJob(fmt.Sprintf("close file %q: %s", path, err))
+				go p.cancelActiveJobf("close file %q: %s", path, err)
 				return
 			}
 			p.opts.Logger.Debug(context.Background(), "extracted file",
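
The added `_ = file.Close()` fixes a descriptor leak: a failed copy previously returned without closing the file opened a few lines earlier. A sketch of the corrected shape of this extraction step (the 10MB cap mirrors the diff; names are illustrative):

	package sketch

	import (
		"errors"
		"io"
		"os"
	)

	func extractFile(r io.Reader, path string, mode os.FileMode) error {
		file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, mode)
		if err != nil {
			return err
		}
		// Max file size of 10MB; io.EOF just means the entry was shorter.
		if _, err := io.CopyN(file, r, 10<<20); err != nil && !errors.Is(err, io.EOF) {
			_ = file.Close() // release the descriptor on the error path too
			return err
		}
		// Close explicitly on success so write-back errors surface.
		return file.Close()
	}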
@@ -323,26 +325,30 @@ func (p *provisionerDaemon) runJob(ctx context.Context, job *proto.AcquiredJob)
 
 		p.runWorkspaceProvision(ctx, provisioner, job)
 	default:
-		go p.cancelActiveJob(fmt.Sprintf("unknown job type %q; ensure your provisioner daemon is up-to-date", reflect.TypeOf(job.Type).String()))
+		go p.cancelActiveJobf("unknown job type %q; ensure your provisioner daemon is up-to-date", reflect.TypeOf(job.Type).String())
 		return
 	}
 
-	p.opts.Logger.Info(context.Background(), "completed job", slog.F("id", job.JobId))
+	// Ensure the job is still running to output.
+	// It's possible the job was canceled.
+	if p.isRunningJob() {
+		p.opts.Logger.Info(context.Background(), "completed job", slog.F("id", job.JobId))
+	}
 }
 
 func (p *provisionerDaemon) runProjectImport(ctx context.Context, provisioner sdkproto.DRPCProvisionerClient, job *proto.AcquiredJob) {
 	stream, err := provisioner.Parse(ctx, &sdkproto.Parse_Request{
 		Directory: p.opts.WorkDirectory,
 	})
 	if err != nil {
-		go p.cancelActiveJob(fmt.Sprintf("parse source: %s", err))
+		go p.cancelActiveJobf("parse source: %s", err)
 		return
 	}
 	defer stream.Close()
 	for {
 		msg, err := stream.Recv()
 		if err != nil {
-			go p.cancelActiveJob(fmt.Sprintf("recv parse source: %s", err))
+			go p.cancelActiveJobf("recv parse source: %s", err)
 			return
 		}
 		switch msgType := msg.Type.(type) {
@@ -363,7 +369,7 @@ func (p *provisionerDaemon) runProjectImport(ctx context.Context, provisioner sd
 				}},
 			})
 			if err != nil {
-				go p.cancelActiveJob(fmt.Sprintf("update job: %s", err))
+				go p.cancelActiveJobf("update job: %s", err)
 				return
 			}
 		case *sdkproto.Parse_Response_Complete:
@@ -379,14 +385,14 @@ func (p *provisionerDaemon) runProjectImport(ctx context.Context, provisioner sd
 				},
 			})
 			if err != nil {
-				go p.cancelActiveJob(fmt.Sprintf("complete job: %s", err))
+				go p.cancelActiveJobf("complete job: %s", err)
 				return
 			}
 			// Return so we stop looping!
 			return
 		default:
-			go p.cancelActiveJob(fmt.Sprintf("invalid message type %q received from provisioner",
-				reflect.TypeOf(msg.Type).String()))
+			go p.cancelActiveJobf("invalid message type %q received from provisioner",
+				reflect.TypeOf(msg.Type).String())
 			return
 		}
 	}
@@ -399,15 +405,15 @@ func (p *provisionerDaemon) runWorkspaceProvision(ctx context.Context, provision
 		State: job.GetWorkspaceProvision().State,
 	})
 	if err != nil {
-		go p.cancelActiveJob(fmt.Sprintf("provision: %s", err))
+		go p.cancelActiveJobf("provision: %s", err)
 		return
 	}
 	defer stream.Close()
 
 	for {
 		msg, err := stream.Recv()
 		if err != nil {
-			go p.cancelActiveJob(fmt.Sprintf("recv workspace provision: %s", err))
+			go p.cancelActiveJobf("recv workspace provision: %s", err)
 			return
 		}
 		switch msgType := msg.Type.(type) {
@@ -428,7 +434,7 @@ func (p *provisionerDaemon) runWorkspaceProvision(ctx context.Context, provision
 				}},
 			})
 			if err != nil {
-				go p.cancelActiveJob(fmt.Sprintf("send job update: %s", err))
+				go p.cancelActiveJobf("send job update: %s", err)
 				return
 			}
 		case *sdkproto.Provision_Response_Complete:
@@ -450,26 +456,28 @@ func (p *provisionerDaemon) runWorkspaceProvision(ctx context.Context, provision
 				},
 			})
 			if err != nil {
-				go p.cancelActiveJob(fmt.Sprintf("complete job: %s", err))
+				go p.cancelActiveJobf("complete job: %s", err)
 				return
 			}
 			// Return so we stop looping!
 			return
 		default:
-			go p.cancelActiveJob(fmt.Sprintf("invalid message type %q received from provisioner",
-				reflect.TypeOf(msg.Type).String()))
+			go p.cancelActiveJobf("invalid message type %q received from provisioner",
+				reflect.TypeOf(msg.Type).String())
 			return
 		}
 	}
 }
 
-func (p *provisionerDaemon) cancelActiveJob(errMsg string) {
+func (p *provisionerDaemon) cancelActiveJobf(format string, args ...interface{}) {
 	p.jobMutex.Lock()
 	defer p.jobMutex.Unlock()
-	if p.isClosed() {
-		return
-	}
+	errMsg := fmt.Sprintf(format, args...)
 	if !p.isRunningJob() {
+		if p.isClosed() {
+			// We don't want to log if we're already closed!
+			return
+		}
 		p.opts.Logger.Warn(context.Background(), "skipping job cancel; none running", slog.F("error_message", errMsg))
 		return
 	}
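
Renaming `cancelActiveJob(errMsg string)` to `cancelActiveJobf(format string, args ...interface{})` folds the `fmt.Sprintf` that every caller used to write into the helper itself, and the trailing `f` follows Go's `Printf` naming convention. A minimal runnable sketch of the pattern:

	package main

	import "fmt"

	// cancelf formats once, centrally, instead of at every call site.
	func cancelf(format string, args ...interface{}) {
		fmt.Println("canceling job:", fmt.Sprintf(format, args...))
	}

	func main() {
		cancelf("provisioner %q not registered", "terraform")
	}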
@@ -512,22 +520,17 @@ func (p *provisionerDaemon) closeWithError(err error) error {
 	if p.isClosed() {
 		return p.closeError
 	}
-	p.closeCancel()
+	p.closeError = err
+	close(p.closed)
+
 	errMsg := "provisioner daemon was shutdown gracefully"
 	if err != nil {
 		errMsg = err.Error()
 	}
-	p.cancelActiveJob(errMsg)
-	p.jobMutex.Lock()
-	defer p.jobMutex.Unlock()
-	p.opts.Logger.Debug(context.Background(), "closing server with error", slog.Error(err))
-	p.closeError = err
-	close(p.closed)
+	p.cancelActiveJobf(errMsg)
+	p.closeCancel()
 
-	if p.updateStream != nil {
-		_ = p.client.DRPCConn().Close()
-		_ = p.updateStream.Close()
-	}
+	p.opts.Logger.Debug(context.Background(), "closing server with error", slog.Error(err))
 
 	return err
 }
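
`closeWithError` now publishes the closed state first (recording `closeError` and closing the `closed` channel) before canceling the active job and the context, so any goroutine woken by the cancellation already sees `isClosed()` report true. One more extension of the illustrative `worker` sketch:

	func (w *worker) closeWithError(err error) error {
		w.closeMu.Lock()
		defer w.closeMu.Unlock()
		select {
		case <-w.closed:
			return w.closeErr // already closed; stay idempotent
		default:
		}
		w.closeErr = err
		close(w.closed) // 1. mark closed so observers see the new state
		w.cancel()      // 2. then wake everything waiting on the context
		return err
	}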