Skip to content

Commit 13ef836

Browse files
committed
Merge branch 'testcleanup' of github.com:coder/coder into testcleanup
2 parents 9fa815c + 57efae2 commit 13ef836

File tree

2 files changed

+68
-63
lines changed

2 files changed

+68
-63
lines changed

database/dump.sql

+32-32
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

provisionerd/provisionerd.go

+36-31
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"time"
1616

1717
"github.com/hashicorp/yamux"
18+
"go.uber.org/atomic"
1819

1920
"cdr.dev/slog"
2021
"github.com/coder/coder/provisionerd/proto"
@@ -54,7 +55,8 @@ func New(clientDialer Dialer, opts *Options) io.Closer {
5455
closeCancel: ctxCancel,
5556
closed: make(chan struct{}),
5657

57-
jobRunning: make(chan struct{}),
58+
jobRunning: make(chan struct{}),
59+
jobCancelled: *atomic.NewBool(true),
5860
}
5961
// Start off with a closed channel so
6062
// isRunningJob() returns properly.
@@ -77,10 +79,11 @@ type provisionerDaemon struct {
7779
closeError error
7880

7981
// Locked when acquiring or canceling a job.
80-
jobMutex sync.Mutex
81-
jobID string
82-
jobRunning chan struct{}
83-
jobCancel context.CancelFunc
82+
jobMutex sync.Mutex
83+
jobID string
84+
jobRunning chan struct{}
85+
jobCancelled atomic.Bool
86+
jobCancel context.CancelFunc
8487
}
8588

8689
// Connect establishes a connection to coderd.
@@ -193,6 +196,7 @@ func (p *provisionerDaemon) acquireJob(ctx context.Context) {
193196
}
194197
ctx, p.jobCancel = context.WithCancel(ctx)
195198
p.jobRunning = make(chan struct{})
199+
p.jobCancelled.Store(false)
196200
p.jobID = job.JobId
197201

198202
p.opts.Logger.Info(context.Background(), "acquired job",
@@ -220,7 +224,7 @@ func (p *provisionerDaemon) runJob(ctx context.Context, job *proto.AcquiredJob)
220224
JobId: job.JobId,
221225
})
222226
if err != nil {
223-
go p.cancelActiveJobf("send periodic update: %s", err)
227+
p.cancelActiveJobf("send periodic update: %s", err)
224228
return
225229
}
226230
}
@@ -247,13 +251,13 @@ func (p *provisionerDaemon) runJob(ctx context.Context, job *proto.AcquiredJob)
247251
// It's safe to cast this ProvisionerType. This data is coming directly from coderd.
248252
provisioner, hasProvisioner := p.opts.Provisioners[job.Provisioner]
249253
if !hasProvisioner {
250-
go p.cancelActiveJobf("provisioner %q not registered", job.Provisioner)
254+
p.cancelActiveJobf("provisioner %q not registered", job.Provisioner)
251255
return
252256
}
253257

254258
err := os.MkdirAll(p.opts.WorkDirectory, 0700)
255259
if err != nil {
256-
go p.cancelActiveJobf("create work directory %q: %s", p.opts.WorkDirectory, err)
260+
p.cancelActiveJobf("create work directory %q: %s", p.opts.WorkDirectory, err)
257261
return
258262
}
259263

@@ -265,13 +269,13 @@ func (p *provisionerDaemon) runJob(ctx context.Context, job *proto.AcquiredJob)
265269
break
266270
}
267271
if err != nil {
268-
go p.cancelActiveJobf("read project source archive: %s", err)
272+
p.cancelActiveJobf("read project source archive: %s", err)
269273
return
270274
}
271275
// #nosec
272276
path := filepath.Join(p.opts.WorkDirectory, header.Name)
273277
if !strings.HasPrefix(path, filepath.Clean(p.opts.WorkDirectory)) {
274-
go p.cancelActiveJobf("tar attempts to target relative upper directory")
278+
p.cancelActiveJobf("tar attempts to target relative upper directory")
275279
return
276280
}
277281
mode := header.FileInfo().Mode()
@@ -282,14 +286,14 @@ func (p *provisionerDaemon) runJob(ctx context.Context, job *proto.AcquiredJob)
282286
case tar.TypeDir:
283287
err = os.MkdirAll(path, mode)
284288
if err != nil {
285-
go p.cancelActiveJobf("mkdir %q: %s", path, err)
289+
p.cancelActiveJobf("mkdir %q: %s", path, err)
286290
return
287291
}
288292
p.opts.Logger.Debug(context.Background(), "extracted directory", slog.F("path", path))
289293
case tar.TypeReg:
290294
file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, mode)
291295
if err != nil {
292-
go p.cancelActiveJobf("create file %q (mode %s): %s", path, mode, err)
296+
p.cancelActiveJobf("create file %q (mode %s): %s", path, mode, err)
293297
return
294298
}
295299
// Max file size of 10MB.
@@ -299,12 +303,12 @@ func (p *provisionerDaemon) runJob(ctx context.Context, job *proto.AcquiredJob)
299303
}
300304
if err != nil {
301305
_ = file.Close()
302-
go p.cancelActiveJobf("copy file %q: %s", path, err)
306+
p.cancelActiveJobf("copy file %q: %s", path, err)
303307
return
304308
}
305309
err = file.Close()
306310
if err != nil {
307-
go p.cancelActiveJobf("close file %q: %s", path, err)
311+
p.cancelActiveJobf("close file %q: %s", path, err)
308312
return
309313
}
310314
p.opts.Logger.Debug(context.Background(), "extracted file",
@@ -331,7 +335,7 @@ func (p *provisionerDaemon) runJob(ctx context.Context, job *proto.AcquiredJob)
331335

332336
p.runWorkspaceProvision(ctx, provisioner, job)
333337
default:
334-
go p.cancelActiveJobf("unknown job type %q; ensure your provisioner daemon is up-to-date", reflect.TypeOf(job.Type).String())
338+
p.cancelActiveJobf("unknown job type %q; ensure your provisioner daemon is up-to-date", reflect.TypeOf(job.Type).String())
335339
return
336340
}
337341

@@ -347,14 +351,14 @@ func (p *provisionerDaemon) runProjectImport(ctx context.Context, provisioner sd
347351
Directory: p.opts.WorkDirectory,
348352
})
349353
if err != nil {
350-
go p.cancelActiveJobf("parse source: %s", err)
354+
p.cancelActiveJobf("parse source: %s", err)
351355
return
352356
}
353357
defer stream.Close()
354358
for {
355359
msg, err := stream.Recv()
356360
if err != nil {
357-
go p.cancelActiveJobf("recv parse source: %s", err)
361+
p.cancelActiveJobf("recv parse source: %s", err)
358362
return
359363
}
360364
switch msgType := msg.Type.(type) {
@@ -375,7 +379,7 @@ func (p *provisionerDaemon) runProjectImport(ctx context.Context, provisioner sd
375379
}},
376380
})
377381
if err != nil {
378-
go p.cancelActiveJobf("update job: %s", err)
382+
p.cancelActiveJobf("update job: %s", err)
379383
return
380384
}
381385
case *sdkproto.Parse_Response_Complete:
@@ -391,13 +395,13 @@ func (p *provisionerDaemon) runProjectImport(ctx context.Context, provisioner sd
391395
},
392396
})
393397
if err != nil {
394-
go p.cancelActiveJobf("complete job: %s", err)
398+
p.cancelActiveJobf("complete job: %s", err)
395399
return
396400
}
397401
// Return so we stop looping!
398402
return
399403
default:
400-
go p.cancelActiveJobf("invalid message type %q received from provisioner",
404+
p.cancelActiveJobf("invalid message type %q received from provisioner",
401405
reflect.TypeOf(msg.Type).String())
402406
return
403407
}
@@ -411,15 +415,15 @@ func (p *provisionerDaemon) runWorkspaceProvision(ctx context.Context, provision
411415
State: job.GetWorkspaceProvision().State,
412416
})
413417
if err != nil {
414-
go p.cancelActiveJobf("provision: %s", err)
418+
p.cancelActiveJobf("provision: %s", err)
415419
return
416420
}
417421
defer stream.Close()
418422

419423
for {
420424
msg, err := stream.Recv()
421425
if err != nil {
422-
go p.cancelActiveJobf("recv workspace provision: %s", err)
426+
p.cancelActiveJobf("recv workspace provision: %s", err)
423427
return
424428
}
425429
switch msgType := msg.Type.(type) {
@@ -440,7 +444,7 @@ func (p *provisionerDaemon) runWorkspaceProvision(ctx context.Context, provision
440444
}},
441445
})
442446
if err != nil {
443-
go p.cancelActiveJobf("send job update: %s", err)
447+
p.cancelActiveJobf("send job update: %s", err)
444448
return
445449
}
446450
case *sdkproto.Provision_Response_Complete:
@@ -462,13 +466,13 @@ func (p *provisionerDaemon) runWorkspaceProvision(ctx context.Context, provision
462466
},
463467
})
464468
if err != nil {
465-
go p.cancelActiveJobf("complete job: %s", err)
469+
p.cancelActiveJobf("complete job: %s", err)
466470
return
467471
}
468472
// Return so we stop looping!
469473
return
470474
default:
471-
go p.cancelActiveJobf("invalid message type %q received from provisioner",
475+
p.cancelActiveJobf("invalid message type %q received from provisioner",
472476
reflect.TypeOf(msg.Type).String())
473477
return
474478
}
@@ -480,13 +484,14 @@ func (p *provisionerDaemon) cancelActiveJobf(format string, args ...interface{})
480484
defer p.jobMutex.Unlock()
481485
errMsg := fmt.Sprintf(format, args...)
482486
if !p.isRunningJob() {
483-
if p.isClosed() {
484-
// We don't want to log if we're already closed!
485-
return
486-
}
487-
p.opts.Logger.Warn(context.Background(), "skipping job cancel; none running", slog.F("error_message", errMsg))
487+
p.opts.Logger.Info(context.Background(), "skipping job cancel; none running", slog.F("error_message", errMsg))
488488
return
489489
}
490+
if p.jobCancelled.Load() {
491+
p.opts.Logger.Warn(context.Background(), "job has already been cancelled", slog.F("error_messsage", errMsg))
492+
return
493+
}
494+
p.jobCancelled.Store(true)
490495
p.jobCancel()
491496
p.opts.Logger.Info(context.Background(), "canceling running job",
492497
slog.F("error_message", errMsg),
@@ -500,7 +505,6 @@ func (p *provisionerDaemon) cancelActiveJobf(format string, args ...interface{})
500505
p.opts.Logger.Warn(context.Background(), "failed to notify of cancel; job is no longer running", slog.Error(err))
501506
return
502507
}
503-
<-p.jobRunning
504508
p.opts.Logger.Debug(context.Background(), "canceled running job")
505509
}
506510

@@ -534,6 +538,7 @@ func (p *provisionerDaemon) closeWithError(err error) error {
534538
errMsg = err.Error()
535539
}
536540
p.cancelActiveJobf(errMsg)
541+
<-p.jobRunning
537542
p.closeCancel()
538543

539544
p.opts.Logger.Debug(context.Background(), "closing server with error", slog.Error(err))

0 commit comments

Comments
 (0)