fix(agent): refactor trackScriptLogs to avoid deadlock #8084

Merged
merged 5 commits on Jun 20, 2023
Changes from all commits
agent/agent.go: 221 changes (120 additions, 101 deletions)
@@ -911,122 +911,141 @@ func (a *agent) runScript(ctx context.Context, lifecycle, script string) (err er
 	return nil
 }
 
-func (a *agent) trackScriptLogs(ctx context.Context, reader io.Reader) (chan struct{}, error) {
-	// Initialize variables for log management
-	queuedLogs := make([]agentsdk.StartupLog, 0)
-	var flushLogsTimer *time.Timer
-	var logMutex sync.Mutex
-	logsFlushed := sync.NewCond(&sync.Mutex{})
-	var logsSending bool
-	defer func() {
-		logMutex.Lock()
-		if flushLogsTimer != nil {
-			flushLogsTimer.Stop()
-		}
-		logMutex.Unlock()
-	}()
-
-	// sendLogs function uploads the queued logs to the server
-	sendLogs := func() {
-		// Lock logMutex and check if logs are already being sent
-		logMutex.Lock()
-		if logsSending {
-			logMutex.Unlock()
-			return
-		}
-		if flushLogsTimer != nil {
-			flushLogsTimer.Stop()
-		}
-		if len(queuedLogs) == 0 {
-			logMutex.Unlock()
-			return
-		}
-		// Move the current queued logs to logsToSend and clear the queue
-		logsToSend := queuedLogs
-		logsSending = true
-		queuedLogs = make([]agentsdk.StartupLog, 0)
-		logMutex.Unlock()
-
-		// Retry uploading logs until successful or a specific error occurs
-		for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
-			err := a.client.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
-				Logs: logsToSend,
-			})
-			if err == nil {
-				break
-			}
-			var sdkErr *codersdk.Error
-			if errors.As(err, &sdkErr) {
-				if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge {
-					a.logger.Warn(ctx, "startup logs too large, dropping logs")
-					break
-				}
-			}
-			a.logger.Error(ctx, "upload startup logs", slog.Error(err), slog.F("to_send", logsToSend))
-		}
-		// Reset logsSending flag
-		logMutex.Lock()
-		logsSending = false
-		flushLogsTimer.Reset(100 * time.Millisecond)
-		logMutex.Unlock()
-		logsFlushed.Broadcast()
-	}
-	// queueLog function appends a log to the queue and triggers sendLogs if necessary
-	queueLog := func(log agentsdk.StartupLog) {
-		logMutex.Lock()
-		defer logMutex.Unlock()
-
-		// Append log to the queue
-		queuedLogs = append(queuedLogs, log)
-
-		// If there are more than 100 logs, send them immediately
-		if len(queuedLogs) > 100 {
-			// Don't early return after this, because we still want
-			// to reset the timer just in case logs come in while
-			// we're sending.
-			go sendLogs()
-		}
-		// Reset or set the flushLogsTimer to trigger sendLogs after 100 milliseconds
-		if flushLogsTimer != nil {
-			flushLogsTimer.Reset(100 * time.Millisecond)
-			return
-		}
-		flushLogsTimer = time.AfterFunc(100*time.Millisecond, sendLogs)
-	}
-
-	// It's important that we either flush or drop all logs before returning
-	// because the startup state is reported after flush.
-	//
-	// It'd be weird for the startup state to be ready, but logs are still
-	// coming in.
-	logsFinished := make(chan struct{})
-	err := a.trackConnGoroutine(func() {
-		scanner := bufio.NewScanner(reader)
-		for scanner.Scan() {
-			queueLog(agentsdk.StartupLog{
-				CreatedAt: database.Now(),
-				Output:    scanner.Text(),
-			})
-		}
-		if err := scanner.Err(); err != nil {
-			a.logger.Error(ctx, "scan startup logs", slog.Error(err))
-		}
-		defer close(logsFinished)
-		logsFlushed.L.Lock()
-		for {
-			logMutex.Lock()
-			if len(queuedLogs) == 0 {
-				logMutex.Unlock()
-				break
-			}
-			logMutex.Unlock()
-			logsFlushed.Wait()
-		}
-	})
-	if err != nil {
-		return nil, xerrors.Errorf("track conn goroutine: %w", err)
-	}
-	return logsFinished, nil
-}
+func (a *agent) trackScriptLogs(ctx context.Context, reader io.ReadCloser) (chan struct{}, error) {
+	// Synchronous sender, there can only be one outbound send at a time.
+	//
+	// It's important that we either flush or drop all logs before returning
+	// because the startup state is reported after flush.
+	sendDone := make(chan struct{})
+	send := make(chan []agentsdk.StartupLog, 1)
+	go func() {
+		// Set flushTimeout and backlogLimit so that logs are uploaded
+		// once every 250ms or when 100 logs have been added to the
+		// backlog, whichever comes first.
+		flushTimeout := 250 * time.Millisecond
+		backlogLimit := 100
+
+		flush := time.NewTicker(flushTimeout)
+
+		var backlog []agentsdk.StartupLog
+		defer func() {
+			flush.Stop()
+			_ = reader.Close() // Ensure read routine is closed.
+			if len(backlog) > 0 {
+				a.logger.Debug(ctx, "track script logs sender exiting, discarding logs", slog.F("discarded_logs_count", len(backlog)))
+			}
+			a.logger.Debug(ctx, "track script logs sender exited")
+			close(sendDone)
+		}()
+
+		done := false
+		for {
+			flushed := false
+			select {
+			case <-ctx.Done():
+				return
+			case <-a.closed:
+				return
+			// Close (!ok) can be triggered by the reader closing due to
+			// EOF or due to agent closing, when this happens we attempt
+			// a final flush. If the context is canceled this will be a
+			// no-op.
+			case logs, ok := <-send:
+				done = !ok
+				if ok {
+					backlog = append(backlog, logs...)
+					flushed = len(backlog) >= backlogLimit
+				}
+			case <-flush.C:
+				flushed = true
+			}
+
+			if (done || flushed) && len(backlog) > 0 {
+				flush.Stop() // Lower the chance of a double flush.
+
+				// Retry uploading logs until successful or a specific
+				// error occurs.
+				for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
+					err := a.client.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
+						Logs: backlog,
+					})
+					if err == nil {
+						break
+					}
+
+					if errors.Is(err, context.Canceled) {
+						return
+					}
+					var sdkErr *codersdk.Error
+					if errors.As(err, &sdkErr) {
+						if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge {
+							a.logger.Warn(ctx, "startup logs too large, dropping logs")
+							break
+						}
+					}
+					a.logger.Error(ctx, "upload startup logs failed", slog.Error(err), slog.F("to_send", backlog))
+				}
+				if ctx.Err() != nil {
+					return
+				}
+				backlog = nil
+
+				// Anchor flush to the last log upload.
+				flush.Reset(flushTimeout)
+			}
+			if done {
+				return
+			}
+		}
+	}()
+
+	// Forward read lines to the sender or queue them for when the
+	// sender is ready to process them.
+	//
+	// We only need to track this goroutine since it will ensure that
+	// the sender has closed before returning.
+	logsDone := make(chan struct{})
+	err := a.trackConnGoroutine(func() {
+		defer func() {
+			close(send)
+			<-sendDone
+			a.logger.Debug(ctx, "track script logs reader exited")
+			close(logsDone)
+		}()
+
+		var queue []agentsdk.StartupLog
+
+		s := bufio.NewScanner(reader)
+		for s.Scan() {
+			select {
+			case <-ctx.Done():
+				return
+			case <-a.closed:
+				return
+			case queue = <-send:
+				// Not captured by sender yet, re-use.
+			default:
+			}
+
+			queue = append(queue, agentsdk.StartupLog{
+				CreatedAt: database.Now(),
+				Output:    s.Text(),
+			})
+			send <- queue
+			queue = nil
+		}
+		if err := s.Err(); err != nil {
+			a.logger.Warn(ctx, "scan startup logs ended unexpectedly", slog.Error(err))
+		}
+	})
+	if err != nil {
+		close(send)
+		<-sendDone
+		close(logsDone)
+		return logsDone, err
+	}
+
+	return logsDone, nil
+}
 
 func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, msg codersdk.WorkspaceAgentReconnectingPTYInit, conn net.Conn) (retErr error) {
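The refactor above drops the mutex, flush timer, and sync.Cond from trackScriptLogs and replaces them with two cooperating goroutines: a reader that scans script output and a synchronous sender that batches the lines and uploads them via PatchStartupLogs, flushing every 250ms or once 100 logs have accumulated, whichever comes first. Closing the send channel asks the sender for a final flush, and logsDone is only closed after the sender has exited.

The following is a minimal, self-contained sketch of that handoff pattern, not the PR's code: the logLine type, the upload stub, and the trackLogs wrapper are invented for illustration, and the real implementation's retry loop, HTTP error handling, scanner error check, and agent shutdown cases are omitted.

package main

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"strings"
	"time"
)

type logLine struct {
	createdAt time.Time
	output    string
}

// upload stands in for a.client.PatchStartupLogs; here it just prints.
func upload(_ context.Context, logs []logLine) error {
	fmt.Printf("uploading %d log line(s)\n", len(logs))
	return nil
}

func trackLogs(ctx context.Context, r io.Reader) <-chan struct{} {
	send := make(chan []logLine, 1) // at most one batch in flight
	sendDone := make(chan struct{})

	// Sender: flush on a ticker or when the backlog reaches a limit,
	// and do a final flush when the send channel is closed.
	go func() {
		defer close(sendDone)

		const (
			flushTimeout = 250 * time.Millisecond
			backlogLimit = 100
		)
		flush := time.NewTicker(flushTimeout)
		defer flush.Stop()

		var backlog []logLine
		done := false
		for {
			flushed := false
			select {
			case <-ctx.Done():
				return
			case logs, ok := <-send:
				done = !ok // closed channel means the reader is finished
				if ok {
					backlog = append(backlog, logs...)
					flushed = len(backlog) >= backlogLimit
				}
			case <-flush.C:
				flushed = true
			}
			if (done || flushed) && len(backlog) > 0 {
				if err := upload(ctx, backlog); err != nil {
					return
				}
				backlog = nil
				flush.Reset(flushTimeout)
			}
			if done {
				return
			}
		}
	}()

	// Reader: hand batches to the sender without ever blocking on an
	// upload; reclaim the previous batch if it has not been picked up yet.
	logsDone := make(chan struct{})
	go func() {
		defer func() {
			close(send) // ask the sender for a final flush
			<-sendDone  // wait for it, so logsDone implies "all flushed"
			close(logsDone)
		}()

		var queue []logLine
		s := bufio.NewScanner(r)
		for s.Scan() {
			select {
			case queue = <-send: // not consumed by the sender yet, re-use
			default:
			}
			queue = append(queue, logLine{createdAt: time.Now(), output: s.Text()})
			send <- queue
			queue = nil
		}
	}()

	return logsDone
}

func main() {
	<-trackLogs(context.Background(), strings.NewReader("one\ntwo\nthree\n"))
}

The two details that carry the design are the non-blocking reclaim (case queue = <-send), which lets the reader take back an unconsumed batch and keep appending instead of waiting on an upload in flight, and the close(send) / <-sendDone pair, which guarantees the final flush happens before logsDone is closed, matching the original comment that logs must be flushed or dropped before the startup state is reported.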