
Commit 3913854

fix(agent): refactor trackScriptLogs to avoid deadlock
During agent close it was possible for the startup script logs consumer to deadlock: agent close was waiting on the reader via `a.trackConnGoroutine` while the log reader was waiting for a flush event. This refactor removes the mutex in favor of channel communication and relies on two goroutines with no shared state.
1 parent: 8dac035 · commit: 3913854
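
For illustration only, here is a minimal, self-contained sketch of the pattern this commit moves to: a reader goroutine and a sender goroutine that share no state and communicate over a channel, where closing the channel is what requests the final flush (instead of a mutex plus sync.Cond). The names `trackLogs`, `upload`, and the use of plain strings are invented for the example and are not part of the agent code; see the actual diff below for the real implementation.

	package main

	import (
		"bufio"
		"fmt"
		"strings"
		"time"
	)

	func trackLogs(s *bufio.Scanner, upload func([]string)) chan struct{} {
		send := make(chan []string, 1)
		sendDone := make(chan struct{})

		// Sender: the only goroutine that uploads. It flushes on a timer, when
		// the batch grows large, or when send is closed (final flush before exit).
		go func() {
			defer close(sendDone)
			const flushTimeout = 100 * time.Millisecond
			flush := time.NewTimer(flushTimeout)
			flush.Stop()
			defer flush.Stop()

			var backlog []string
			for {
				done, flushed := false, false
				select {
				case lines, ok := <-send:
					done = !ok
					if ok {
						backlog = append(backlog, lines...)
						flushed = len(backlog) >= 100
					}
				case <-flush.C:
					flushed = true
				}
				if (done || flushed) && len(backlog) > 0 {
					upload(backlog)
					backlog = nil
				}
				if done {
					return
				}
				flush.Reset(flushTimeout)
			}
		}()

		// Reader: forwards scanned lines to the sender and, on EOF, closes send
		// and waits for the sender to drain, so waiting on the returned channel
		// is all the caller has to do.
		readerDone := make(chan struct{})
		go func() {
			defer func() {
				close(send)
				<-sendDone
				close(readerDone)
			}()
			var queue []string
			for s.Scan() {
				select {
				case queue = <-send: // sender hasn't picked it up yet, reuse
				default:
				}
				queue = append(queue, s.Text())
				send <- queue
				queue = nil
			}
		}()
		return readerDone
	}

	func main() {
		in := bufio.NewScanner(strings.NewReader("line one\nline two\n"))
		done := trackLogs(in, func(lines []string) { fmt.Println("flush:", lines) })
		<-done
	}

Because the reader owns closing send and waits for sendDone before signalling completion, there is no path where one side blocks forever waiting on the other.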

1 file changed: +103 −100 lines


agent/agent.go

Lines changed: 103 additions & 100 deletions
@@ -912,121 +912,124 @@ func (a *agent) runScript(ctx context.Context, lifecycle, script string) (err er
 }
 
 func (a *agent) trackScriptLogs(ctx context.Context, reader io.Reader) (chan struct{}, error) {
-	// Initialize variables for log management
-	queuedLogs := make([]agentsdk.StartupLog, 0)
-	var flushLogsTimer *time.Timer
-	var logMutex sync.Mutex
-	logsFlushed := sync.NewCond(&sync.Mutex{})
-	var logsSending bool
-	defer func() {
-		logMutex.Lock()
-		if flushLogsTimer != nil {
-			flushLogsTimer.Stop()
-		}
-		logMutex.Unlock()
-	}()
+	// Synchronous sender, there can only be one outbound send at a time.
+	//
+	// It's important that we either flush or drop all logs before returning
+	// because the startup state is reported after flush.
+	sendDone := make(chan struct{})
+	send := make(chan []agentsdk.StartupLog, 1)
+	go func() {
+		flushTimeout := 100 * time.Millisecond
+		flush := time.NewTimer(flushTimeout)
+		flush.Stop()
 
-	// sendLogs function uploads the queued logs to the server
-	sendLogs := func() {
-		// Lock logMutex and check if logs are already being sent
-		logMutex.Lock()
-		if logsSending {
-			logMutex.Unlock()
-			return
-		}
-		if flushLogsTimer != nil {
-			flushLogsTimer.Stop()
-		}
-		if len(queuedLogs) == 0 {
-			logMutex.Unlock()
-			return
-		}
-		// Move the current queued logs to logsToSend and clear the queue
-		logsToSend := queuedLogs
-		logsSending = true
-		queuedLogs = make([]agentsdk.StartupLog, 0)
-		logMutex.Unlock()
-
-		// Retry uploading logs until successful or a specific error occurs
-		for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
-			err := a.client.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
-				Logs: logsToSend,
-			})
-			if err == nil {
-				break
+		defer func() {
+			flush.Stop()
+			a.logger.Debug(ctx, "track script logs sender exited")
+			close(sendDone)
+		}()
+
+		var backlog []agentsdk.StartupLog
+		done := false
+		for {
+			flushed := false
+			select {
+			// Close (!ok) can be triggered by the reader closing due to
+			// EOF or due to agent closing, when this happens we attempt
+			// a final flush. If the context is canceled this will be a
+			// no-op.
+			case logs, ok := <-send:
+				done = !ok
+				if ok {
+					backlog = append(backlog, logs...)
+					flushed = len(backlog) >= 100
+				}
+			case <-flush.C:
+				flushed = true
 			}
-			var sdkErr *codersdk.Error
-			if errors.As(err, &sdkErr) {
-				if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge {
-					a.logger.Warn(ctx, "startup logs too large, dropping logs")
-					break
+
+			if (done || flushed) && len(backlog) > 0 {
+				// Retry uploading logs until successful or a specific
+				// error occurs.
+				for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
+					err := a.client.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
+						Logs: backlog,
+					})
+					if err == nil {
+						break
+					}
+
+					if errors.Is(err, context.Canceled) {
+						return
+					}
+					var sdkErr *codersdk.Error
+					if errors.As(err, &sdkErr) {
+						if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge {
+							a.logger.Warn(ctx, "startup logs too large, dropping logs")
+							break
+						}
+					}
+					a.logger.Error(ctx, "upload startup logs failed", slog.Error(err), slog.F("to_send", backlog))
+				}
+				if ctx.Err() != nil {
+					return
 				}
+				backlog = nil
 			}
-			a.logger.Error(ctx, "upload startup logs", slog.Error(err), slog.F("to_send", logsToSend))
-		}
-		// Reset logsSending flag
-		logMutex.Lock()
-		logsSending = false
-		flushLogsTimer.Reset(100 * time.Millisecond)
-		logMutex.Unlock()
-		logsFlushed.Broadcast()
-	}
-	// queueLog function appends a log to the queue and triggers sendLogs if necessary
-	queueLog := func(log agentsdk.StartupLog) {
-		logMutex.Lock()
-		defer logMutex.Unlock()
-
-		// Append log to the queue
-		queuedLogs = append(queuedLogs, log)
-
-		// If there are more than 100 logs, send them immediately
-		if len(queuedLogs) > 100 {
-			// Don't early return after this, because we still want
-			// to reset the timer just in case logs come in while
-			// we're sending.
-			go sendLogs()
-		}
-		// Reset or set the flushLogsTimer to trigger sendLogs after 100 milliseconds
-		if flushLogsTimer != nil {
-			flushLogsTimer.Reset(100 * time.Millisecond)
-			return
+			if done {
+				return
+			}
+
+			flush.Reset(flushTimeout)
 		}
-		flushLogsTimer = time.AfterFunc(100*time.Millisecond, sendLogs)
-	}
+	}()
 
-	// It's important that we either flush or drop all logs before returning
-	// because the startup state is reported after flush.
+	// Forward read lines to the sender or queue them for when the
+	// sender is ready to process them.
 	//
-	// It'd be weird for the startup state to be ready, but logs are still
-	// coming in.
-	logsFinished := make(chan struct{})
+	// We only need to track this goroutine since it will ensure that
+	// the sender has closed before returning.
+	logsDone := make(chan struct{})
 	err := a.trackConnGoroutine(func() {
-		scanner := bufio.NewScanner(reader)
-		for scanner.Scan() {
-			queueLog(agentsdk.StartupLog{
+		defer func() {
+			close(send)
+			<-sendDone
+			a.logger.Debug(ctx, "track script logs reader exited")
+			close(logsDone)
+		}()
+
+		var queue []agentsdk.StartupLog
+
+		s := bufio.NewScanner(reader)
+		for s.Scan() {
+			select {
+			case <-ctx.Done():
+				return
+			case <-a.closed:
+				return
+			case queue = <-send:
+				// Not captured by sender yet, re-use.
+			default:
+			}
+
+			queue = append(queue, agentsdk.StartupLog{
 				CreatedAt: database.Now(),
-				Output:    scanner.Text(),
+				Output:    s.Text(),
 			})
+			send <- queue
+			queue = nil
 		}
-		if err := scanner.Err(); err != nil {
-			a.logger.Error(ctx, "scan startup logs", slog.Error(err))
-		}
-		defer close(logsFinished)
-		logsFlushed.L.Lock()
-		for {
-			logMutex.Lock()
-			if len(queuedLogs) == 0 {
-				logMutex.Unlock()
-				break
-			}
-			logMutex.Unlock()
-			logsFlushed.Wait()
+		if err := s.Err(); err != nil {
+			a.logger.Error(ctx, "scan startup logs failed", slog.Error(err))
 		}
 	})
 	if err != nil {
-		return nil, xerrors.Errorf("track conn goroutine: %w", err)
+		close(send)
+		<-sendDone
+		return nil, err
 	}
-	return logsFinished, nil
+
+	return logsDone, nil
 }
 
 func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, msg codersdk.WorkspaceAgentReconnectingPTYInit, conn net.Conn) (retErr error) {

0 commit comments
