Skip to content

Commit fe53927

Browse files
committed
fix(agent): refactor trackScriptLogs to avoid deadlock
During agent close it was possible for the startup script logs consumer to enter a deadlock state whereby agent close was waiting via `a.trackConnGoroutine` while the log reader was waiting for a flush event. This refactor removes the mutex in favor of channel communication and relies on two goroutines without shared state.
1 parent 73f19d6 commit fe53927

File tree

1 file changed

+103
-100
lines changed

1 file changed

+103
-100
lines changed

agent/agent.go

Lines changed: 103 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -913,121 +913,124 @@ func (a *agent) runScript(ctx context.Context, lifecycle, script string) (err er
913913
}
914914

915915
func (a *agent) trackScriptLogs(ctx context.Context, reader io.Reader) (chan struct{}, error) {
916-
// Initialize variables for log management
917-
queuedLogs := make([]agentsdk.StartupLog, 0)
918-
var flushLogsTimer *time.Timer
919-
var logMutex sync.Mutex
920-
logsFlushed := sync.NewCond(&sync.Mutex{})
921-
var logsSending bool
922-
defer func() {
923-
logMutex.Lock()
924-
if flushLogsTimer != nil {
925-
flushLogsTimer.Stop()
926-
}
927-
logMutex.Unlock()
928-
}()
916+
// Synchronous sender, there can only be one outbound send at a time.
917+
//
918+
// It's important that we either flush or drop all logs before returning
919+
// because the startup state is reported after flush.
920+
sendDone := make(chan struct{})
921+
send := make(chan []agentsdk.StartupLog, 1)
922+
go func() {
923+
flushTimeout := 100 * time.Millisecond
924+
flush := time.NewTimer(flushTimeout)
925+
flush.Stop()
929926

930-
// sendLogs function uploads the queued logs to the server
931-
sendLogs := func() {
932-
// Lock logMutex and check if logs are already being sent
933-
logMutex.Lock()
934-
if logsSending {
935-
logMutex.Unlock()
936-
return
937-
}
938-
if flushLogsTimer != nil {
939-
flushLogsTimer.Stop()
940-
}
941-
if len(queuedLogs) == 0 {
942-
logMutex.Unlock()
943-
return
944-
}
945-
// Move the current queued logs to logsToSend and clear the queue
946-
logsToSend := queuedLogs
947-
logsSending = true
948-
queuedLogs = make([]agentsdk.StartupLog, 0)
949-
logMutex.Unlock()
950-
951-
// Retry uploading logs until successful or a specific error occurs
952-
for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
953-
err := a.client.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
954-
Logs: logsToSend,
955-
})
956-
if err == nil {
957-
break
927+
defer func() {
928+
flush.Stop()
929+
a.logger.Debug(ctx, "track script logs sender exited")
930+
close(sendDone)
931+
}()
932+
933+
var backlog []agentsdk.StartupLog
934+
done := false
935+
for {
936+
flushed := false
937+
select {
938+
case <-ctx.Done():
939+
return
940+
case <-a.closed:
941+
return
942+
case logs, ok := <-send:
943+
done = !ok
944+
if ok {
945+
backlog = append(backlog, logs...)
946+
flushed = len(backlog) >= 100
947+
}
948+
case <-flush.C:
949+
flushed = true
958950
}
959-
var sdkErr *codersdk.Error
960-
if errors.As(err, &sdkErr) {
961-
if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge {
962-
a.logger.Warn(ctx, "startup logs too large, dropping logs")
963-
break
951+
952+
if (done || flushed) && len(backlog) > 0 {
953+
// Retry uploading logs until successful or a specific
954+
// error occurs.
955+
for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
956+
err := a.client.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
957+
Logs: backlog,
958+
})
959+
if err == nil {
960+
break
961+
}
962+
963+
if errors.Is(err, context.Canceled) {
964+
return
965+
}
966+
var sdkErr *codersdk.Error
967+
if errors.As(err, &sdkErr) {
968+
if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge {
969+
a.logger.Warn(ctx, "startup logs too large, dropping logs")
970+
break
971+
}
972+
}
973+
a.logger.Error(ctx, "upload startup logs failed", slog.Error(err), slog.F("to_send", backlog))
974+
}
975+
if ctx.Err() != nil {
976+
return
964977
}
978+
backlog = nil
965979
}
966-
a.logger.Error(ctx, "upload startup logs", slog.Error(err), slog.F("to_send", logsToSend))
967-
}
968-
// Reset logsSending flag
969-
logMutex.Lock()
970-
logsSending = false
971-
flushLogsTimer.Reset(100 * time.Millisecond)
972-
logMutex.Unlock()
973-
logsFlushed.Broadcast()
974-
}
975-
// queueLog function appends a log to the queue and triggers sendLogs if necessary
976-
queueLog := func(log agentsdk.StartupLog) {
977-
logMutex.Lock()
978-
defer logMutex.Unlock()
979-
980-
// Append log to the queue
981-
queuedLogs = append(queuedLogs, log)
982-
983-
// If there are more than 100 logs, send them immediately
984-
if len(queuedLogs) > 100 {
985-
// Don't early return after this, because we still want
986-
// to reset the timer just in case logs come in while
987-
// we're sending.
988-
go sendLogs()
989-
}
990-
// Reset or set the flushLogsTimer to trigger sendLogs after 100 milliseconds
991-
if flushLogsTimer != nil {
992-
flushLogsTimer.Reset(100 * time.Millisecond)
993-
return
980+
if done {
981+
return
982+
}
983+
984+
flush.Reset(flushTimeout)
994985
}
995-
flushLogsTimer = time.AfterFunc(100*time.Millisecond, sendLogs)
996-
}
986+
}()
997987

998-
// It's important that we either flush or drop all logs before returning
999-
// because the startup state is reported after flush.
988+
// Forward read lines to the sender or queue them for when the
989+
// sender is ready to process them.
1000990
//
1001-
// It'd be weird for the startup state to be ready, but logs are still
1002-
// coming in.
1003-
logsFinished := make(chan struct{})
991+
// We only need to track this goroutine since it will ensure that
992+
// the sender has closed before returning.
993+
logsDone := make(chan struct{})
1004994
err := a.trackConnGoroutine(func() {
1005-
scanner := bufio.NewScanner(reader)
1006-
for scanner.Scan() {
1007-
queueLog(agentsdk.StartupLog{
995+
defer func() {
996+
close(send)
997+
<-sendDone
998+
a.logger.Debug(ctx, "track script logs reader exited")
999+
close(logsDone)
1000+
}()
1001+
1002+
var queue []agentsdk.StartupLog
1003+
1004+
s := bufio.NewScanner(reader)
1005+
for s.Scan() {
1006+
select {
1007+
case <-ctx.Done():
1008+
return
1009+
case <-a.closed:
1010+
return
1011+
case queue = <-send:
1012+
// Not captured by sender yet, re-use.
1013+
default:
1014+
}
1015+
1016+
queue = append(queue, agentsdk.StartupLog{
10081017
CreatedAt: database.Now(),
1009-
Output: scanner.Text(),
1018+
Output: s.Text(),
10101019
})
1020+
send <- queue
1021+
queue = nil
10111022
}
1012-
if err := scanner.Err(); err != nil {
1013-
a.logger.Error(ctx, "scan startup logs", slog.Error(err))
1014-
}
1015-
defer close(logsFinished)
1016-
logsFlushed.L.Lock()
1017-
for {
1018-
logMutex.Lock()
1019-
if len(queuedLogs) == 0 {
1020-
logMutex.Unlock()
1021-
break
1022-
}
1023-
logMutex.Unlock()
1024-
logsFlushed.Wait()
1023+
if err := s.Err(); err != nil {
1024+
a.logger.Error(ctx, "scan startup logs failed", slog.Error(err))
10251025
}
10261026
})
10271027
if err != nil {
1028-
return nil, xerrors.Errorf("track conn goroutine: %w", err)
1028+
close(send)
1029+
<-sendDone
1030+
return nil, err
10291031
}
1030-
return logsFinished, nil
1032+
1033+
return logsDone, nil
10311034
}
10321035

10331036
func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, msg codersdk.WorkspaceAgentReconnectingPTYInit, conn net.Conn) (retErr error) {

0 commit comments

Comments
 (0)