From 391385424beb8fd50f7c5f97c84295f08a7c4529 Mon Sep 17 00:00:00 2001
From: Mathias Fredriksson
Date: Mon, 19 Jun 2023 17:38:11 +0000
Subject: [PATCH 1/4] fix(agent): refactor `trackScriptLogs` to avoid deadlock

During agent close it was possible for the startup script log consumer
to enter a deadlock state whereby agent close was waiting, via
`a.trackConnGoroutine`, on the log reader, and the log reader in turn
was waiting for a flush event. This refactor removes the mutex in favor
of channel communication and relies on two goroutines without shared
state.
---
 agent/agent.go | 203 +++++++++++++++++++++++++------------------------
 1 file changed, 103 insertions(+), 100 deletions(-)

diff --git a/agent/agent.go b/agent/agent.go
index 9a368195b6b67..754105f8edf14 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -912,121 +912,124 @@ func (a *agent) runScript(ctx context.Context, lifecycle, script string) (err er
 }
 
 func (a *agent) trackScriptLogs(ctx context.Context, reader io.Reader) (chan struct{}, error) {
-	// Initialize variables for log management
-	queuedLogs := make([]agentsdk.StartupLog, 0)
-	var flushLogsTimer *time.Timer
-	var logMutex sync.Mutex
-	logsFlushed := sync.NewCond(&sync.Mutex{})
-	var logsSending bool
-	defer func() {
-		logMutex.Lock()
-		if flushLogsTimer != nil {
-			flushLogsTimer.Stop()
-		}
-		logMutex.Unlock()
-	}()
+	// Synchronous sender, there can only be one outbound send at a time.
+	//
+	// It's important that we either flush or drop all logs before returning
+	// because the startup state is reported after flush.
+	sendDone := make(chan struct{})
+	send := make(chan []agentsdk.StartupLog, 1)
+	go func() {
+		flushTimeout := 100 * time.Millisecond
+		flush := time.NewTimer(flushTimeout)
+		flush.Stop()
 
-	// sendLogs function uploads the queued logs to the server
-	sendLogs := func() {
-		// Lock logMutex and check if logs are already being sent
-		logMutex.Lock()
-		if logsSending {
-			logMutex.Unlock()
-			return
-		}
-		if flushLogsTimer != nil {
-			flushLogsTimer.Stop()
-		}
-		if len(queuedLogs) == 0 {
-			logMutex.Unlock()
-			return
-		}
-		// Move the current queued logs to logsToSend and clear the queue
-		logsToSend := queuedLogs
-		logsSending = true
-		queuedLogs = make([]agentsdk.StartupLog, 0)
-		logMutex.Unlock()
-
-		// Retry uploading logs until successful or a specific error occurs
-		for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
-			err := a.client.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
-				Logs: logsToSend,
-			})
-			if err == nil {
-				break
+		defer func() {
+			flush.Stop()
+			a.logger.Debug(ctx, "track script logs sender exited")
+			close(sendDone)
+		}()
+
+		var backlog []agentsdk.StartupLog
+		done := false
+		for {
+			flushed := false
+			select {
+			// Close (!ok) can be triggered by the reader closing due to
+			// EOF or due to agent closing, when this happens we attempt
+			// a final flush. If the context is canceled this will be a
+			// no-op.
+			case logs, ok := <-send:
+				done = !ok
+				if ok {
+					backlog = append(backlog, logs...)
+					flushed = len(backlog) >= 100
+				}
+			case <-flush.C:
+				flushed = true
 			}
-			var sdkErr *codersdk.Error
-			if errors.As(err, &sdkErr) {
-				if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge {
-					a.logger.Warn(ctx, "startup logs too large, dropping logs")
-					break
+
+			if (done || flushed) && len(backlog) > 0 {
+				// Retry uploading logs until successful or a specific
+				// error occurs.
+				for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
+					err := a.client.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
+						Logs: backlog,
+					})
+					if err == nil {
+						break
+					}
+
+					if errors.Is(err, context.Canceled) {
+						return
+					}
+					var sdkErr *codersdk.Error
+					if errors.As(err, &sdkErr) {
+						if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge {
+							a.logger.Warn(ctx, "startup logs too large, dropping logs")
+							break
+						}
+					}
+					a.logger.Error(ctx, "upload startup logs failed", slog.Error(err), slog.F("to_send", backlog))
+				}
+				if ctx.Err() != nil {
+					return
 				}
-			a.logger.Error(ctx, "upload startup logs", slog.Error(err), slog.F("to_send", logsToSend))
-		}
-		// Reset logsSending flag
-		logMutex.Lock()
-		logsSending = false
-		flushLogsTimer.Reset(100 * time.Millisecond)
-		logMutex.Unlock()
-		logsFlushed.Broadcast()
-	}
+				backlog = nil
 
-	// queueLog function appends a log to the queue and triggers sendLogs if necessary
-	queueLog := func(log agentsdk.StartupLog) {
-		logMutex.Lock()
-		defer logMutex.Unlock()
-
-		// Append log to the queue
-		queuedLogs = append(queuedLogs, log)
-
-		// If there are more than 100 logs, send them immediately
-		if len(queuedLogs) > 100 {
-			// Don't early return after this, because we still want
-			// to reset the timer just in case logs come in while
-			// we're sending.
-			go sendLogs()
-		}
-		// Reset or set the flushLogsTimer to trigger sendLogs after 100 milliseconds
-		if flushLogsTimer != nil {
-			flushLogsTimer.Reset(100 * time.Millisecond)
-			return
+			if done {
+				return
+			}
+
+			flush.Reset(flushTimeout)
 		}
-		flushLogsTimer = time.AfterFunc(100*time.Millisecond, sendLogs)
-	}
+	}()
 
-	// It's important that we either flush or drop all logs before returning
-	// because the startup state is reported after flush.
+	// Forward read lines to the sender or queue them for when the
+	// sender is ready to process them.
 	//
-	// It'd be weird for the startup state to be ready, but logs are still
-	// coming in.
-	logsFinished := make(chan struct{})
+	// We only need to track this goroutine since it will ensure that
+	// the sender has closed before returning.
+	logsDone := make(chan struct{})
 	err := a.trackConnGoroutine(func() {
-		scanner := bufio.NewScanner(reader)
-		for scanner.Scan() {
-			queueLog(agentsdk.StartupLog{
+		defer func() {
+			close(send)
+			<-sendDone
+			a.logger.Debug(ctx, "track script logs reader exited")
+			close(logsDone)
+		}()
+
+		var queue []agentsdk.StartupLog
+
+		s := bufio.NewScanner(reader)
+		for s.Scan() {
+			select {
+			case <-ctx.Done():
+				return
+			case <-a.closed:
+				return
+			case queue = <-send:
+				// Not captured by sender yet, re-use.
+			default:
+			}
+
+			queue = append(queue, agentsdk.StartupLog{
 				CreatedAt: database.Now(),
-				Output:    scanner.Text(),
+				Output:    s.Text(),
 			})
+			send <- queue
+			queue = nil
 		}
-		if err := scanner.Err(); err != nil {
-			a.logger.Error(ctx, "scan startup logs", slog.Error(err))
-		}
-		defer close(logsFinished)
-		logsFlushed.L.Lock()
-		for {
-			logMutex.Lock()
-			if len(queuedLogs) == 0 {
-				logMutex.Unlock()
-				break
-			}
-			logMutex.Unlock()
-			logsFlushed.Wait()
+		if err := s.Err(); err != nil {
+			a.logger.Error(ctx, "scan startup logs failed", slog.Error(err))
 		}
 	})
 	if err != nil {
-		return nil, xerrors.Errorf("track conn goroutine: %w", err)
+		close(send)
+		<-sendDone
+		return nil, err
 	}
-	return logsFinished, nil
+
+	return logsDone, nil
 }
 
 func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, msg codersdk.WorkspaceAgentReconnectingPTYInit, conn net.Conn) (retErr error) {

From 78c478c4dafd52da26ac1cc92f39165e9acb2934 Mon Sep 17 00:00:00 2001
From: Mathias Fredriksson
Date: Tue, 20 Jun 2023 12:26:48 +0000
Subject: [PATCH 2/4] fix: ensure startup logs scanner can be stopped

---
 agent/agent.go | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/agent/agent.go b/agent/agent.go
index 754105f8edf14..a71adfeb0fc08 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -911,7 +911,7 @@ func (a *agent) runScript(ctx context.Context, lifecycle, script string) (err er
 	return nil
 }
 
-func (a *agent) trackScriptLogs(ctx context.Context, reader io.Reader) (chan struct{}, error) {
+func (a *agent) trackScriptLogs(ctx context.Context, reader io.ReadCloser) (chan struct{}, error) {
 	// Synchronous sender, there can only be one outbound send at a time.
 	//
 	// It's important that we either flush or drop all logs before returning
@@ -923,17 +923,25 @@ func (a *agent) trackScriptLogs(ctx context.Context, reader io.Reader) (chan str
 		flush := time.NewTimer(flushTimeout)
 		flush.Stop()
 
+		var backlog []agentsdk.StartupLog
 		defer func() {
 			flush.Stop()
+			_ = reader.Close() // Ensure read routine is closed.
+			if len(backlog) > 0 {
+				a.logger.Debug(ctx, "track script logs sender exiting, discarding logs", slog.F("discarded_logs_count", len(backlog)))
+			}
 			a.logger.Debug(ctx, "track script logs sender exited")
 			close(sendDone)
 		}()
 
-		var backlog []agentsdk.StartupLog
 		done := false
 		for {
 			flushed := false
 			select {
+			case <-ctx.Done():
+				return
+			case <-a.closed:
+				return
 			// Close (!ok) can be triggered by the reader closing due to
 			// EOF or due to agent closing, when this happens we attempt
 			// a final flush. If the context is canceled this will be a
@@ -1026,7 +1034,8 @@ func (a *agent) trackScriptLogs(ctx context.Context, reader io.Reader) (chan str
 	if err != nil {
 		close(send)
 		<-sendDone
-		return nil, err
+		close(logsDone)
+		return logsDone, err
 	}
 
 	return logsDone, nil

From 6d2fcb5197234c03e11660b9c4f1d73866ca4dd6 Mon Sep 17 00:00:00 2001
From: Mathias Fredriksson
Date: Tue, 20 Jun 2023 12:50:08 +0000
Subject: [PATCH 3/4] increase flush timeout and anchor it to send

---
 agent/agent.go | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/agent/agent.go b/agent/agent.go
index a71adfeb0fc08..c611b08d4ed55 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -919,9 +919,13 @@ func (a *agent) trackScriptLogs(ctx context.Context, reader io.ReadCloser) (chan
 	sendDone := make(chan struct{})
 	send := make(chan []agentsdk.StartupLog, 1)
 	go func() {
-		flushTimeout := 100 * time.Millisecond
-		flush := time.NewTimer(flushTimeout)
-		flush.Stop()
+		// Set flushTimeout and backlogLimit so that logs are uploaded
+		// once every 250ms or when 100 logs have been added to the
+		// backlog, whichever comes first.
+		flushTimeout := 250 * time.Millisecond
+		backlogLimit := 100
+
+		flush := time.NewTicker(flushTimeout)
 
 		var backlog []agentsdk.StartupLog
 		defer func() {
@@ -950,13 +954,15 @@ func (a *agent) trackScriptLogs(ctx context.Context, reader io.ReadCloser) (chan
 				done = !ok
 				if ok {
 					backlog = append(backlog, logs...)
-					flushed = len(backlog) >= 100
+					flushed = len(backlog) >= backlogLimit
 				}
 			case <-flush.C:
 				flushed = true
 			}
 
 			if (done || flushed) && len(backlog) > 0 {
+				flush.Stop() // Lower the chance of a double flush.
+
 				// Retry uploading logs until successful or a specific
 				// error occurs.
 				for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
@@ -983,12 +989,13 @@ func (a *agent) trackScriptLogs(ctx context.Context, reader io.ReadCloser) (chan
 					return
 				}
 				backlog = nil
+
+				// Anchor flush to the last log upload.
+				flush.Reset(flushTimeout)
 			}
 			if done {
 				return
 			}
-
-			flush.Reset(flushTimeout)
 		}
 	}()

From 000623251cc05d413724cd15254c4a00da3ae51f Mon Sep 17 00:00:00 2001
From: Mathias Fredriksson
Date: Tue, 20 Jun 2023 13:36:43 +0000
Subject: [PATCH 4/4] use warn log to avoid ci failure due to reader close

---
 agent/agent.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/agent/agent.go b/agent/agent.go
index c611b08d4ed55..0aaec4effac93 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -1035,7 +1035,7 @@ func (a *agent) trackScriptLogs(ctx context.Context, reader io.ReadCloser) (chan
 			queue = nil
 		}
 		if err := s.Err(); err != nil {
-			a.logger.Error(ctx, "scan startup logs failed", slog.Error(err))
+			a.logger.Warn(ctx, "scan startup logs ended unexpectedly", slog.Error(err))
 		}
 	})
 	if err != nil {
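
The four patches above converge on one batching pattern: a reader goroutine forwards scanned lines over a channel to a sender goroutine, which flushes its backlog either once per flush interval (a ticker anchored to the last upload) or as soon as the backlog reaches its limit, and attempts one final flush when the channel is closed. Below is a minimal, self-contained Go sketch of that pattern, for illustration only: the `Log` type and the `upload` callback are hypothetical stand-ins for `agentsdk.StartupLog` and `a.client.PatchStartupLogs`, and the retry/backoff and agent-close handling of the real code are reduced to comments.

package main

import (
	"context"
	"fmt"
	"time"
)

// Log is a hypothetical stand-in for agentsdk.StartupLog.
type Log struct {
	CreatedAt time.Time
	Output    string
}

// sender drains the send channel, batching logs and uploading them once
// every flushTimeout or whenever backlogLimit entries have accumulated,
// whichever comes first. A closed channel signals that the reader is
// done and triggers a final flush so no logs are dropped before the
// startup state is reported.
func sender(ctx context.Context, send <-chan []Log, upload func(context.Context, []Log) error) {
	const (
		flushTimeout = 250 * time.Millisecond
		backlogLimit = 100
	)
	flush := time.NewTicker(flushTimeout)
	defer flush.Stop()

	var backlog []Log
	done := false
	for {
		flushed := false
		select {
		case <-ctx.Done():
			return
		case logs, ok := <-send:
			done = !ok // Channel closed: reader finished or agent closing.
			if ok {
				backlog = append(backlog, logs...)
				flushed = len(backlog) >= backlogLimit
			}
		case <-flush.C:
			flushed = true
		}

		if (done || flushed) && len(backlog) > 0 {
			flush.Stop() // Lower the chance of a double flush.
			if err := upload(ctx, backlog); err != nil {
				return // The real sender retries with backoff here.
			}
			backlog = nil
			flush.Reset(flushTimeout) // Anchor the timer to the last upload.
		}
		if done {
			return
		}
	}
}

func main() {
	ctx := context.Background()
	send := make(chan []Log, 1)
	sendDone := make(chan struct{})
	go func() {
		defer close(sendDone)
		sender(ctx, send, func(_ context.Context, logs []Log) error {
			fmt.Printf("uploaded %d log(s)\n", len(logs))
			return nil
		})
	}()

	// Stand-in for the bufio.Scanner loop in the reader goroutine.
	for i := 0; i < 5; i++ {
		send <- []Log{{CreatedAt: time.Now(), Output: fmt.Sprintf("line %d", i)}}
	}
	close(send) // EOF: the sender performs a final flush, then exits.
	<-sendDone
}

Closing the channel, rather than signaling over a mutex-guarded queue, is what removes the deadlock: the reader never waits on the sender except for the single `<-sendDone` handoff after `close(send)`.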