@@ -15,6 +15,7 @@ import (
15
15
"time"
16
16
17
17
"github.com/hashicorp/yamux"
18
+ "go.uber.org/atomic"
18
19
19
20
"cdr.dev/slog"
20
21
"github.com/coder/coder/provisionerd/proto"
@@ -54,7 +55,8 @@ func New(clientDialer Dialer, opts *Options) io.Closer {
54
55
closeCancel : ctxCancel ,
55
56
closed : make (chan struct {}),
56
57
57
- jobRunning : make (chan struct {}),
58
+ jobRunning : make (chan struct {}),
59
+ jobCancelled : * atomic .NewBool (true ),
58
60
}
59
61
// Start off with a closed channel so
60
62
// isRunningJob() returns properly.
@@ -77,10 +79,11 @@ type provisionerDaemon struct {
77
79
closeError error
78
80
79
81
// Locked when acquiring or canceling a job.
80
- jobMutex sync.Mutex
81
- jobID string
82
- jobRunning chan struct {}
83
- jobCancel context.CancelFunc
82
+ jobMutex sync.Mutex
83
+ jobID string
84
+ jobRunning chan struct {}
85
+ jobCancelled atomic.Bool
86
+ jobCancel context.CancelFunc
84
87
}
85
88
86
89
// Connect establishes a connection to coderd.
@@ -193,6 +196,7 @@ func (p *provisionerDaemon) acquireJob(ctx context.Context) {
193
196
}
194
197
ctx , p .jobCancel = context .WithCancel (ctx )
195
198
p .jobRunning = make (chan struct {})
199
+ p .jobCancelled .Store (false )
196
200
p .jobID = job .JobId
197
201
198
202
p .opts .Logger .Info (context .Background (), "acquired job" ,
@@ -220,7 +224,7 @@ func (p *provisionerDaemon) runJob(ctx context.Context, job *proto.AcquiredJob)
220
224
JobId : job .JobId ,
221
225
})
222
226
if err != nil {
223
- go p .cancelActiveJobf ("send periodic update: %s" , err )
227
+ p .cancelActiveJobf ("send periodic update: %s" , err )
224
228
return
225
229
}
226
230
}
@@ -247,13 +251,13 @@ func (p *provisionerDaemon) runJob(ctx context.Context, job *proto.AcquiredJob)
247
251
// It's safe to cast this ProvisionerType. This data is coming directly from coderd.
248
252
provisioner , hasProvisioner := p .opts .Provisioners [job .Provisioner ]
249
253
if ! hasProvisioner {
250
- go p .cancelActiveJobf ("provisioner %q not registered" , job .Provisioner )
254
+ p .cancelActiveJobf ("provisioner %q not registered" , job .Provisioner )
251
255
return
252
256
}
253
257
254
258
err := os .MkdirAll (p .opts .WorkDirectory , 0700 )
255
259
if err != nil {
256
- go p .cancelActiveJobf ("create work directory %q: %s" , p .opts .WorkDirectory , err )
260
+ p .cancelActiveJobf ("create work directory %q: %s" , p .opts .WorkDirectory , err )
257
261
return
258
262
}
259
263
@@ -265,13 +269,13 @@ func (p *provisionerDaemon) runJob(ctx context.Context, job *proto.AcquiredJob)
265
269
break
266
270
}
267
271
if err != nil {
268
- go p .cancelActiveJobf ("read project source archive: %s" , err )
272
+ p .cancelActiveJobf ("read project source archive: %s" , err )
269
273
return
270
274
}
271
275
// #nosec
272
276
path := filepath .Join (p .opts .WorkDirectory , header .Name )
273
277
if ! strings .HasPrefix (path , filepath .Clean (p .opts .WorkDirectory )) {
274
- go p .cancelActiveJobf ("tar attempts to target relative upper directory" )
278
+ p .cancelActiveJobf ("tar attempts to target relative upper directory" )
275
279
return
276
280
}
277
281
mode := header .FileInfo ().Mode ()
@@ -282,14 +286,14 @@ func (p *provisionerDaemon) runJob(ctx context.Context, job *proto.AcquiredJob)
282
286
case tar .TypeDir :
283
287
err = os .MkdirAll (path , mode )
284
288
if err != nil {
285
- go p .cancelActiveJobf ("mkdir %q: %s" , path , err )
289
+ p .cancelActiveJobf ("mkdir %q: %s" , path , err )
286
290
return
287
291
}
288
292
p .opts .Logger .Debug (context .Background (), "extracted directory" , slog .F ("path" , path ))
289
293
case tar .TypeReg :
290
294
file , err := os .OpenFile (path , os .O_CREATE | os .O_RDWR , mode )
291
295
if err != nil {
292
- go p .cancelActiveJobf ("create file %q (mode %s): %s" , path , mode , err )
296
+ p .cancelActiveJobf ("create file %q (mode %s): %s" , path , mode , err )
293
297
return
294
298
}
295
299
// Max file size of 10MB.
@@ -299,12 +303,12 @@ func (p *provisionerDaemon) runJob(ctx context.Context, job *proto.AcquiredJob)
299
303
}
300
304
if err != nil {
301
305
_ = file .Close ()
302
- go p .cancelActiveJobf ("copy file %q: %s" , path , err )
306
+ p .cancelActiveJobf ("copy file %q: %s" , path , err )
303
307
return
304
308
}
305
309
err = file .Close ()
306
310
if err != nil {
307
- go p .cancelActiveJobf ("close file %q: %s" , path , err )
311
+ p .cancelActiveJobf ("close file %q: %s" , path , err )
308
312
return
309
313
}
310
314
p .opts .Logger .Debug (context .Background (), "extracted file" ,
@@ -331,7 +335,7 @@ func (p *provisionerDaemon) runJob(ctx context.Context, job *proto.AcquiredJob)
331
335
332
336
p .runWorkspaceProvision (ctx , provisioner , job )
333
337
default :
334
- go p .cancelActiveJobf ("unknown job type %q; ensure your provisioner daemon is up-to-date" , reflect .TypeOf (job .Type ).String ())
338
+ p .cancelActiveJobf ("unknown job type %q; ensure your provisioner daemon is up-to-date" , reflect .TypeOf (job .Type ).String ())
335
339
return
336
340
}
337
341
@@ -347,14 +351,14 @@ func (p *provisionerDaemon) runProjectImport(ctx context.Context, provisioner sd
347
351
Directory : p .opts .WorkDirectory ,
348
352
})
349
353
if err != nil {
350
- go p .cancelActiveJobf ("parse source: %s" , err )
354
+ p .cancelActiveJobf ("parse source: %s" , err )
351
355
return
352
356
}
353
357
defer stream .Close ()
354
358
for {
355
359
msg , err := stream .Recv ()
356
360
if err != nil {
357
- go p .cancelActiveJobf ("recv parse source: %s" , err )
361
+ p .cancelActiveJobf ("recv parse source: %s" , err )
358
362
return
359
363
}
360
364
switch msgType := msg .Type .(type ) {
@@ -375,7 +379,7 @@ func (p *provisionerDaemon) runProjectImport(ctx context.Context, provisioner sd
375
379
}},
376
380
})
377
381
if err != nil {
378
- go p .cancelActiveJobf ("update job: %s" , err )
382
+ p .cancelActiveJobf ("update job: %s" , err )
379
383
return
380
384
}
381
385
case * sdkproto.Parse_Response_Complete :
@@ -391,13 +395,13 @@ func (p *provisionerDaemon) runProjectImport(ctx context.Context, provisioner sd
391
395
},
392
396
})
393
397
if err != nil {
394
- go p .cancelActiveJobf ("complete job: %s" , err )
398
+ p .cancelActiveJobf ("complete job: %s" , err )
395
399
return
396
400
}
397
401
// Return so we stop looping!
398
402
return
399
403
default :
400
- go p .cancelActiveJobf ("invalid message type %q received from provisioner" ,
404
+ p .cancelActiveJobf ("invalid message type %q received from provisioner" ,
401
405
reflect .TypeOf (msg .Type ).String ())
402
406
return
403
407
}
@@ -411,15 +415,15 @@ func (p *provisionerDaemon) runWorkspaceProvision(ctx context.Context, provision
411
415
State : job .GetWorkspaceProvision ().State ,
412
416
})
413
417
if err != nil {
414
- go p .cancelActiveJobf ("provision: %s" , err )
418
+ p .cancelActiveJobf ("provision: %s" , err )
415
419
return
416
420
}
417
421
defer stream .Close ()
418
422
419
423
for {
420
424
msg , err := stream .Recv ()
421
425
if err != nil {
422
- go p .cancelActiveJobf ("recv workspace provision: %s" , err )
426
+ p .cancelActiveJobf ("recv workspace provision: %s" , err )
423
427
return
424
428
}
425
429
switch msgType := msg .Type .(type ) {
@@ -440,7 +444,7 @@ func (p *provisionerDaemon) runWorkspaceProvision(ctx context.Context, provision
440
444
}},
441
445
})
442
446
if err != nil {
443
- go p .cancelActiveJobf ("send job update: %s" , err )
447
+ p .cancelActiveJobf ("send job update: %s" , err )
444
448
return
445
449
}
446
450
case * sdkproto.Provision_Response_Complete :
@@ -462,13 +466,13 @@ func (p *provisionerDaemon) runWorkspaceProvision(ctx context.Context, provision
462
466
},
463
467
})
464
468
if err != nil {
465
- go p .cancelActiveJobf ("complete job: %s" , err )
469
+ p .cancelActiveJobf ("complete job: %s" , err )
466
470
return
467
471
}
468
472
// Return so we stop looping!
469
473
return
470
474
default :
471
- go p .cancelActiveJobf ("invalid message type %q received from provisioner" ,
475
+ p .cancelActiveJobf ("invalid message type %q received from provisioner" ,
472
476
reflect .TypeOf (msg .Type ).String ())
473
477
return
474
478
}
@@ -480,13 +484,14 @@ func (p *provisionerDaemon) cancelActiveJobf(format string, args ...interface{})
480
484
defer p .jobMutex .Unlock ()
481
485
errMsg := fmt .Sprintf (format , args ... )
482
486
if ! p .isRunningJob () {
483
- if p .isClosed () {
484
- // We don't want to log if we're already closed!
485
- return
486
- }
487
- p .opts .Logger .Warn (context .Background (), "skipping job cancel; none running" , slog .F ("error_message" , errMsg ))
487
+ p .opts .Logger .Info (context .Background (), "skipping job cancel; none running" , slog .F ("error_message" , errMsg ))
488
488
return
489
489
}
490
+ if p .jobCancelled .Load () {
491
+ p .opts .Logger .Warn (context .Background (), "job has already been cancelled" , slog .F ("error_messsage" , errMsg ))
492
+ return
493
+ }
494
+ p .jobCancelled .Store (true )
490
495
p .jobCancel ()
491
496
p .opts .Logger .Info (context .Background (), "canceling running job" ,
492
497
slog .F ("error_message" , errMsg ),
@@ -500,7 +505,6 @@ func (p *provisionerDaemon) cancelActiveJobf(format string, args ...interface{})
500
505
p .opts .Logger .Warn (context .Background (), "failed to notify of cancel; job is no longer running" , slog .Error (err ))
501
506
return
502
507
}
503
- <- p .jobRunning
504
508
p .opts .Logger .Debug (context .Background (), "canceled running job" )
505
509
}
506
510
@@ -534,6 +538,7 @@ func (p *provisionerDaemon) closeWithError(err error) error {
534
538
errMsg = err .Error ()
535
539
}
536
540
p .cancelActiveJobf (errMsg )
541
+ <- p .jobRunning
537
542
p .closeCancel ()
538
543
539
544
p .opts .Logger .Debug (context .Background (), "closing server with error" , slog .Error (err ))
0 commit comments