From e32a982dcc7a80deb13791966244dc312aecf953 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Wed, 11 Oct 2023 12:19:10 +0000 Subject: [PATCH] fix(codersdk/agentsdk): improve flush during ctx cancel in agent logs, fix test Fixes #9719 Related #9865 --- codersdk/agentsdk/logs.go | 36 ++++++++++++++++++++++++++-------- codersdk/agentsdk/logs_test.go | 4 +++- 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/codersdk/agentsdk/logs.go b/codersdk/agentsdk/logs.go index ff63e68d60add..e7b86194cd1cb 100644 --- a/codersdk/agentsdk/logs.go +++ b/codersdk/agentsdk/logs.go @@ -90,12 +90,31 @@ func LogsWriter(ctx context.Context, sender func(ctx context.Context, log ...Log } } +// LogsSenderFlushTimeout changes the default flush timeout (250ms); +// this is mostly useful for tests. +func LogsSenderFlushTimeout(timeout time.Duration) func(*logsSenderOptions) { + return func(o *logsSenderOptions) { + o.flushTimeout = timeout + } +} + +type logsSenderOptions struct { + flushTimeout time.Duration +} + // LogsSender will send agent startup logs to the server. Calls to // sendLog are non-blocking and will return an error if flushAndClose // has been called. Calling sendLog concurrently is not supported. If // the context passed to flushAndClose is canceled, any remaining logs // will be discarded. 
-func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req PatchLogs) error, logger slog.Logger) (sendLog func(ctx context.Context, log ...Log) error, flushAndClose func(context.Context) error) { +func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req PatchLogs) error, logger slog.Logger, opts ...func(*logsSenderOptions)) (sendLog func(ctx context.Context, log ...Log) error, flushAndClose func(context.Context) error) { + o := logsSenderOptions{ + flushTimeout: 250 * time.Millisecond, + } + for _, opt := range opts { + opt(&o) + } + // The main context is used to close the sender goroutine and cancel // any outbound requests to the API. The shutdown context is used to // signal the sender goroutine to flush logs and then exit. @@ -109,10 +128,9 @@ func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req Patc // Set flushTimeout and backlogLimit so that logs are uploaded // once every 250ms or when 100 logs have been added to the // backlog, whichever comes first. - flushTimeout := 250 * time.Millisecond backlogLimit := 100 - flush := time.NewTicker(flushTimeout) + flush := time.NewTicker(o.flushTimeout) var backlog []Log defer func() { @@ -153,8 +171,9 @@ func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req Patc // error occurs. Note that we use the main context here, // meaning these requests won't be interrupted by // shutdown. - for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx) && ctx.Err() == nil; { - err := patchLogs(ctx, PatchLogs{ + var err error + for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); { + err = patchLogs(ctx, PatchLogs{ Logs: backlog, LogSourceID: sourceID, }) @@ -163,7 +182,7 @@ func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req Patc } if errors.Is(err, context.Canceled) { - return + break } // This error is expected to be codersdk.Error, but it has // private fields so we can't fake it in tests. 
@@ -171,18 +190,19 @@ func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req Patc if errors.As(err, &statusErr) { if statusErr.StatusCode() == http.StatusRequestEntityTooLarge { logger.Warn(ctx, "startup logs too large, discarding logs", slog.F("discarded_logs_count", len(backlog)), slog.Error(err)) + err = nil break } } logger.Error(ctx, "startup logs sender failed to upload logs, retrying later", slog.F("logs_count", len(backlog)), slog.Error(err)) } - if ctx.Err() != nil { + if err != nil { return } backlog = nil // Anchor flush to the last log upload. - flush.Reset(flushTimeout) + flush.Reset(o.flushTimeout) } if done { return diff --git a/codersdk/agentsdk/logs_test.go b/codersdk/agentsdk/logs_test.go index 90e4ff42107d7..e411e6e7213e4 100644 --- a/codersdk/agentsdk/logs_test.go +++ b/codersdk/agentsdk/logs_test.go @@ -344,7 +344,9 @@ func TestStartupLogsSender(t *testing.T) { return nil } - sendLog, flushAndClose := agentsdk.LogsSender(uuid.New(), patchLogs, slogtest.Make(t, nil).Leveled(slog.LevelDebug)) + // Prevent race between auto-flush and context cancellation with + // a really long timeout. + sendLog, flushAndClose := agentsdk.LogsSender(uuid.New(), patchLogs, slogtest.Make(t, nil).Leveled(slog.LevelDebug), agentsdk.LogsSenderFlushTimeout(time.Hour)) defer func() { _ = flushAndClose(ctx) }()