Commit ea4b7d6

fix(agent): refactor trackScriptLogs to avoid deadlock (#8084)
During agent close it was possible for the startup script logs consumer to enter a deadlock in which agent close was waiting (via `a.trackConnGoroutine`) for the log reader goroutine, while the log reader was waiting for a flush event that never came. This refactor removes the mutex in favor of channel communication and relies on two goroutines with no shared state.
1 parent adf14f1 · commit ea4b7d6
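
The shape of the fix, reduced to a standalone sketch: one goroutine reads lines and hands them to a single sender goroutine over a channel, the sender batches and uploads them, and closing the channel (rather than a mutex/condition-variable handshake) drives the final flush. The names, the 250ms/100-line thresholds, and the `flushLogs` stand-in below are illustrative, not the agent's actual API.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
	"time"
)

// flushLogs stands in for the real upload (PatchStartupLogs); illustrative only.
func flushLogs(batch []string) {
	fmt.Printf("flushing %d line(s)\n", len(batch))
}

// trackLogs mirrors the commit's structure: a reader goroutine hands lines to a
// single sender goroutine over a channel, so no mutex or condition variable is
// shared between them. Closing `send` is the shutdown signal; `done` is closed
// only after the final flush, so a caller waiting on it cannot deadlock.
func trackLogs(r *strings.Reader) <-chan struct{} {
	send := make(chan []string, 1)
	done := make(chan struct{})

	// Sender: batch lines and flush every 250ms, at 100 lines, or on close.
	go func() {
		defer close(done)
		flush := time.NewTicker(250 * time.Millisecond)
		defer flush.Stop()

		var backlog []string
		for {
			flushNow := false
			closed := false
			select {
			case lines, ok := <-send:
				closed = !ok
				if ok {
					backlog = append(backlog, lines...)
					flushNow = len(backlog) >= 100
				}
			case <-flush.C:
				flushNow = true
			}
			if (closed || flushNow) && len(backlog) > 0 {
				flushLogs(backlog)
				backlog = nil
			}
			if closed {
				return
			}
		}
	}()

	// Reader: scan lines and forward them; close(send) triggers the final flush.
	go func() {
		defer close(send)
		s := bufio.NewScanner(r)
		for s.Scan() {
			send <- []string{s.Text()}
		}
	}()

	return done
}

func main() {
	done := trackLogs(strings.NewReader("line one\nline two\nline three\n"))
	<-done // Returns only after all lines have been flushed.
}
```

The point of the structure is that shutdown ordering is expressed purely through channel closes: the reader closing `send` guarantees the sender performs its last flush (or drops the backlog) before `done` is closed. In the commit itself, that ordering matters because the startup state is reported only after the flush.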

File tree: 1 file changed (+120 −101)

agent/agent.go (+120 −101)

@@ -911,122 +911,141 @@ func (a *agent) runScript(ctx context.Context, lifecycle, script string) (err er
 	return nil
 }
 
-func (a *agent) trackScriptLogs(ctx context.Context, reader io.Reader) (chan struct{}, error) {
-	// Initialize variables for log management
-	queuedLogs := make([]agentsdk.StartupLog, 0)
-	var flushLogsTimer *time.Timer
-	var logMutex sync.Mutex
-	logsFlushed := sync.NewCond(&sync.Mutex{})
-	var logsSending bool
-	defer func() {
-		logMutex.Lock()
-		if flushLogsTimer != nil {
-			flushLogsTimer.Stop()
-		}
-		logMutex.Unlock()
-	}()
+func (a *agent) trackScriptLogs(ctx context.Context, reader io.ReadCloser) (chan struct{}, error) {
+	// Synchronous sender, there can only be one outbound send at a time.
+	//
+	// It's important that we either flush or drop all logs before returning
+	// because the startup state is reported after flush.
+	sendDone := make(chan struct{})
+	send := make(chan []agentsdk.StartupLog, 1)
+	go func() {
+		// Set flushTimeout and backlogLimit so that logs are uploaded
+		// once every 250ms or when 100 logs have been added to the
+		// backlog, whichever comes first.
+		flushTimeout := 250 * time.Millisecond
+		backlogLimit := 100
 
-	// sendLogs function uploads the queued logs to the server
-	sendLogs := func() {
-		// Lock logMutex and check if logs are already being sent
-		logMutex.Lock()
-		if logsSending {
-			logMutex.Unlock()
-			return
-		}
-		if flushLogsTimer != nil {
-			flushLogsTimer.Stop()
-		}
-		if len(queuedLogs) == 0 {
-			logMutex.Unlock()
-			return
-		}
-		// Move the current queued logs to logsToSend and clear the queue
-		logsToSend := queuedLogs
-		logsSending = true
-		queuedLogs = make([]agentsdk.StartupLog, 0)
-		logMutex.Unlock()
-
-		// Retry uploading logs until successful or a specific error occurs
-		for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
-			err := a.client.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
-				Logs: logsToSend,
-			})
-			if err == nil {
-				break
+		flush := time.NewTicker(flushTimeout)
+
+		var backlog []agentsdk.StartupLog
+		defer func() {
+			flush.Stop()
+			_ = reader.Close() // Ensure read routine is closed.
+			if len(backlog) > 0 {
+				a.logger.Debug(ctx, "track script logs sender exiting, discarding logs", slog.F("discarded_logs_count", len(backlog)))
 			}
-			var sdkErr *codersdk.Error
-			if errors.As(err, &sdkErr) {
-				if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge {
-					a.logger.Warn(ctx, "startup logs too large, dropping logs")
-					break
+			a.logger.Debug(ctx, "track script logs sender exited")
+			close(sendDone)
+		}()
+
+		done := false
+		for {
+			flushed := false
+			select {
+			case <-ctx.Done():
+				return
+			case <-a.closed:
+				return
+			// Close (!ok) can be triggered by the reader closing due to
+			// EOF or due to agent closing, when this happens we attempt
+			// a final flush. If the context is canceled this will be a
+			// no-op.
+			case logs, ok := <-send:
+				done = !ok
+				if ok {
+					backlog = append(backlog, logs...)
+					flushed = len(backlog) >= backlogLimit
 				}
+			case <-flush.C:
+				flushed = true
+			}
+
+			if (done || flushed) && len(backlog) > 0 {
+				flush.Stop() // Lower the chance of a double flush.
+
+				// Retry uploading logs until successful or a specific
+				// error occurs.
+				for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
+					err := a.client.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
+						Logs: backlog,
+					})
+					if err == nil {
+						break
+					}
+
+					if errors.Is(err, context.Canceled) {
+						return
+					}
+					var sdkErr *codersdk.Error
+					if errors.As(err, &sdkErr) {
+						if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge {
+							a.logger.Warn(ctx, "startup logs too large, dropping logs")
+							break
+						}
+					}
+					a.logger.Error(ctx, "upload startup logs failed", slog.Error(err), slog.F("to_send", backlog))
+				}
+				if ctx.Err() != nil {
+					return
+				}
+				backlog = nil
+
+				// Anchor flush to the last log upload.
+				flush.Reset(flushTimeout)
+			}
+			if done {
+				return
 			}
-			a.logger.Error(ctx, "upload startup logs", slog.Error(err), slog.F("to_send", logsToSend))
-		}
-		// Reset logsSending flag
-		logMutex.Lock()
-		logsSending = false
-		flushLogsTimer.Reset(100 * time.Millisecond)
-		logMutex.Unlock()
-		logsFlushed.Broadcast()
-	}
-	// queueLog function appends a log to the queue and triggers sendLogs if necessary
-	queueLog := func(log agentsdk.StartupLog) {
-		logMutex.Lock()
-		defer logMutex.Unlock()
-
-		// Append log to the queue
-		queuedLogs = append(queuedLogs, log)
-
-		// If there are more than 100 logs, send them immediately
-		if len(queuedLogs) > 100 {
-			// Don't early return after this, because we still want
-			// to reset the timer just in case logs come in while
-			// we're sending.
-			go sendLogs()
-		}
-		// Reset or set the flushLogsTimer to trigger sendLogs after 100 milliseconds
-		if flushLogsTimer != nil {
-			flushLogsTimer.Reset(100 * time.Millisecond)
-			return
 		}
-		flushLogsTimer = time.AfterFunc(100*time.Millisecond, sendLogs)
-	}
+	}()
 
-	// It's important that we either flush or drop all logs before returning
-	// because the startup state is reported after flush.
+	// Forward read lines to the sender or queue them for when the
+	// sender is ready to process them.
 	//
-	// It'd be weird for the startup state to be ready, but logs are still
-	// coming in.
-	logsFinished := make(chan struct{})
+	// We only need to track this goroutine since it will ensure that
+	// the sender has closed before returning.
+	logsDone := make(chan struct{})
 	err := a.trackConnGoroutine(func() {
-		scanner := bufio.NewScanner(reader)
-		for scanner.Scan() {
-			queueLog(agentsdk.StartupLog{
+		defer func() {
+			close(send)
+			<-sendDone
+			a.logger.Debug(ctx, "track script logs reader exited")
+			close(logsDone)
+		}()
+
+		var queue []agentsdk.StartupLog
+
+		s := bufio.NewScanner(reader)
+		for s.Scan() {
+			select {
+			case <-ctx.Done():
+				return
+			case <-a.closed:
+				return
+			case queue = <-send:
+				// Not captured by sender yet, re-use.
+			default:
+			}
+
+			queue = append(queue, agentsdk.StartupLog{
 				CreatedAt: database.Now(),
-				Output:    scanner.Text(),
+				Output:    s.Text(),
 			})
+			send <- queue
+			queue = nil
 		}
-		if err := scanner.Err(); err != nil {
-			a.logger.Error(ctx, "scan startup logs", slog.Error(err))
-		}
-		defer close(logsFinished)
-		logsFlushed.L.Lock()
-		for {
-			logMutex.Lock()
-			if len(queuedLogs) == 0 {
-				logMutex.Unlock()
-				break
-			}
-			logMutex.Unlock()
-			logsFlushed.Wait()
+		if err := s.Err(); err != nil {
+			a.logger.Warn(ctx, "scan startup logs ended unexpectedly", slog.Error(err))
 		}
 	})
 	if err != nil {
-		return nil, xerrors.Errorf("track conn goroutine: %w", err)
+		close(send)
+		<-sendDone
+		close(logsDone)
+		return logsDone, err
 	}
-	return logsFinished, nil
+
+	return logsDone, nil
 }
 
 func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, msg codersdk.WorkspaceAgentReconnectingPTYInit, conn net.Conn) (retErr error) {
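
A detail worth noting in the new reader loop: before appending a line, it does a non-blocking receive on `send` to take back any batch the sender has not yet consumed, so the reader never blocks behind an in-flight upload and pending lines accumulate into one batch. A minimal sketch of that reclaim pattern, with an illustrative `enqueue` helper and a plain `[]string` payload standing in for `[]agentsdk.StartupLog`:

```go
package main

import "fmt"

func main() {
	send := make(chan []string, 1)

	enqueue := func(line string) {
		var queue []string
		select {
		case queue = <-send: // Reclaim a batch the consumer hasn't taken yet.
		default: // Nothing pending; start a fresh batch.
		}
		queue = append(queue, line)
		send <- queue // The buffered slot is free: it was empty or we just drained it.
	}

	enqueue("first")
	enqueue("second") // Reclaims ["first"] and re-sends ["first", "second"].

	fmt.Println(<-send) // [first second]
}
```

Because the channel has a buffer of one and the reader is the sole producer, the final `send <- queue` can never block, which is exactly the property the old timer-and-mutex queue needed a condition variable to approximate.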
