fix(agent): refactor trackScriptLogs to avoid deadlock
During agent close it was possible for the startup script log consumer
to deadlock: agent close was waiting on the goroutine via
`a.trackConnGoroutine` while the log reader was waiting for a flush
event.

This refactor removes the mutex in favor of channel communication and
relies on two goroutines without shared state.
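To make the failure mode concrete, here is a minimal, self-contained sketch of the deadlock shape described above (all names are illustrative, not taken from the agent code): close waits for a tracked goroutine, while that goroutine waits on a condition variable that no remaining goroutine will ever signal once the flush timer has been stopped.

```go
package main

import (
	"sync"
	"time"
)

func main() {
	var (
		logMu   sync.Mutex
		queued  = 1 // pretend one log line is still queued
		flushed = sync.NewCond(&sync.Mutex{})
		wg      sync.WaitGroup
	)

	wg.Add(1)
	go func() { // the goroutine the agent tracks
		defer wg.Done()
		flushed.L.Lock()
		defer flushed.L.Unlock()
		for {
			logMu.Lock()
			empty := queued == 0
			logMu.Unlock()
			if empty {
				return
			}
			flushed.Wait() // nothing is left to Broadcast: blocks forever
		}
	}()

	time.Sleep(10 * time.Millisecond) // let the goroutine reach Wait
	wg.Wait()                         // "agent close": never returns
}
```

Run as-is, this trips Go's runtime deadlock detector (`fatal error: all goroutines are asleep - deadlock!`); it is the same wait-on-each-other cycle the refactor eliminates.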
mafredri committed Jun 20, 2023
commit 391385424beb8fd50f7c5f97c84295f08a7c4529
203 changes: 103 additions & 100 deletions agent/agent.go
@@ -912,121 +912,124 @@ func (a *agent) runScript(ctx context.Context, lifecycle, script string) (err error)
 }
 
 func (a *agent) trackScriptLogs(ctx context.Context, reader io.Reader) (chan struct{}, error) {
-	// Initialize variables for log management
-	queuedLogs := make([]agentsdk.StartupLog, 0)
-	var flushLogsTimer *time.Timer
-	var logMutex sync.Mutex
-	logsFlushed := sync.NewCond(&sync.Mutex{})
-	var logsSending bool
-	defer func() {
-		logMutex.Lock()
-		if flushLogsTimer != nil {
-			flushLogsTimer.Stop()
-		}
-		logMutex.Unlock()
-	}()
-
-	// sendLogs function uploads the queued logs to the server
-	sendLogs := func() {
-		// Lock logMutex and check if logs are already being sent
-		logMutex.Lock()
-		if logsSending {
-			logMutex.Unlock()
-			return
-		}
-		if flushLogsTimer != nil {
-			flushLogsTimer.Stop()
-		}
-		if len(queuedLogs) == 0 {
-			logMutex.Unlock()
-			return
-		}
-		// Move the current queued logs to logsToSend and clear the queue
-		logsToSend := queuedLogs
-		logsSending = true
-		queuedLogs = make([]agentsdk.StartupLog, 0)
-		logMutex.Unlock()
-
-		// Retry uploading logs until successful or a specific error occurs
-		for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
-			err := a.client.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
-				Logs: logsToSend,
-			})
-			if err == nil {
-				break
-			}
-			var sdkErr *codersdk.Error
-			if errors.As(err, &sdkErr) {
-				if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge {
-					a.logger.Warn(ctx, "startup logs too large, dropping logs")
-					break
-				}
-			}
-			a.logger.Error(ctx, "upload startup logs", slog.Error(err), slog.F("to_send", logsToSend))
-		}
-		// Reset logsSending flag
-		logMutex.Lock()
-		logsSending = false
-		flushLogsTimer.Reset(100 * time.Millisecond)
-		logMutex.Unlock()
-		logsFlushed.Broadcast()
-	}
-	// queueLog function appends a log to the queue and triggers sendLogs if necessary
-	queueLog := func(log agentsdk.StartupLog) {
-		logMutex.Lock()
-		defer logMutex.Unlock()
-
-		// Append log to the queue
-		queuedLogs = append(queuedLogs, log)
-
-		// If there are more than 100 logs, send them immediately
-		if len(queuedLogs) > 100 {
-			// Don't early return after this, because we still want
-			// to reset the timer just in case logs come in while
-			// we're sending.
-			go sendLogs()
-		}
-		// Reset or set the flushLogsTimer to trigger sendLogs after 100 milliseconds
-		if flushLogsTimer != nil {
-			flushLogsTimer.Reset(100 * time.Millisecond)
-			return
-		}
-		flushLogsTimer = time.AfterFunc(100*time.Millisecond, sendLogs)
-	}
-
-	// It's important that we either flush or drop all logs before returning
-	// because the startup state is reported after flush.
-	//
-	// It'd be weird for the startup state to be ready, but logs are still
-	// coming in.
-	logsFinished := make(chan struct{})
+	// Synchronous sender, there can only be one outbound send at a time.
+	//
+	// It's important that we either flush or drop all logs before returning
+	// because the startup state is reported after flush.
+	sendDone := make(chan struct{})
+	send := make(chan []agentsdk.StartupLog, 1)
+	go func() {
+		flushTimeout := 100 * time.Millisecond
+		flush := time.NewTimer(flushTimeout)
+		flush.Stop()
+
+		defer func() {
+			flush.Stop()
+			a.logger.Debug(ctx, "track script logs sender exited")
+			close(sendDone)
+		}()
+
+		var backlog []agentsdk.StartupLog
+		done := false
+		for {
+			flushed := false
+			select {
+			// Close (!ok) can be triggered by the reader closing due to
+			// EOF or due to agent closing, when this happens we attempt
+			// a final flush. If the context is canceled this will be a
+			// no-op.
+			case logs, ok := <-send:
+				done = !ok
+				if ok {
+					backlog = append(backlog, logs...)
+					flushed = len(backlog) >= 100
+				}
+			case <-flush.C:
+				flushed = true
+			}
+
+			if (done || flushed) && len(backlog) > 0 {
+				// Retry uploading logs until successful or a specific
+				// error occurs.
+				for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
+					err := a.client.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
+						Logs: backlog,
+					})
+					if err == nil {
+						break
+					}
+
+					if errors.Is(err, context.Canceled) {
+						return
+					}
+					var sdkErr *codersdk.Error
+					if errors.As(err, &sdkErr) {
+						if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge {
+							a.logger.Warn(ctx, "startup logs too large, dropping logs")
+							break
+						}
+					}
+					a.logger.Error(ctx, "upload startup logs failed", slog.Error(err), slog.F("to_send", backlog))
+				}
+				if ctx.Err() != nil {
+					return
+				}
+				backlog = nil
+			}
+			if done {
+				return
+			}
+
+			flush.Reset(flushTimeout)
+		}
+	}()
+
+	// Forward read lines to the sender or queue them for when the
+	// sender is ready to process them.
+	//
+	// We only need to track this goroutine since it will ensure that
+	// the sender has closed before returning.
+	logsDone := make(chan struct{})
 	err := a.trackConnGoroutine(func() {
-		scanner := bufio.NewScanner(reader)
-		for scanner.Scan() {
-			queueLog(agentsdk.StartupLog{
+		defer func() {
+			close(send)
+			<-sendDone
+			a.logger.Debug(ctx, "track script logs reader exited")
+			close(logsDone)
+		}()
+
+		var queue []agentsdk.StartupLog
+
+		s := bufio.NewScanner(reader)
+		for s.Scan() {
+			select {
+			case <-ctx.Done():
+				return
+			case <-a.closed:
+				return
+			case queue = <-send:
+				// Not captured by sender yet, re-use.
+			default:
+			}
+
+			queue = append(queue, agentsdk.StartupLog{
 				CreatedAt: database.Now(),
-				Output:    scanner.Text(),
+				Output:    s.Text(),
 			})
+			send <- queue
+			queue = nil
 		}
-		if err := scanner.Err(); err != nil {
-			a.logger.Error(ctx, "scan startup logs", slog.Error(err))
-		}
-		defer close(logsFinished)
-		logsFlushed.L.Lock()
-		for {
-			logMutex.Lock()
-			if len(queuedLogs) == 0 {
-				logMutex.Unlock()
-				break
-			}
-			logMutex.Unlock()
-			logsFlushed.Wait()
+		if err := s.Err(); err != nil {
+			a.logger.Error(ctx, "scan startup logs failed", slog.Error(err))
 		}
 	})
 	if err != nil {
-		return nil, xerrors.Errorf("track conn goroutine: %w", err)
+		close(send)
+		<-sendDone
+		return nil, err
 	}
-	return logsFinished, nil
+
+	return logsDone, nil
 }
 
 func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, msg codersdk.WorkspaceAgentReconnectingPTYInit, conn net.Conn) (retErr error) {
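Distilled from the new code above, the shutdown ordering is what prevents the old deadlock: the reader owns `queue` until the sender accepts it, the sender is the sole owner of `backlog`, and on EOF the reader closes `send`, waits for the sender's final flush via `sendDone`, and only then signals completion. A stripped-down sketch of the same pattern follows; the names `pipeline`, `lines`, and `upload` are placeholders, and retries plus agent-close handling are elided.

```go
package main

import (
	"fmt"
	"time"
)

// pipeline mirrors the two-goroutine, shared-nothing design: a reader
// that batches lines and a sender that flushes them, linked only by
// channels. upload stands in for the real log-upload call.
func pipeline(lines <-chan string, upload func([]string) error) chan struct{} {
	send := make(chan []string, 1)
	sendDone := make(chan struct{})

	go func() { // sender: sole owner of backlog
		defer close(sendDone)
		flush := time.NewTimer(100 * time.Millisecond)
		flush.Stop()
		defer flush.Stop()

		var backlog []string
		for {
			doFlush, closed := false, false
			select {
			case batch, ok := <-send:
				closed = !ok
				if ok {
					backlog = append(backlog, batch...)
					doFlush = len(backlog) >= 100
				}
			case <-flush.C:
				doFlush = true
			}
			if (closed || doFlush) && len(backlog) > 0 {
				_ = upload(backlog) // the real code retries on error
				backlog = nil
			}
			if closed {
				return // final flush already attempted
			}
			flush.Reset(100 * time.Millisecond)
		}
	}()

	done := make(chan struct{})
	go func() { // reader: owns queue until the sender picks it up
		defer func() {
			close(send) // ask the sender for a final flush...
			<-sendDone  // ...and wait, so done implies flushed
			close(done)
		}()
		var queue []string
		for line := range lines {
			select {
			case queue = <-send: // sender hasn't taken it yet; reuse
			default:
			}
			queue = append(queue, line)
			send <- queue
			queue = nil
		}
	}()
	return done
}

func main() {
	lines := make(chan string)
	done := pipeline(lines, func(batch []string) error {
		fmt.Println("uploading", len(batch), "lines")
		return nil
	})
	lines <- "hello"
	lines <- "world"
	close(lines)
	<-done // all lines flushed before startup state would be reported
}
```

Because neither goroutine ever blocks on a lock or condition variable held by the other, closing the input can only cascade forward (reader → sender → done); there is no cycle left to deadlock on.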