diff --git a/agent/logs.go b/agent/logs.go
index af02d87a9200f..75ff343a67212 100644
--- a/agent/logs.go
+++ b/agent/logs.go
@@ -26,8 +26,9 @@ type logQueue struct {
 // the agent calls sendLoop to send pending logs.
 type logSender struct {
 	*sync.Cond
-	queues map[uuid.UUID]*logQueue
-	logger slog.Logger
+	queues           map[uuid.UUID]*logQueue
+	logger           slog.Logger
+	exceededLogLimit bool
 }
 
 type logDest interface {
@@ -50,6 +51,11 @@ func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
 	}
 	l.L.Lock()
 	defer l.L.Unlock()
+	if l.exceededLogLimit {
+		logger.Warn(context.Background(), "dropping enqueued logs because we have reached the server limit")
+		// don't error, as we also write to file and don't want the overall write to fail
+		return nil
+	}
 	defer l.Broadcast()
 	q, ok := l.queues[src]
 	if !ok {
@@ -88,6 +94,8 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 	defer l.logger.Debug(ctx, "sendLoop exiting")
 
 	// wake 4 times per flush interval to check if anything needs to be flushed
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
 	go func() {
 		tkr := time.NewTicker(flushInterval / 4)
 		defer tkr.Stop()
@@ -110,12 +118,18 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 	l.L.Lock()
 	defer l.L.Unlock()
 	for {
-		for !ctxDone && !l.hasPendingWorkLocked() {
+		for !ctxDone && !l.exceededLogLimit && !l.hasPendingWorkLocked() {
 			l.Wait()
 		}
 		if ctxDone {
 			return nil
 		}
+		if l.exceededLogLimit {
+			l.logger.Debug(ctx, "aborting sendLoop because log limit is already exceeded")
+			// no point in keeping this loop going if the log limit is exceeded, but don't
+			// return an error because we've already handled it
+			return nil
+		}
 		src, q := l.getPendingWorkLocked()
 		q.flushRequested = false // clear flag since we're now flushing
 		req := &proto.BatchCreateLogsRequest{
@@ -125,11 +139,20 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 		l.L.Unlock()
 		l.logger.Debug(ctx, "sending logs to agent API", slog.F("log_source_id", src), slog.F("num_logs", len(req.Logs)))
-		_, err := dest.BatchCreateLogs(ctx, req)
+		resp, err := dest.BatchCreateLogs(ctx, req)
 		l.L.Lock()
 		if err != nil {
 			return xerrors.Errorf("failed to upload logs: %w", err)
 		}
+		if resp.LogLimitExceeded {
+			l.logger.Warn(ctx, "server log limit exceeded; logs truncated")
+			l.exceededLogLimit = true
+			// no point in keeping anything we have queued around; the server will not accept them
+			l.queues = make(map[uuid.UUID]*logQueue)
+			// We've handled the error as best as we can. We don't want the server limit to grind
+			// other things to a halt, so this is all we can do.
+			return nil
+		}
 		// Since elsewhere we only append to the logs, here we can remove them
 		// since we successfully sent them. First we nil the pointers though,
 		// so that they can be garbage collected.
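Aside, not part of the patch: the core of this change is a latch on the sync.Cond that logSender embeds. One lock guards both the pending queues and the new exceededLogLimit flag, so a single Broadcast wakes sendLoop for either new work or the terminal limit state, and enqueue becomes a silent no-op afterwards. Below is a minimal, self-contained sketch of that pattern; the names (limitedSender, markExceeded) are illustrative, not from the coder codebase.

package main

import (
	"fmt"
	"sync"
)

// limitedSender models logSender: one sync.Cond guards both the queue and
// the latched "limit exceeded" state.
type limitedSender struct {
	*sync.Cond
	queue    []string
	exceeded bool // once true, stays true; further enqueues are dropped
}

// enqueue drops lines once the latch is set, without returning an error,
// mirroring the patch: callers also write logs to a local file, so the
// overall write should not fail.
func (s *limitedSender) enqueue(line string) {
	s.L.Lock()
	defer s.L.Unlock()
	if s.exceeded {
		return
	}
	s.queue = append(s.queue, line)
	s.Broadcast() // wake the send loop; there is pending work
}

// markExceeded is what the send loop does on a LogLimitExceeded response:
// latch the flag and discard the queue, since the server won't accept it.
func (s *limitedSender) markExceeded() {
	s.L.Lock()
	defer s.L.Unlock()
	s.exceeded = true
	s.queue = nil
	s.Broadcast()
}

func main() {
	s := &limitedSender{Cond: sync.NewCond(&sync.Mutex{})}
	s.enqueue("queued before the limit")
	s.markExceeded()
	s.enqueue("dropped after the limit")
	s.L.Lock()
	fmt.Println(len(s.queue)) // prints 0: nothing survives the latch
	s.L.Unlock()
}

The same design explains the new context.WithCancel in sendLoop: now that the loop can return early on the latch, the ticker goroutine needs its own cancellation so it does not leak.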
diff --git a/agent/logs_internal_test.go b/agent/logs_internal_test.go
index 83cdf33b007d3..22bd6b632ded9 100644
--- a/agent/logs_internal_test.go
+++ b/agent/logs_internal_test.go
@@ -19,7 +19,7 @@ import (
 	"github.com/coder/coder/v2/testutil"
 )
 
-func TestLogSender(t *testing.T) {
+func TestLogSender_Mainline(t *testing.T) {
 	t.Parallel()
 	testCtx := testutil.Context(t, testutil.WaitShort)
 	ctx, cancel := context.WithCancel(testCtx)
@@ -62,7 +62,9 @@ func TestLogSender(t *testing.T) {
 	// both, although the order is not controlled
 	var logReqs []*proto.BatchCreateLogsRequest
 	logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
+	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
 	logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
+	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
 	for _, req := range logReqs {
 		require.NotNil(t, req)
 		srcID, err := uuid.FromBytes(req.LogSourceId)
@@ -97,6 +99,7 @@ func TestLogSender(t *testing.T) {
 	require.NoError(t, err)
 
 	req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
 	// give ourselves a 25% buffer in case we're right on the cusp of a tick
 	require.LessOrEqual(t, time.Since(t1), flushInterval*5/4)
 	require.NotNil(t, req)
@@ -118,8 +121,53 @@ func TestLogSender(t *testing.T) {
 	require.NoError(t, err)
 }
 
+func TestLogSender_LogLimitExceeded(t *testing.T) {
+	t.Parallel()
+	ctx := testutil.Context(t, testutil.WaitShort)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	fDest := newFakeLogDest()
+	uut := newLogSender(logger)
+
+	t0 := dbtime.Now()
+
+	ls1 := uuid.UUID{0x11}
+	err := uut.enqueue(ls1, agentsdk.Log{
+		CreatedAt: t0,
+		Output:    "test log 0, src 1",
+		Level:     codersdk.LogLevelInfo,
+	})
+	require.NoError(t, err)
+
+	loopErr := make(chan error, 1)
+	go func() {
+		err := uut.sendLoop(ctx, fDest)
+		loopErr <- err
+	}()
+
+	req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+	require.NotNil(t, req)
+	testutil.RequireSendCtx(ctx, t, fDest.resps,
+		&proto.BatchCreateLogsResponse{LogLimitExceeded: true})
+
+	err = testutil.RequireRecvCtx(ctx, t, loopErr)
+	require.NoError(t, err)
+
+	// enqueue still succeeds after sendLoop returns, but the logs are
+	// silently dropped rather than queued
+	err = uut.enqueue(ls1, agentsdk.Log{
+		CreatedAt: t0,
+		Output:    "test log 2, src 1",
+		Level:     codersdk.LogLevelTrace,
+	})
+	require.NoError(t, err)
+	uut.L.Lock()
+	defer uut.L.Unlock()
+	require.Len(t, uut.queues, 0)
+}
+
 type fakeLogDest struct {
-	reqs chan *proto.BatchCreateLogsRequest
+	reqs  chan *proto.BatchCreateLogsRequest
+	resps chan *proto.BatchCreateLogsResponse
 }
 
 func (f fakeLogDest) BatchCreateLogs(ctx context.Context, req *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error) {
@@ -130,12 +178,18 @@ func (f fakeLogDest) BatchCreateLogs(ctx context.Context, req *proto.BatchCreate
 	case <-ctx.Done():
 		return nil, ctx.Err()
 	case f.reqs <- req:
-		return &proto.BatchCreateLogsResponse{}, nil
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case resp := <-f.resps:
+			return resp, nil
+		}
 	}
 }
 
 func newFakeLogDest() *fakeLogDest {
 	return &fakeLogDest{
-		reqs: make(chan *proto.BatchCreateLogsRequest),
+		reqs:  make(chan *proto.BatchCreateLogsRequest),
+		resps: make(chan *proto.BatchCreateLogsResponse),
 	}
 }
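One design note on the test changes, with a sketch below: fakeLogDest now pairs each send on the request channel with a blocking receive on a response channel, so the test body acts as the server and chooses every response, including the LogLimitExceeded one. Here is a minimal standalone version of that fake pattern; fakeRPC and its string payloads are illustrative stand-ins, not coder APIs.

package main

import (
	"context"
	"fmt"
)

// fakeRPC mirrors fakeLogDest: call publishes the request on reqs, then
// blocks until the test injects a response on resps, so each RPC's outcome
// is under test control.
type fakeRPC struct {
	reqs  chan string
	resps chan string
}

func (f fakeRPC) call(ctx context.Context, req string) (string, error) {
	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case f.reqs <- req:
		// request observed by the test; now wait for its chosen response
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case resp := <-f.resps:
			return resp, nil
		}
	}
}

func main() {
	ctx := context.Background()
	f := fakeRPC{reqs: make(chan string), resps: make(chan string)}
	got := make(chan string)
	go func() { // plays the role of sendLoop
		resp, _ := f.call(ctx, "batch-1")
		got <- resp
	}()
	fmt.Println("server saw:", <-f.reqs) // test side: observe the request...
	f.resps <- "limit exceeded"          // ...then pick the response
	fmt.Println("client got:", <-got)
}

Because both channels are unbuffered, the fake runs in lockstep with the code under test, which is why each RequireRecvCtx on reqs in the mainline test is now paired with a RequireSendCtx on resps.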