Skip to content

fix(codersdk/agentsdk): improve ctx cancel in agent logs flush, fix test #10214

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions codersdk/agentsdk/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,31 @@ func LogsWriter(ctx context.Context, sender func(ctx context.Context, log ...Log
}
}

// LogsSenderFlushTimeout changes the default flush timeout (250ms),
// this is mostly useful for tests.
func LogsSenderFlushTimeout(timeout time.Duration) func(*logsSenderOptions) {
return func(o *logsSenderOptions) {
o.flushTimeout = timeout
}
}

type logsSenderOptions struct {
flushTimeout time.Duration
}

// LogsSender will send agent startup logs to the server. Calls to
// sendLog are non-blocking and will return an error if flushAndClose
// has been called. Calling sendLog concurrently is not supported. If
// the context passed to flushAndClose is canceled, any remaining logs
// will be discarded.
func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req PatchLogs) error, logger slog.Logger) (sendLog func(ctx context.Context, log ...Log) error, flushAndClose func(context.Context) error) {
func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req PatchLogs) error, logger slog.Logger, opts ...func(*logsSenderOptions)) (sendLog func(ctx context.Context, log ...Log) error, flushAndClose func(context.Context) error) {
o := logsSenderOptions{
flushTimeout: 250 * time.Millisecond,
}
for _, opt := range opts {
opt(&o)
}

// The main context is used to close the sender goroutine and cancel
// any outbound requests to the API. The shutdown context is used to
// signal the sender goroutine to flush logs and then exit.
Expand All @@ -109,10 +128,9 @@ func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req Patc
// Set flushTimeout and backlogLimit so that logs are uploaded
// once every 250ms or when 100 logs have been added to the
// backlog, whichever comes first.
flushTimeout := 250 * time.Millisecond
backlogLimit := 100

flush := time.NewTicker(flushTimeout)
flush := time.NewTicker(o.flushTimeout)

var backlog []Log
defer func() {
Expand Down Expand Up @@ -153,8 +171,9 @@ func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req Patc
// error occurs. Note that we use the main context here,
// meaning these requests won't be interrupted by
// shutdown.
for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx) && ctx.Err() == nil; {
err := patchLogs(ctx, PatchLogs{
var err error
for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
err = patchLogs(ctx, PatchLogs{
Logs: backlog,
LogSourceID: sourceID,
})
Expand All @@ -163,26 +182,27 @@ func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req Patc
}

if errors.Is(err, context.Canceled) {
return
break
}
// This error is expected to be codersdk.Error, but it has
// private fields so we can't fake it in tests.
var statusErr interface{ StatusCode() int }
if errors.As(err, &statusErr) {
if statusErr.StatusCode() == http.StatusRequestEntityTooLarge {
logger.Warn(ctx, "startup logs too large, discarding logs", slog.F("discarded_logs_count", len(backlog)), slog.Error(err))
err = nil
break
}
}
logger.Error(ctx, "startup logs sender failed to upload logs, retrying later", slog.F("logs_count", len(backlog)), slog.Error(err))
}
if ctx.Err() != nil {
if err != nil {
return
}
backlog = nil

// Anchor flush to the last log upload.
flush.Reset(flushTimeout)
flush.Reset(o.flushTimeout)
}
if done {
return
Expand Down
4 changes: 3 additions & 1 deletion codersdk/agentsdk/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,9 @@ func TestStartupLogsSender(t *testing.T) {
return nil
}

sendLog, flushAndClose := agentsdk.LogsSender(uuid.New(), patchLogs, slogtest.Make(t, nil).Leveled(slog.LevelDebug))
// Prevent race between auto-flush and context cancellation with
// a really long timeout.
sendLog, flushAndClose := agentsdk.LogsSender(uuid.New(), patchLogs, slogtest.Make(t, nil).Leveled(slog.LevelDebug), agentsdk.LogsSenderFlushTimeout(time.Hour))
defer func() {
_ = flushAndClose(ctx)
}()
Expand Down