Skip to content

Commit ccc553d

Browse files
committed
fix: fetch provisioner logs after end of logs message
I think this should _actually_ fix it. See https://github.com/coder/coder/actions/runs/4358242625/jobs/7618562167 The problem was that this loop is run async, so even though messages are sent in order, we were processing them too slowly and the end-of-logs message was published first.
1 parent bb0a996 commit ccc553d

File tree

1 file changed

+42
-40
lines changed

1 file changed

+42
-40
lines changed

coderd/provisionerjobs.go

Lines changed: 42 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ import (
99
"net/http"
1010
"sort"
1111
"strconv"
12-
"sync"
1312
"time"
1413

1514
"github.com/google/uuid"
15+
"go.uber.org/atomic"
1616
"nhooyr.io/websocket"
1717

1818
"cdr.dev/slog"
@@ -374,26 +374,38 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan *databas
374374
logger := api.Logger.With(slog.F("job_id", jobID))
375375

376376
var (
377-
closed = make(chan struct{})
378-
bufferedLogs = make(chan *database.ProvisionerJobLog, 128)
379-
logMut = &sync.Mutex{}
377+
bufferedLogs = make(chan *database.ProvisionerJobLog, 128)
378+
endOfLogs atomic.Bool
379+
lastSentLogID atomic.Int64
380380
)
381+
382+
sendLog := func(log *database.ProvisionerJobLog) {
383+
select {
384+
case bufferedLogs <- log:
385+
logger.Debug(context.Background(), "subscribe buffered log", slog.F("stage", log.Stage))
386+
lastSentLogID.Store(log.ID)
387+
default:
388+
// If this overflows users could miss logs streaming. This can happen
389+
// we get a lot of logs and consumer isn't keeping up. We don't want to block the pubsub,
390+
// so just drop them.
391+
logger.Warn(context.Background(), "provisioner job log overflowing channel")
392+
}
393+
}
394+
381395
closeSubscribe, err := api.Pubsub.Subscribe(
382396
provisionerJobLogsChannel(jobID),
383397
func(ctx context.Context, message []byte) {
384-
select {
385-
case <-closed:
398+
if endOfLogs.Load() {
386399
return
387-
default:
388400
}
389-
390401
jlMsg := provisionerJobLogsMessage{}
391402
err := json.Unmarshal(message, &jlMsg)
392403
if err != nil {
393404
logger.Warn(ctx, "invalid provisioner job log on channel", slog.Error(err))
394405
return
395406
}
396407

408+
// CreatedAfter is sent when logs are streaming!
397409
if jlMsg.CreatedAfter != 0 {
398410
logs, err := api.Database.GetProvisionerLogsByIDBetween(dbauthz.As(ctx, actor), database.GetProvisionerLogsByIDBetweenParams{
399411
JobID: jobID,
@@ -403,54 +415,44 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan *databas
403415
logger.Warn(ctx, "get provisioner logs", slog.Error(err))
404416
return
405417
}
406-
407418
for _, log := range logs {
408-
// Sadly we have to use a mutex here because events may be
409-
// handled out of order due to golang goroutine scheduling
410-
// semantics (even though Postgres guarantees ordering of
411-
// notifications).
412-
logMut.Lock()
413-
select {
414-
case <-closed:
415-
logMut.Unlock()
419+
if endOfLogs.Load() {
420+
// An end of logs message came in while we were fetching
421+
// logs or processing them!
416422
return
417-
default:
418423
}
419424
log := log
420-
select {
421-
case bufferedLogs <- &log:
422-
logger.Debug(ctx, "subscribe buffered log", slog.F("stage", log.Stage))
423-
default:
424-
// If this overflows users could miss logs streaming. This can happen
425-
// we get a lot of logs and consumer isn't keeping up. We don't want to block the pubsub,
426-
// so just drop them.
427-
logger.Warn(ctx, "provisioner job log overflowing channel")
428-
}
429-
logMut.Unlock()
425+
sendLog(&log)
430426
}
431427
}
432428

429+
// EndOfLogs is sent when logs are done streaming.
430+
// We don't want to end the stream until we've sent all the logs,
431+
// so we fetch logs after the last ID we've seen and send them!
433432
if jlMsg.EndOfLogs {
434-
// This mutex is to guard double-closes.
435-
logMut.Lock()
436-
select {
437-
case <-closed:
438-
logMut.Unlock()
433+
endOfLogs.Store(true)
434+
logs, err := api.Database.GetProvisionerLogsByIDBetween(dbauthz.As(ctx, actor), database.GetProvisionerLogsByIDBetweenParams{
435+
JobID: jobID,
436+
CreatedAfter: lastSentLogID.Load(),
437+
})
438+
if err != nil {
439+
logger.Warn(ctx, "get provisioner logs", slog.Error(err))
439440
return
440-
default:
441+
}
442+
for _, log := range logs {
443+
log := log
444+
sendLog(&log)
441445
}
442446
logger.Debug(ctx, "got End of Logs")
443447
bufferedLogs <- nil
444-
logMut.Unlock()
445448
}
449+
450+
lastSentLogID.Store(jlMsg.CreatedAfter)
446451
},
447452
)
448453
if err != nil {
449454
return nil, nil, err
450455
}
451-
return bufferedLogs, func() {
452-
closeSubscribe()
453-
close(closed)
454-
close(bufferedLogs)
455-
}, nil
456+
// We don't need to close the bufferedLogs channel because it will be garbage collected!
457+
return bufferedLogs, closeSubscribe, nil
456458
}

0 commit comments

Comments (0)