diff --git a/agent/logs.go b/agent/logs.go index 590a46cd0d431..3a16e0e1ac9e7 100644 --- a/agent/logs.go +++ b/agent/logs.go @@ -14,9 +14,14 @@ import ( ) const ( - flushInterval = time.Second - logOutputMaxBytes = 1 << 20 // 1MiB - overheadPerLog = 21 // found by testing + flushInterval = time.Second + maxBytesPerBatch = 1 << 20 // 1MiB + overheadPerLog = 21 // found by testing + + // maxBytesQueued is the maximum length of logs we will queue in memory. The number is taken + // from dump.sql `max_logs_length` constraint, as there is no point queuing more logs than we'll + // accept in the database. + maxBytesQueued = 1048576 ) type logQueue struct { @@ -33,6 +38,7 @@ type logSender struct { queues map[uuid.UUID]*logQueue logger slog.Logger exceededLogLimit bool + outputLen int } type logDest interface { @@ -47,6 +53,8 @@ func newLogSender(logger slog.Logger) *logSender { } } +var MaxQueueExceededError = xerrors.New("maximum queued logs exceeded") + func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error { logger := l.logger.With(slog.F("log_source_id", src)) if len(logs) == 0 { @@ -66,12 +74,25 @@ func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error { q = &logQueue{} l.queues[src] = q } - for _, log := range logs { + for k, log := range logs { + // Here we check the queue size before adding a log because we want to queue up slightly + // more logs than the database would store to ensure we trigger "logs truncated" at the + // database layer. Otherwise, the end user wouldn't know logs are truncated unless they + // examined the Coder agent logs. + if l.outputLen > maxBytesQueued { + logger.Warn(context.Background(), "log queue full; truncating new logs", slog.F("new_logs", k), slog.F("queued_logs", len(q.logs))) + return MaxQueueExceededError + } pl, err := agentsdk.ProtoFromLog(log) if err != nil { return xerrors.Errorf("failed to convert log: %w", err) } + if len(pl.Output) > maxBytesPerBatch { + logger.Warn(context.Background(), "dropping log line that exceeds our limit") + continue + } q.logs = append(q.logs, pl) + l.outputLen += len(pl.Output) } logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs))) return nil @@ -140,21 +161,22 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error { req := &proto.BatchCreateLogsRequest{ LogSourceId: src[:], } - o := 0 + + // outputToSend keeps track of the size of the protobuf message we send, while + // outputToRemove keeps track of the size of the output we'll remove from the queues on + // success. They are different because outputToSend also counts protocol message overheads. + outputToSend := 0 + outputToRemove := 0 n := 0 for n < len(q.logs) { log := q.logs[n] - if len(log.Output) > logOutputMaxBytes { - logger.Warn(ctx, "dropping log line that exceeds our limit") - n++ - continue - } - o += len(log.Output) + overheadPerLog - if o > logOutputMaxBytes { + outputToSend += len(log.Output) + overheadPerLog + if outputToSend > maxBytesPerBatch { break } req.Logs = append(req.Logs, log) n++ + outputToRemove += len(log.Output) } l.L.Unlock() @@ -181,6 +203,7 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error { q.logs[i] = nil } q.logs = q.logs[n:] + l.outputLen -= outputToRemove if len(q.logs) == 0 { // no empty queues delete(l.queues, src) diff --git a/agent/logs_internal_test.go b/agent/logs_internal_test.go index 146777d5882e6..df5b164685de3 100644 --- a/agent/logs_internal_test.go +++ b/agent/logs_internal_test.go @@ -176,7 +176,7 @@ func TestLogSender_SkipHugeLog(t *testing.T) { t0 := dbtime.Now() ls1 := uuid.UUID{0x11} - hugeLog := make([]byte, logOutputMaxBytes+1) + hugeLog := make([]byte, maxBytesPerBatch+1) for i := range hugeLog { hugeLog[i] = 'q' } @@ -246,14 +246,14 @@ func TestLogSender_Batch(t *testing.T) { gotLogs += len(req.Logs) wire, err := protobuf.Marshal(req) require.NoError(t, err) - require.Less(t, len(wire), logOutputMaxBytes, "wire should not exceed 1MiB") + require.Less(t, len(wire), maxBytesPerBatch, "wire should not exceed 1MiB") testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{}) req = testutil.RequireRecvCtx(ctx, t, fDest.reqs) require.NotNil(t, req) gotLogs += len(req.Logs) wire, err = protobuf.Marshal(req) require.NoError(t, err) - require.Less(t, len(wire), logOutputMaxBytes, "wire should not exceed 1MiB") + require.Less(t, len(wire), maxBytesPerBatch, "wire should not exceed 1MiB") require.Equal(t, 60000, gotLogs) testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{}) @@ -262,6 +262,68 @@ func TestLogSender_Batch(t *testing.T) { require.NoError(t, err) } +func TestLogSender_MaxQueuedLogs(t *testing.T) { + t.Parallel() + testCtx := testutil.Context(t, testutil.WaitShort) + ctx, cancel := context.WithCancel(testCtx) + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + fDest := newFakeLogDest() + uut := newLogSender(logger) + + t0 := dbtime.Now() + ls1 := uuid.UUID{0x11} + n := 4 + hugeLog := make([]byte, maxBytesQueued/n) + for i := range hugeLog { + hugeLog[i] = 'q' + } + var logs []agentsdk.Log + for i := 0; i < n; i++ { + logs = append(logs, agentsdk.Log{ + CreatedAt: t0, + Output: string(hugeLog), + Level: codersdk.LogLevelInfo, + }) + } + err := uut.enqueue(ls1, logs...) + require.NoError(t, err) + + // we're now right at the limit of output + require.Equal(t, maxBytesQueued, uut.outputLen) + + // adding more logs should error... + ls2 := uuid.UUID{0x22} + err = uut.enqueue(ls2, logs...) + require.ErrorIs(t, err, MaxQueueExceededError) + + loopErr := make(chan error, 1) + go func() { + err := uut.sendLoop(ctx, fDest) + loopErr <- err + }() + + // ...but, it should still queue up one log from source #2, so that we would exceed the database + // limit. These come over a total of 3 updates, because due to overhead, the n logs from source + // #1 come in 2 updates, plus 1 update for source #2. + logsBySource := make(map[uuid.UUID]int) + for i := 0; i < 3; i++ { + req := testutil.RequireRecvCtx(ctx, t, fDest.reqs) + require.NotNil(t, req) + srcID, err := uuid.FromBytes(req.LogSourceId) + require.NoError(t, err) + logsBySource[srcID] += len(req.Logs) + testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{}) + } + require.Equal(t, map[uuid.UUID]int{ + ls1: n, + ls2: 1, + }, logsBySource) + + cancel() + err = testutil.RequireRecvCtx(testCtx, t, loopErr) + require.NoError(t, err) +} + type fakeLogDest struct { reqs chan *proto.BatchCreateLogsRequest resps chan *proto.BatchCreateLogsResponse