diff --git a/coderd/provisionerjobs.go b/coderd/provisionerjobs.go index 3770cf217d0a0..7d83d3f905cfe 100644 --- a/coderd/provisionerjobs.go +++ b/coderd/provisionerjobs.go @@ -49,7 +49,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job // if we are following logs, start the subscription before we query the database, so that we don't miss any logs // between the end of our query and the start of the subscription. We might get duplicates, so we'll keep track // of processed IDs. - var bufferedLogs <-chan database.ProvisionerJobLog + var bufferedLogs <-chan *database.ProvisionerJobLog if follow { bl, closeFollow, err := api.followLogs(actor, job.ID) if err != nil { @@ -173,8 +173,9 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job logger.Debug(context.Background(), "job logs context canceled") return case log, ok := <-bufferedLogs: - if !ok { - logger.Debug(context.Background(), "done with published logs") + // A nil log is sent when complete! + if !ok || log == nil { + logger.Debug(context.Background(), "reached the end of published logs") return } if logIdsDone[log.ID] { @@ -183,7 +184,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job } else { logger.Debug(ctx, "subscribe encoding log", slog.F("stage", log.Stage)) - err = encoder.Encode(convertProvisionerJobLog(log)) + err = encoder.Encode(convertProvisionerJobLog(*log)) if err != nil { return } @@ -369,12 +370,12 @@ type provisionerJobLogsMessage struct { EndOfLogs bool `json:"end_of_logs,omitempty"` } -func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan database.ProvisionerJobLog, func(), error) { +func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan *database.ProvisionerJobLog, func(), error) { logger := api.Logger.With(slog.F("job_id", jobID)) var ( closed = make(chan struct{}) - bufferedLogs = make(chan database.ProvisionerJobLog, 128) + bufferedLogs = make(chan *database.ProvisionerJobLog, 128) logMut = &sync.Mutex{} ) closeSubscribe, err := api.Pubsub.Subscribe( @@ -415,9 +416,9 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan database return default: } - + log := log select { - case bufferedLogs <- log: + case bufferedLogs <- &log: logger.Debug(ctx, "subscribe buffered log", slog.F("stage", log.Stage)) default: // If this overflows users could miss logs streaming. This can happen @@ -439,9 +440,7 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan database default: } logger.Debug(ctx, "got End of Logs") - - close(closed) - close(bufferedLogs) + bufferedLogs <- nil logMut.Unlock() } }, @@ -449,5 +448,9 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan database if err != nil { return nil, nil, err } - return bufferedLogs, closeSubscribe, nil + return bufferedLogs, func() { + closeSubscribe() + close(closed) + close(bufferedLogs) + }, nil }