Skip to content

Commit a2cd664

Browse files
authored
fix(codersdk/agentsdk): improve ctx cancel in agent logs flush, fix test (#10214)
Fixes #9719 Related #9865
1 parent a1ee4d4 commit a2cd664

File tree

2 files changed

+31
-9
lines changed

2 files changed

+31
-9
lines changed

codersdk/agentsdk/logs.go

Lines changed: 28 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -90,12 +90,31 @@ func LogsWriter(ctx context.Context, sender func(ctx context.Context, log ...Log
9090
}
9191
}
9292

93+
// LogsSenderFlushTimeout returns a LogsSender option that overrides the
94+
// default flush timeout (250ms). This is mostly useful for tests.
95+
func LogsSenderFlushTimeout(timeout time.Duration) func(*logsSenderOptions) {
96+
return func(o *logsSenderOptions) {
97+
o.flushTimeout = timeout
98+
}
99+
}
100+
101+
type logsSenderOptions struct {
102+
flushTimeout time.Duration // interval for the flush ticker in LogsSender; defaults to 250ms
103+
}
104+
93105
// LogsSender will send agent startup logs to the server. Calls to
94106
// sendLog are non-blocking and will return an error if flushAndClose
95107
// has been called. Calling sendLog concurrently is not supported. If
96108
// the context passed to flushAndClose is canceled, any remaining logs
97109
// will be discarded.
98-
func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req PatchLogs) error, logger slog.Logger) (sendLog func(ctx context.Context, log ...Log) error, flushAndClose func(context.Context) error) {
110+
func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req PatchLogs) error, logger slog.Logger, opts ...func(*logsSenderOptions)) (sendLog func(ctx context.Context, log ...Log) error, flushAndClose func(context.Context) error) {
111+
o := logsSenderOptions{
112+
flushTimeout: 250 * time.Millisecond,
113+
}
114+
for _, opt := range opts {
115+
opt(&o)
116+
}
117+
99118
// The main context is used to close the sender goroutine and cancel
100119
// any outbound requests to the API. The shutdown context is used to
101120
// signal the sender goroutine to flush logs and then exit.
@@ -109,10 +128,9 @@ func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req Patc
109128
// Set backlogLimit so that logs are uploaded once every
110129
// flushTimeout (250ms by default) or when 100 logs have been
111130
// added to the backlog, whichever comes first.
112-
flushTimeout := 250 * time.Millisecond
113131
backlogLimit := 100
114132

115-
flush := time.NewTicker(flushTimeout)
133+
flush := time.NewTicker(o.flushTimeout)
116134

117135
var backlog []Log
118136
defer func() {
@@ -153,8 +171,9 @@ func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req Patc
153171
// error occurs. Note that we use the main context here,
154172
// meaning these requests won't be interrupted by
155173
// shutdown.
156-
for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx) && ctx.Err() == nil; {
157-
err := patchLogs(ctx, PatchLogs{
174+
var err error
175+
for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
176+
err = patchLogs(ctx, PatchLogs{
158177
Logs: backlog,
159178
LogSourceID: sourceID,
160179
})
@@ -163,26 +182,27 @@ func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req Patc
163182
}
164183

165184
if errors.Is(err, context.Canceled) {
166-
return
185+
break
167186
}
168187
// This error is expected to be codersdk.Error, but it has
169188
// private fields so we can't fake it in tests.
170189
var statusErr interface{ StatusCode() int }
171190
if errors.As(err, &statusErr) {
172191
if statusErr.StatusCode() == http.StatusRequestEntityTooLarge {
173192
logger.Warn(ctx, "startup logs too large, discarding logs", slog.F("discarded_logs_count", len(backlog)), slog.Error(err))
193+
err = nil
174194
break
175195
}
176196
}
177197
logger.Error(ctx, "startup logs sender failed to upload logs, retrying later", slog.F("logs_count", len(backlog)), slog.Error(err))
178198
}
179-
if ctx.Err() != nil {
199+
if err != nil {
180200
return
181201
}
182202
backlog = nil
183203

184204
// Anchor flush to the last log upload.
185-
flush.Reset(flushTimeout)
205+
flush.Reset(o.flushTimeout)
186206
}
187207
if done {
188208
return

codersdk/agentsdk/logs_test.go

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -344,7 +344,9 @@ func TestStartupLogsSender(t *testing.T) {
344344
return nil
345345
}
346346

347-
sendLog, flushAndClose := agentsdk.LogsSender(uuid.New(), patchLogs, slogtest.Make(t, nil).Leveled(slog.LevelDebug))
347+
// Prevent race between auto-flush and context cancellation with
348+
// a really long timeout.
349+
sendLog, flushAndClose := agentsdk.LogsSender(uuid.New(), patchLogs, slogtest.Make(t, nil).Leveled(slog.LevelDebug), agentsdk.LogsSenderFlushTimeout(time.Hour))
348350
defer func() {
349351
_ = flushAndClose(ctx)
350352
}()

0 commit comments

Comments
 (0)