fix: fetch provisioner logs after end of logs message #6495

Merged · 1 commit · Mar 8, 2023
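
This PR fixes a race in coderd's provisioner log streaming: an end-of-logs notification could terminate the stream while log rows that raced with it were still unfetched. The diff below drops the closed channel and logMut mutex in favor of an endOfLogs atomic flag plus a lastSentLogID cursor, and, on EndOfLogs, fetches everything published after the last ID actually delivered before emitting the nil end-of-stream sentinel.

Below is a minimal, runnable sketch of the same pattern, not the PR's code: it uses the standard library's sync/atomic (Go 1.19+) in place of go.uber.org/atomic, and hypothetical stand-ins (jobLog, fetchLogsAfter, an in-memory store) for the database and pubsub.

// Sketch: end-of-logs handling without losing racing logs.
// jobLog and fetchLogsAfter are hypothetical stand-ins, not coderd APIs.
package main

import (
    "fmt"
    "sync/atomic"
)

type jobLog struct {
    ID     int64
    Output string
}

// fetchLogsAfter stands in for GetProvisionerLogsByIDBetween:
// it returns every log with an ID greater than `after`.
func fetchLogsAfter(store []jobLog, after int64) []jobLog {
    var out []jobLog
    for _, l := range store {
        if l.ID > after {
            out = append(out, l)
        }
    }
    return out
}

func main() {
    store := []jobLog{{1, "init"}, {2, "plan"}, {3, "apply"}, {4, "done"}}

    var (
        bufferedLogs  = make(chan *jobLog, 128)
        endOfLogs     atomic.Bool
        lastSentLogID atomic.Int64
    )

    // Non-blocking send: drop rather than block the pubsub callback,
    // and track the highest ID actually delivered.
    sendLog := func(l *jobLog) {
        select {
        case bufferedLogs <- l:
            lastSentLogID.Store(l.ID)
        default:
            fmt.Println("channel full, dropping log", l.ID)
        }
    }

    // Streaming notification: only the first two logs exist when it is
    // handled, simulating logs that race with the end-of-logs message.
    for _, l := range fetchLogsAfter(store, 0)[:2] {
        l := l
        sendLog(&l)
    }

    // End-of-logs notification: set the flag so later callbacks return
    // early, then fetch and send everything after the last sent ID so
    // the racing logs (3 and 4) are not lost.
    endOfLogs.Store(true)
    for _, l := range fetchLogsAfter(store, lastSentLogID.Load()) {
        l := l
        sendLog(&l)
    }
    bufferedLogs <- nil // nil is the end-of-stream sentinel

    for l := range bufferedLogs {
        if l == nil {
            break
        }
        fmt.Println("received:", l.ID, l.Output)
    }
}

In the sketch, logs 3 and 4 are only delivered because the end-of-logs path re-fetches after lastSentLogID; with the old closed-channel logic they would have been dropped.
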
82 changes: 42 additions & 40 deletions coderd/provisionerjobs.go
@@ -9,10 +9,10 @@ import (
     "net/http"
     "sort"
     "strconv"
-    "sync"
     "time"
 
     "github.com/google/uuid"
+    "go.uber.org/atomic"
     "nhooyr.io/websocket"
 
     "cdr.dev/slog"
@@ -374,26 +374,38 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan *database.ProvisionerJobLog, func(), error) {
     logger := api.Logger.With(slog.F("job_id", jobID))
 
     var (
-        closed       = make(chan struct{})
-        bufferedLogs = make(chan *database.ProvisionerJobLog, 128)
-        logMut       = &sync.Mutex{}
+        bufferedLogs  = make(chan *database.ProvisionerJobLog, 128)
+        endOfLogs     atomic.Bool
+        lastSentLogID atomic.Int64
     )
 
+    sendLog := func(log *database.ProvisionerJobLog) {
+        select {
+        case bufferedLogs <- log:
+            logger.Debug(context.Background(), "subscribe buffered log", slog.F("stage", log.Stage))
+            lastSentLogID.Store(log.ID)
+        default:
+            // If this overflows, users could miss streamed logs. This can happen
+            // when we get a lot of logs and the consumer isn't keeping up. We
+            // don't want to block the pubsub, so just drop them.
+            logger.Warn(context.Background(), "provisioner job log overflowing channel")
+        }
+    }
+
     closeSubscribe, err := api.Pubsub.Subscribe(
         provisionerJobLogsChannel(jobID),
         func(ctx context.Context, message []byte) {
-            select {
-            case <-closed:
+            if endOfLogs.Load() {
                 return
-            default:
             }
-
             jlMsg := provisionerJobLogsMessage{}
             err := json.Unmarshal(message, &jlMsg)
             if err != nil {
                 logger.Warn(ctx, "invalid provisioner job log on channel", slog.Error(err))
                 return
             }
+
+            // CreatedAfter is sent when logs are streaming!
             if jlMsg.CreatedAfter != 0 {
                 logs, err := api.Database.GetProvisionerLogsByIDBetween(dbauthz.As(ctx, actor), database.GetProvisionerLogsByIDBetweenParams{
                     JobID: jobID,
@@ -403,54 +415,44 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan *database.ProvisionerJobLog, func(), error) {
                     logger.Warn(ctx, "get provisioner logs", slog.Error(err))
                     return
                 }
 
                 for _, log := range logs {
-                    // Sadly we have to use a mutex here because events may be
-                    // handled out of order due to golang goroutine scheduling
-                    // semantics (even though Postgres guarantees ordering of
-                    // notifications).
-                    logMut.Lock()
-                    select {
-                    case <-closed:
-                        logMut.Unlock()
+                    if endOfLogs.Load() {
+                        // An end of logs message came in while we were fetching
+                        // logs or processing them!
                         return
-                    default:
                     }
                     log := log
-                    select {
-                    case bufferedLogs <- &log:
-                        logger.Debug(ctx, "subscribe buffered log", slog.F("stage", log.Stage))
-                    default:
-                        // If this overflows users could miss logs streaming. This can happen
-                        // we get a lot of logs and consumer isn't keeping up. We don't want to block the pubsub,
-                        // so just drop them.
-                        logger.Warn(ctx, "provisioner job log overflowing channel")
-                    }
-                    logMut.Unlock()
+                    sendLog(&log)
                 }
             }
 
+            // EndOfLogs is sent when logs are done streaming.
+            // We don't want to end the stream until we've sent all the logs,
+            // so we fetch logs after the last ID we've seen and send them!
             if jlMsg.EndOfLogs {
-                // This mutex is to guard double-closes.
-                logMut.Lock()
-                select {
-                case <-closed:
-                    logMut.Unlock()
+                endOfLogs.Store(true)
+                logs, err := api.Database.GetProvisionerLogsByIDBetween(dbauthz.As(ctx, actor), database.GetProvisionerLogsByIDBetweenParams{
+                    JobID:        jobID,
+                    CreatedAfter: lastSentLogID.Load(),
+                })
+                if err != nil {
+                    logger.Warn(ctx, "get provisioner logs", slog.Error(err))
                     return
-                default:
                 }
+                for _, log := range logs {
+                    log := log
+                    sendLog(&log)
+                }
                 logger.Debug(ctx, "got End of Logs")
                 bufferedLogs <- nil
-                logMut.Unlock()
             }
 
+            lastSentLogID.Store(jlMsg.CreatedAfter)
         },
     )
     if err != nil {
         return nil, nil, err
     }
-    return bufferedLogs, func() {
-        closeSubscribe()
-        close(closed)
-        close(bufferedLogs)
-    }, nil
+    // We don't need to close the bufferedLogs channel because it will be garbage collected!
+    return bufferedLogs, closeSubscribe, nil
 }
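
The cleanup change at the bottom leans on a general Go property rather than anything project-specific: a channel never has to be closed to be reclaimed, since once nothing references bufferedLogs the garbage collector frees it. close only exists as a signal to readers, and that role is now played by the nil sentinel sent after the final fetch, so the wrapper closure (and its double-close guard) can be replaced by returning closeSubscribe directly.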