Skip to content

Commit 524b14a

Browse files
authored
fix: fetch provisioner logs after end of logs message (coder#6495)
I think this should _actually_ fix it. See https://github.com/coder/coder/actions/runs/4358242625/jobs/7618562167 The problem was that this loop is run asynchronously, so even though messages are sent in order, we were processing them too slowly and the end-of-logs message was published first.
1 parent 26a725f commit 524b14a

File tree

1 file changed

+42
-40
lines changed

1 file changed

+42
-40
lines changed

coderd/provisionerjobs.go

Lines changed: 42 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ import (
99
"net/http"
1010
"sort"
1111
"strconv"
12-
"sync"
1312
"time"
1413

1514
"github.com/google/uuid"
15+
"go.uber.org/atomic"
1616
"nhooyr.io/websocket"
1717

1818
"cdr.dev/slog"
@@ -377,26 +377,38 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan *databas
377377
logger := api.Logger.With(slog.F("job_id", jobID))
378378

379379
var (
380-
closed = make(chan struct{})
381-
bufferedLogs = make(chan *database.ProvisionerJobLog, 128)
382-
logMut = &sync.Mutex{}
380+
bufferedLogs = make(chan *database.ProvisionerJobLog, 128)
381+
endOfLogs atomic.Bool
382+
lastSentLogID atomic.Int64
383383
)
384+
385+
sendLog := func(log *database.ProvisionerJobLog) {
386+
select {
387+
case bufferedLogs <- log:
388+
logger.Debug(context.Background(), "subscribe buffered log", slog.F("stage", log.Stage))
389+
lastSentLogID.Store(log.ID)
390+
default:
391+
// If this overflows, users could miss streaming logs. This can happen when
392+
// we get a lot of logs and the consumer isn't keeping up. We don't want to block the pubsub,
393+
// so just drop them.
394+
logger.Warn(context.Background(), "provisioner job log overflowing channel")
395+
}
396+
}
397+
384398
closeSubscribe, err := api.Pubsub.Subscribe(
385399
provisionerJobLogsChannel(jobID),
386400
func(ctx context.Context, message []byte) {
387-
select {
388-
case <-closed:
401+
if endOfLogs.Load() {
389402
return
390-
default:
391403
}
392-
393404
jlMsg := provisionerJobLogsMessage{}
394405
err := json.Unmarshal(message, &jlMsg)
395406
if err != nil {
396407
logger.Warn(ctx, "invalid provisioner job log on channel", slog.Error(err))
397408
return
398409
}
399410

411+
// CreatedAfter is sent when logs are streaming!
400412
if jlMsg.CreatedAfter != 0 {
401413
logs, err := api.Database.GetProvisionerLogsByIDBetween(dbauthz.As(ctx, actor), database.GetProvisionerLogsByIDBetweenParams{
402414
JobID: jobID,
@@ -406,54 +418,44 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan *databas
406418
logger.Warn(ctx, "get provisioner logs", slog.Error(err))
407419
return
408420
}
409-
410421
for _, log := range logs {
411-
// Sadly we have to use a mutex here because events may be
412-
// handled out of order due to golang goroutine scheduling
413-
// semantics (even though Postgres guarantees ordering of
414-
// notifications).
415-
logMut.Lock()
416-
select {
417-
case <-closed:
418-
logMut.Unlock()
422+
if endOfLogs.Load() {
423+
// An end of logs message came in while we were fetching
424+
// logs or processing them!
419425
return
420-
default:
421426
}
422427
log := log
423-
select {
424-
case bufferedLogs <- &log:
425-
logger.Debug(ctx, "subscribe buffered log", slog.F("stage", log.Stage))
426-
default:
427-
// If this overflows users could miss logs streaming. This can happen
428-
// we get a lot of logs and consumer isn't keeping up. We don't want to block the pubsub,
429-
// so just drop them.
430-
logger.Warn(ctx, "provisioner job log overflowing channel")
431-
}
432-
logMut.Unlock()
428+
sendLog(&log)
433429
}
434430
}
435431

432+
// EndOfLogs is sent when logs are done streaming.
433+
// We don't want to end the stream until we've sent all the logs,
434+
// so we fetch logs after the last ID we've seen and send them!
436435
if jlMsg.EndOfLogs {
437-
// This mutex is to guard double-closes.
438-
logMut.Lock()
439-
select {
440-
case <-closed:
441-
logMut.Unlock()
436+
endOfLogs.Store(true)
437+
logs, err := api.Database.GetProvisionerLogsByIDBetween(dbauthz.As(ctx, actor), database.GetProvisionerLogsByIDBetweenParams{
438+
JobID: jobID,
439+
CreatedAfter: lastSentLogID.Load(),
440+
})
441+
if err != nil {
442+
logger.Warn(ctx, "get provisioner logs", slog.Error(err))
442443
return
443-
default:
444+
}
445+
for _, log := range logs {
446+
log := log
447+
sendLog(&log)
444448
}
445449
logger.Debug(ctx, "got End of Logs")
446450
bufferedLogs <- nil
447-
logMut.Unlock()
448451
}
452+
453+
lastSentLogID.Store(jlMsg.CreatedAfter)
449454
},
450455
)
451456
if err != nil {
452457
return nil, nil, err
453458
}
454-
return bufferedLogs, func() {
455-
closeSubscribe()
456-
close(closed)
457-
close(bufferedLogs)
458-
}, nil
459+
// We don't need to close the bufferedLogs channel because it will be garbage collected!
460+
return bufferedLogs, closeSubscribe, nil
459461
}

0 commit comments

Comments
 (0)