diff --git a/agent/agent.go b/agent/agent.go
index 9a368195b6b67..0aaec4effac93 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -911,122 +911,141 @@ func (a *agent) runScript(ctx context.Context, lifecycle, script string) (err er
 	return nil
 }
 
-func (a *agent) trackScriptLogs(ctx context.Context, reader io.Reader) (chan struct{}, error) {
-	// Initialize variables for log management
-	queuedLogs := make([]agentsdk.StartupLog, 0)
-	var flushLogsTimer *time.Timer
-	var logMutex sync.Mutex
-	logsFlushed := sync.NewCond(&sync.Mutex{})
-	var logsSending bool
-	defer func() {
-		logMutex.Lock()
-		if flushLogsTimer != nil {
-			flushLogsTimer.Stop()
-		}
-		logMutex.Unlock()
-	}()
+func (a *agent) trackScriptLogs(ctx context.Context, reader io.ReadCloser) (chan struct{}, error) {
+	// Synchronous sender, there can only be one outbound send at a time.
+	//
+	// It's important that we either flush or drop all logs before returning
+	// because the startup state is reported after flush.
+	sendDone := make(chan struct{})
+	send := make(chan []agentsdk.StartupLog, 1)
+	go func() {
+		// Set flushTimeout and backlogLimit so that logs are uploaded
+		// once every 250ms or when 100 logs have been added to the
+		// backlog, whichever comes first.
+		flushTimeout := 250 * time.Millisecond
+		backlogLimit := 100
 
-	// sendLogs function uploads the queued logs to the server
-	sendLogs := func() {
-		// Lock logMutex and check if logs are already being sent
-		logMutex.Lock()
-		if logsSending {
-			logMutex.Unlock()
-			return
-		}
-		if flushLogsTimer != nil {
-			flushLogsTimer.Stop()
-		}
-		if len(queuedLogs) == 0 {
-			logMutex.Unlock()
-			return
-		}
-		// Move the current queued logs to logsToSend and clear the queue
-		logsToSend := queuedLogs
-		logsSending = true
-		queuedLogs = make([]agentsdk.StartupLog, 0)
-		logMutex.Unlock()
-
-		// Retry uploading logs until successful or a specific error occurs
-		for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
-			err := a.client.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
-				Logs: logsToSend,
-			})
-			if err == nil {
-				break
+		flush := time.NewTicker(flushTimeout)
+
+		var backlog []agentsdk.StartupLog
+		defer func() {
+			flush.Stop()
+			_ = reader.Close() // Ensure read routine is closed.
+			if len(backlog) > 0 {
+				a.logger.Debug(ctx, "track script logs sender exiting, discarding logs", slog.F("discarded_logs_count", len(backlog)))
 			}
-			var sdkErr *codersdk.Error
-			if errors.As(err, &sdkErr) {
-				if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge {
-					a.logger.Warn(ctx, "startup logs too large, dropping logs")
-					break
+			a.logger.Debug(ctx, "track script logs sender exited")
+			close(sendDone)
+		}()
+
+		done := false
+		for {
+			flushed := false
+			select {
+			case <-ctx.Done():
+				return
+			case <-a.closed:
+				return
+			// Close (!ok) can be triggered by the reader closing due to
+			// EOF or due to agent closing, when this happens we attempt
+			// a final flush. If the context is canceled this will be a
+			// no-op.
+			case logs, ok := <-send:
+				done = !ok
+				if ok {
+					backlog = append(backlog, logs...)
+					flushed = len(backlog) >= backlogLimit
 				}
+			case <-flush.C:
+				flushed = true
+			}
+
+			if (done || flushed) && len(backlog) > 0 {
+				flush.Stop() // Lower the chance of a double flush.
+
+				// Retry uploading logs until successful or a specific
+				// error occurs.
+				for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
+					err := a.client.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
+						Logs: backlog,
+					})
+					if err == nil {
+						break
+					}
+
+					if errors.Is(err, context.Canceled) {
+						return
+					}
+					var sdkErr *codersdk.Error
+					if errors.As(err, &sdkErr) {
+						if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge {
+							a.logger.Warn(ctx, "startup logs too large, dropping logs")
+							break
+						}
+					}
+					a.logger.Error(ctx, "upload startup logs failed", slog.Error(err), slog.F("to_send", backlog))
+				}
+				if ctx.Err() != nil {
+					return
+				}
+				backlog = nil
+
+				// Anchor flush to the last log upload.
+				flush.Reset(flushTimeout)
+			}
+			if done {
+				return
 			}
-			a.logger.Error(ctx, "upload startup logs", slog.Error(err), slog.F("to_send", logsToSend))
-		}
-		// Reset logsSending flag
-		logMutex.Lock()
-		logsSending = false
-		flushLogsTimer.Reset(100 * time.Millisecond)
-		logMutex.Unlock()
-		logsFlushed.Broadcast()
-	}
-	// queueLog function appends a log to the queue and triggers sendLogs if necessary
-	queueLog := func(log agentsdk.StartupLog) {
-		logMutex.Lock()
-		defer logMutex.Unlock()
-
-		// Append log to the queue
-		queuedLogs = append(queuedLogs, log)
-
-		// If there are more than 100 logs, send them immediately
-		if len(queuedLogs) > 100 {
-			// Don't early return after this, because we still want
-			// to reset the timer just in case logs come in while
-			// we're sending.
-			go sendLogs()
-		}
-		// Reset or set the flushLogsTimer to trigger sendLogs after 100 milliseconds
-		if flushLogsTimer != nil {
-			flushLogsTimer.Reset(100 * time.Millisecond)
-			return
 		}
-		flushLogsTimer = time.AfterFunc(100*time.Millisecond, sendLogs)
-	}
+	}()
 
-	// It's important that we either flush or drop all logs before returning
-	// because the startup state is reported after flush.
+	// Forward read lines to the sender or queue them for when the
+	// sender is ready to process them.
 	//
-	// It'd be weird for the startup state to be ready, but logs are still
-	// coming in.
-	logsFinished := make(chan struct{})
+	// We only need to track this goroutine since it will ensure that
+	// the sender has closed before returning.
+	logsDone := make(chan struct{})
 	err := a.trackConnGoroutine(func() {
-		scanner := bufio.NewScanner(reader)
-		for scanner.Scan() {
-			queueLog(agentsdk.StartupLog{
+		defer func() {
+			close(send)
+			<-sendDone
+			a.logger.Debug(ctx, "track script logs reader exited")
+			close(logsDone)
+		}()
+
+		var queue []agentsdk.StartupLog
+
+		s := bufio.NewScanner(reader)
+		for s.Scan() {
+			select {
+			case <-ctx.Done():
+				return
+			case <-a.closed:
+				return
+			case queue = <-send:
+				// Not captured by sender yet, re-use.
+			default:
+			}
+
+			queue = append(queue, agentsdk.StartupLog{
 				CreatedAt: database.Now(),
-				Output:    scanner.Text(),
+				Output:    s.Text(),
 			})
+			send <- queue
+			queue = nil
 		}
-		if err := scanner.Err(); err != nil {
-			a.logger.Error(ctx, "scan startup logs", slog.Error(err))
-		}
-		defer close(logsFinished)
-		logsFlushed.L.Lock()
-		for {
-			logMutex.Lock()
-			if len(queuedLogs) == 0 {
-				logMutex.Unlock()
-				break
-			}
-			logMutex.Unlock()
-			logsFlushed.Wait()
+		if err := s.Err(); err != nil {
+			a.logger.Warn(ctx, "scan startup logs ended unexpectedly", slog.Error(err))
 		}
 	})
 	if err != nil {
-		return nil, xerrors.Errorf("track conn goroutine: %w", err)
+		close(send)
+		<-sendDone
+		close(logsDone)
+		return logsDone, err
 	}
-	return logsFinished, nil
+
+	return logsDone, nil
 }
 
 func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, msg codersdk.WorkspaceAgentReconnectingPTYInit, conn net.Conn) (retErr error) {
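
Note: the core of the new sender is a batch-and-flush loop: lines accumulate in a backlog that is uploaded either every flushTimeout or once backlogLimit entries pile up, whichever comes first, with a final flush when the channel closes. The reader side pairs this with a reclaim trick on the single-slot channel, pulling back a batch the sender has not yet picked up so new lines grow it in place. The standalone sketch below isolates those two patterns under stated assumptions: upload() is a hypothetical stand-in for a.client.PatchStartupLogs, and retry, context cancellation, and agent-close handling are omitted. It illustrates the technique; it is not code from this change.

// batchflush.go: minimal sketch of the diff's batch-and-flush pattern.
package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		flushTimeout = 250 * time.Millisecond
		backlogLimit = 100
	)

	send := make(chan []string, 1) // Single-slot handoff, as in the diff.
	done := make(chan struct{})

	// Hypothetical stand-in for the real log upload (no retry here).
	upload := func(batch []string) {
		fmt.Printf("uploading %d line(s)\n", len(batch))
	}

	// Sender: flush every flushTimeout or once backlogLimit lines have
	// accumulated, whichever comes first; final flush on channel close.
	go func() {
		defer close(done)

		flush := time.NewTicker(flushTimeout)
		defer flush.Stop()

		var backlog []string
		closed := false
		for {
			flushed := false
			select {
			case lines, ok := <-send:
				closed = !ok
				if ok {
					backlog = append(backlog, lines...)
					flushed = len(backlog) >= backlogLimit
				}
			case <-flush.C:
				flushed = true
			}

			if (closed || flushed) && len(backlog) > 0 {
				flush.Stop() // Lower the chance of a double flush.
				upload(backlog)
				backlog = nil
				flush.Reset(flushTimeout) // Anchor the next flush to this upload.
			}
			if closed {
				return
			}
		}
	}()

	// Producer: before appending, reclaim a batch the sender has not
	// picked up yet so it grows in place instead of queuing behind it.
	var queue []string
	for i := 0; i < 5; i++ {
		select {
		case queue = <-send: // Not captured by the sender yet; re-use.
		default:
		}
		queue = append(queue, fmt.Sprintf("line %d", i))
		send <- queue
		queue = nil
	}
	close(send) // Requests a final flush of anything still backlogged.
	<-done
}

The single-slot channel gives natural backpressure (at most one batch in flight plus one being built), while the reclaim keeps the reader from blocking behind a sender that is busy uploading.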