
Commit 4afc5a3

Merge branch 'main' into testpackage
2 parents 1e3bde6 + b6d27ad commit 4afc5a3

12 files changed: 8216 additions, 259 deletions

.github/workflows/coder.yaml

Lines changed: 3 additions & 0 deletions
@@ -198,6 +198,9 @@ jobs:
       - run: yarn build
         working-directory: site
 
+      - run: yarn storybook:build
+        working-directory: site
+
       - run: yarn test:coverage
         working-directory: site
 

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ yarn-error.log
 site/.eslintcache
 site/.next/
 site/node_modules/
+site/storybook-static/
 site/yarn-error.log
 coverage/
 

.vscode/settings.json

Lines changed: 1 addition & 0 deletions
@@ -34,6 +34,7 @@
     "goleak",
     "hashicorp",
     "httpmw",
+    "Jobf",
    "moby",
    "nhooyr",
    "nolint",

provisionerd/provisionerd.go

Lines changed: 69 additions & 66 deletions
@@ -51,9 +51,8 @@ func New(clientDialer Dialer, opts *Options) io.Closer {
 		clientDialer: clientDialer,
 		opts:         opts,
 
-		closeContext: ctx,
-		closeCancel:  ctxCancel,
-		closed:       make(chan struct{}),
+		closeCancel: ctxCancel,
+		closed:      make(chan struct{}),
 
 		jobRunning: make(chan struct{}),
 	}
@@ -71,23 +70,21 @@ type provisionerDaemon struct {
 	client       proto.DRPCProvisionerDaemonClient
 	updateStream proto.DRPCProvisionerDaemon_UpdateJobClient
 
-	closeContext context.Context
-	closeCancel  context.CancelFunc
-	closed       chan struct{}
-	closeMutex   sync.Mutex
-	closeError   error
+	// Locked when closing the daemon.
+	closeMutex  sync.Mutex
+	closeCancel context.CancelFunc
+	closed      chan struct{}
+	closeError  error
 
-	jobID      string
+	// Locked when acquiring or canceling a job.
 	jobMutex   sync.Mutex
+	jobID      string
 	jobRunning chan struct{}
 	jobCancel  context.CancelFunc
 }
 
 // Connect establishes a connection to coderd.
 func (p *provisionerDaemon) connect(ctx context.Context) {
-	p.jobMutex.Lock()
-	defer p.jobMutex.Unlock()
-
 	var err error
 	// An exponential back-off occurs when the connection is failing to dial.
 	// This is to prevent server spam in case of a coderd outage.
@@ -102,6 +99,9 @@ func (p *provisionerDaemon) connect(ctx context.Context) {
 		}
 		p.updateStream, err = p.client.UpdateJob(ctx)
 		if err != nil {
+			if errors.Is(err, context.Canceled) {
+				return
+			}
 			p.opts.Logger.Warn(context.Background(), "create update job stream", slog.Error(err))
 			continue
 		}
@@ -126,12 +126,6 @@ func (p *provisionerDaemon) connect(ctx context.Context) {
 			// has been interrupted. This works well, because logs need
 			// to buffer if a job is running in the background.
 			p.opts.Logger.Debug(context.Background(), "update stream ended", slog.Error(p.updateStream.Context().Err()))
-			// Make sure we're not closing here!
-			p.closeMutex.Lock()
-			defer p.closeMutex.Unlock()
-			if p.isClosed() {
-				return
-			}
 			p.connect(ctx)
 		}
 	}()
@@ -168,6 +162,9 @@ func (p *provisionerDaemon) isRunningJob() bool {
 func (p *provisionerDaemon) acquireJob(ctx context.Context) {
 	p.jobMutex.Lock()
 	defer p.jobMutex.Unlock()
+	if p.isClosed() {
+		return
+	}
 	if p.isRunningJob() {
 		p.opts.Logger.Debug(context.Background(), "skipping acquire; job is already running")
 		return
@@ -184,15 +181,10 @@ func (p *provisionerDaemon) acquireJob(ctx context.Context) {
 		p.opts.Logger.Warn(context.Background(), "acquire job", slog.Error(err))
 		return
 	}
-	if p.isClosed() {
-		return
-	}
 	if job.JobId == "" {
 		p.opts.Logger.Debug(context.Background(), "no jobs available")
 		return
 	}
-	p.closeMutex.Lock()
-	defer p.closeMutex.Unlock()
 	ctx, p.jobCancel = context.WithCancel(ctx)
 	p.jobRunning = make(chan struct{})
 	p.jobID = job.JobId
@@ -222,31 +214,40 @@ func (p *provisionerDaemon) runJob(ctx context.Context, job *proto.AcquiredJob)
 				JobId: job.JobId,
 			})
 			if err != nil {
-				go p.cancelActiveJob(fmt.Sprintf("send periodic update: %s", err))
+				go p.cancelActiveJobf("send periodic update: %s", err)
 				return
 			}
 		}
 	}()
 	defer func() {
 		// Cleanup the work directory after execution.
-		err := os.RemoveAll(p.opts.WorkDirectory)
-		if err != nil {
-			go p.cancelActiveJob(fmt.Sprintf("remove all from %q directory: %s", p.opts.WorkDirectory, err))
-			return
+		for attempt := 0; attempt < 5; attempt++ {
+			err := os.RemoveAll(p.opts.WorkDirectory)
+			if err != nil {
+				// On Windows, open files cannot be removed.
+				// When the provisioner daemon is shutting down,
+				// it may take a few milliseconds for processes to exit.
+				// See: https://github.com/golang/go/issues/50510
+				p.opts.Logger.Debug(ctx, "failed to clean work directory; trying again", slog.Error(err))
+				time.Sleep(250 * time.Millisecond)
+				continue
+			}
+			p.opts.Logger.Debug(ctx, "cleaned up work directory", slog.Error(err))
+			break
 		}
-		p.opts.Logger.Debug(ctx, "cleaned up work directory")
+
 		close(p.jobRunning)
 	}()
 	// It's safe to cast this ProvisionerType. This data is coming directly from coderd.
 	provisioner, hasProvisioner := p.opts.Provisioners[job.Provisioner]
 	if !hasProvisioner {
-		go p.cancelActiveJob(fmt.Sprintf("provisioner %q not registered", job.Provisioner))
+		go p.cancelActiveJobf("provisioner %q not registered", job.Provisioner)
 		return
 	}
 
 	err := os.MkdirAll(p.opts.WorkDirectory, 0700)
 	if err != nil {
-		go p.cancelActiveJob(fmt.Sprintf("create work directory %q: %s", p.opts.WorkDirectory, err))
+		go p.cancelActiveJobf("create work directory %q: %s", p.opts.WorkDirectory, err)
 		return
 	}

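The cleanup change above replaces a single os.RemoveAll call with a short retry loop, because Windows refuses to delete files that a still-exiting child process holds open (see golang/go#50510). As a rough standalone sketch of that retry pattern, using only the standard library (the helper name, attempt count, and pause below are illustrative, not part of this commit):

package main

import (
	"fmt"
	"os"
	"time"
)

// removeAllWithRetry retries os.RemoveAll with a short pause between
// attempts, mirroring the pattern in the diff above: on Windows, files
// still held open by an exiting process cannot be removed right away.
func removeAllWithRetry(dir string, attempts int, pause time.Duration) error {
	var err error
	for i := 0; i < attempts; i++ {
		err = os.RemoveAll(dir)
		if err == nil {
			return nil
		}
		time.Sleep(pause)
	}
	return fmt.Errorf("remove %q: %w", dir, err)
}

func main() {
	workDir, err := os.MkdirTemp("", "provisioner-work")
	if err != nil {
		panic(err)
	}
	if err := removeAllWithRetry(workDir, 5, 250*time.Millisecond); err != nil {
		fmt.Println(err)
	}
}
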
@@ -258,13 +259,13 @@ func (p *provisionerDaemon) runJob(ctx context.Context, job *proto.AcquiredJob)
 			break
 		}
 		if err != nil {
-			go p.cancelActiveJob(fmt.Sprintf("read project source archive: %s", err))
+			go p.cancelActiveJobf("read project source archive: %s", err)
 			return
 		}
 		// #nosec
 		path := filepath.Join(p.opts.WorkDirectory, header.Name)
 		if !strings.HasPrefix(path, filepath.Clean(p.opts.WorkDirectory)) {
-			go p.cancelActiveJob("tar attempts to target relative upper directory")
+			go p.cancelActiveJobf("tar attempts to target relative upper directory")
 			return
 		}
 		mode := header.FileInfo().Mode()
@@ -275,14 +276,14 @@ func (p *provisionerDaemon) runJob(ctx context.Context, job *proto.AcquiredJob)
 		case tar.TypeDir:
 			err = os.MkdirAll(path, mode)
 			if err != nil {
-				go p.cancelActiveJob(fmt.Sprintf("mkdir %q: %s", path, err))
+				go p.cancelActiveJobf("mkdir %q: %s", path, err)
 				return
 			}
 			p.opts.Logger.Debug(context.Background(), "extracted directory", slog.F("path", path))
 		case tar.TypeReg:
 			file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, mode)
 			if err != nil {
-				go p.cancelActiveJob(fmt.Sprintf("create file %q (mode %s): %s", path, mode, err))
+				go p.cancelActiveJobf("create file %q (mode %s): %s", path, mode, err)
 				return
 			}
 			// Max file size of 10MB.
@@ -291,12 +292,13 @@ func (p *provisionerDaemon) runJob(ctx context.Context, job *proto.AcquiredJob)
 				err = nil
 			}
 			if err != nil {
-				go p.cancelActiveJob(fmt.Sprintf("copy file %q: %s", path, err))
+				_ = file.Close()
+				go p.cancelActiveJobf("copy file %q: %s", path, err)
 				return
 			}
 			err = file.Close()
 			if err != nil {
-				go p.cancelActiveJob(fmt.Sprintf("close file %q: %s", path, err))
+				go p.cancelActiveJobf("close file %q: %s", path, err)
 				return
 			}
 			p.opts.Logger.Debug(context.Background(), "extracted file",
@@ -323,26 +325,30 @@ func (p *provisionerDaemon) runJob(ctx context.Context, job *proto.AcquiredJob)
 
 		p.runWorkspaceProvision(ctx, provisioner, job)
 	default:
-		go p.cancelActiveJob(fmt.Sprintf("unknown job type %q; ensure your provisioner daemon is up-to-date", reflect.TypeOf(job.Type).String()))
+		go p.cancelActiveJobf("unknown job type %q; ensure your provisioner daemon is up-to-date", reflect.TypeOf(job.Type).String())
 		return
 	}
 
-	p.opts.Logger.Info(context.Background(), "completed job", slog.F("id", job.JobId))
+	// Ensure the job is still running to output.
+	// It's possible the job was canceled.
+	if p.isRunningJob() {
+		p.opts.Logger.Info(context.Background(), "completed job", slog.F("id", job.JobId))
+	}
 }
 
 func (p *provisionerDaemon) runProjectImport(ctx context.Context, provisioner sdkproto.DRPCProvisionerClient, job *proto.AcquiredJob) {
 	stream, err := provisioner.Parse(ctx, &sdkproto.Parse_Request{
 		Directory: p.opts.WorkDirectory,
 	})
 	if err != nil {
-		go p.cancelActiveJob(fmt.Sprintf("parse source: %s", err))
+		go p.cancelActiveJobf("parse source: %s", err)
 		return
 	}
 	defer stream.Close()
 	for {
 		msg, err := stream.Recv()
 		if err != nil {
-			go p.cancelActiveJob(fmt.Sprintf("recv parse source: %s", err))
+			go p.cancelActiveJobf("recv parse source: %s", err)
 			return
 		}
 		switch msgType := msg.Type.(type) {
@@ -363,7 +369,7 @@ func (p *provisionerDaemon) runProjectImport(ctx context.Context, provisioner sd
 				}},
 			})
 			if err != nil {
-				go p.cancelActiveJob(fmt.Sprintf("update job: %s", err))
+				go p.cancelActiveJobf("update job: %s", err)
 				return
 			}
 		case *sdkproto.Parse_Response_Complete:
@@ -379,14 +385,14 @@ func (p *provisionerDaemon) runProjectImport(ctx context.Context, provisioner sd
 				},
 			})
 			if err != nil {
-				go p.cancelActiveJob(fmt.Sprintf("complete job: %s", err))
+				go p.cancelActiveJobf("complete job: %s", err)
 				return
 			}
 			// Return so we stop looping!
 			return
 		default:
-			go p.cancelActiveJob(fmt.Sprintf("invalid message type %q received from provisioner",
-				reflect.TypeOf(msg.Type).String()))
+			go p.cancelActiveJobf("invalid message type %q received from provisioner",
+				reflect.TypeOf(msg.Type).String())
 			return
 		}
 	}
@@ -399,15 +405,15 @@ func (p *provisionerDaemon) runWorkspaceProvision(ctx context.Context, provision
 		State: job.GetWorkspaceProvision().State,
 	})
 	if err != nil {
-		go p.cancelActiveJob(fmt.Sprintf("provision: %s", err))
+		go p.cancelActiveJobf("provision: %s", err)
 		return
 	}
 	defer stream.Close()
 
 	for {
 		msg, err := stream.Recv()
 		if err != nil {
-			go p.cancelActiveJob(fmt.Sprintf("recv workspace provision: %s", err))
+			go p.cancelActiveJobf("recv workspace provision: %s", err)
 			return
 		}
 		switch msgType := msg.Type.(type) {
@@ -428,7 +434,7 @@ func (p *provisionerDaemon) runWorkspaceProvision(ctx context.Context, provision
 				}},
 			})
 			if err != nil {
-				go p.cancelActiveJob(fmt.Sprintf("send job update: %s", err))
+				go p.cancelActiveJobf("send job update: %s", err)
 				return
 			}
 		case *sdkproto.Provision_Response_Complete:
@@ -450,26 +456,28 @@ func (p *provisionerDaemon) runWorkspaceProvision(ctx context.Context, provision
 				},
 			})
 			if err != nil {
-				go p.cancelActiveJob(fmt.Sprintf("complete job: %s", err))
+				go p.cancelActiveJobf("complete job: %s", err)
 				return
 			}
 			// Return so we stop looping!
 			return
 		default:
-			go p.cancelActiveJob(fmt.Sprintf("invalid message type %q received from provisioner",
-				reflect.TypeOf(msg.Type).String()))
+			go p.cancelActiveJobf("invalid message type %q received from provisioner",
+				reflect.TypeOf(msg.Type).String())
 			return
 		}
 	}
 }
 
-func (p *provisionerDaemon) cancelActiveJob(errMsg string) {
+func (p *provisionerDaemon) cancelActiveJobf(format string, args ...interface{}) {
 	p.jobMutex.Lock()
 	defer p.jobMutex.Unlock()
-	if p.isClosed() {
-		return
-	}
+	errMsg := fmt.Sprintf(format, args...)
 	if !p.isRunningJob() {
+		if p.isClosed() {
+			// We don't want to log if we're already closed!
+			return
+		}
 		p.opts.Logger.Warn(context.Background(), "skipping job cancel; none running", slog.F("error_message", errMsg))
 		return
 	}
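
The renamed cancelActiveJobf folds fmt.Sprintf into the helper, so call sites pass a format string and arguments instead of pre-formatting every message, and the closed check moves inside the "no job running" branch so a cancel during shutdown stays quiet. A minimal sketch of that printf-style wrapper shape, using a hypothetical stand-in type rather than the real daemon:

package main

import (
	"fmt"
	"sync"
)

// demoDaemon is a hypothetical stand-in with just enough state to show
// the helper's shape; it is not the provisioner daemon itself.
type demoDaemon struct {
	mu      sync.Mutex
	running bool
	closed  bool
}

// cancelActiveJobf formats the message once under the mutex, skips
// logging when the daemon is already closed, and warns instead of
// canceling when no job is running.
func (d *demoDaemon) cancelActiveJobf(format string, args ...interface{}) {
	d.mu.Lock()
	defer d.mu.Unlock()
	errMsg := fmt.Sprintf(format, args...)
	if !d.running {
		if d.closed {
			// Already shutting down; nothing to log.
			return
		}
		fmt.Println("skipping job cancel; none running:", errMsg)
		return
	}
	d.running = false
	fmt.Println("canceling active job:", errMsg)
}

func main() {
	d := &demoDaemon{running: true}
	d.cancelActiveJobf("provisioner %q not registered", "terraform")
}
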
@@ -512,22 +520,17 @@ func (p *provisionerDaemon) closeWithError(err error) error {
 	if p.isClosed() {
 		return p.closeError
 	}
-	p.closeCancel()
+	p.closeError = err
+	close(p.closed)
+
 	errMsg := "provisioner daemon was shutdown gracefully"
 	if err != nil {
 		errMsg = err.Error()
 	}
-	p.cancelActiveJob(errMsg)
-	p.jobMutex.Lock()
-	defer p.jobMutex.Unlock()
-	p.opts.Logger.Debug(context.Background(), "closing server with error", slog.Error(err))
-	p.closeError = err
-	close(p.closed)
+	p.cancelActiveJobf(errMsg)
+	p.closeCancel()
 
-	if p.updateStream != nil {
-		_ = p.client.DRPCConn().Close()
-		_ = p.updateStream.Close()
-	}
+	p.opts.Logger.Debug(context.Background(), "closing server with error", slog.Error(err))
 
 	return err
 }

site/.eslintignore

Lines changed: 2 additions & 1 deletion
@@ -5,4 +5,5 @@ node_modules
 vendor
 out
 coverage
-.next
+.next
+storybook-static

site/.prettierignore

Lines changed: 1 addition & 0 deletions
@@ -14,3 +14,4 @@ yarn-error.log
 .next/
 coverage/
 out/
+storybook-static/
