Skip to content

Commit bb0a996

Browse files
authored
chore: fix buffered provisioner job logs close flake (#6492)
See https://github.com/coder/coder/actions/runs/4357599919/jobs/7617111287
1 parent 1bdd2ab commit bb0a996

File tree

1 file changed

+15
-12
lines changed

1 file changed

+15
-12
lines changed

coderd/provisionerjobs.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
4949
// if we are following logs, start the subscription before we query the database, so that we don't miss any logs
5050
// between the end of our query and the start of the subscription. We might get duplicates, so we'll keep track
5151
// of processed IDs.
52-
var bufferedLogs <-chan database.ProvisionerJobLog
52+
var bufferedLogs <-chan *database.ProvisionerJobLog
5353
if follow {
5454
bl, closeFollow, err := api.followLogs(actor, job.ID)
5555
if err != nil {
@@ -173,8 +173,9 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
173173
logger.Debug(context.Background(), "job logs context canceled")
174174
return
175175
case log, ok := <-bufferedLogs:
176-
if !ok {
177-
logger.Debug(context.Background(), "done with published logs")
176+
// A nil log is sent when complete!
177+
if !ok || log == nil {
178+
logger.Debug(context.Background(), "reached the end of published logs")
178179
return
179180
}
180181
if logIdsDone[log.ID] {
@@ -183,7 +184,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
183184
} else {
184185
logger.Debug(ctx, "subscribe encoding log",
185186
slog.F("stage", log.Stage))
186-
err = encoder.Encode(convertProvisionerJobLog(log))
187+
err = encoder.Encode(convertProvisionerJobLog(*log))
187188
if err != nil {
188189
return
189190
}
@@ -369,12 +370,12 @@ type provisionerJobLogsMessage struct {
369370
EndOfLogs bool `json:"end_of_logs,omitempty"`
370371
}
371372

372-
func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan database.ProvisionerJobLog, func(), error) {
373+
func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan *database.ProvisionerJobLog, func(), error) {
373374
logger := api.Logger.With(slog.F("job_id", jobID))
374375

375376
var (
376377
closed = make(chan struct{})
377-
bufferedLogs = make(chan database.ProvisionerJobLog, 128)
378+
bufferedLogs = make(chan *database.ProvisionerJobLog, 128)
378379
logMut = &sync.Mutex{}
379380
)
380381
closeSubscribe, err := api.Pubsub.Subscribe(
@@ -415,9 +416,9 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan database
415416
return
416417
default:
417418
}
418-
419+
log := log
419420
select {
420-
case bufferedLogs <- log:
421+
case bufferedLogs <- &log:
421422
logger.Debug(ctx, "subscribe buffered log", slog.F("stage", log.Stage))
422423
default:
423424
// If this overflows users could miss logs streaming. This can happen
@@ -439,15 +440,17 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan database
439440
default:
440441
}
441442
logger.Debug(ctx, "got End of Logs")
442-
443-
close(closed)
444-
close(bufferedLogs)
443+
bufferedLogs <- nil
445444
logMut.Unlock()
446445
}
447446
},
448447
)
449448
if err != nil {
450449
return nil, nil, err
451450
}
452-
return bufferedLogs, closeSubscribe, nil
451+
return bufferedLogs, func() {
452+
closeSubscribe()
453+
close(closed)
454+
close(bufferedLogs)
455+
}, nil
453456
}

0 commit comments

Comments
 (0)