feat: limit queued logs to database limit in agent #12067

Merged 1 commit on Feb 9, 2024
47 changes: 35 additions & 12 deletions agent/logs.go
@@ -14,9 +14,14 @@ import (
 )
 
 const (
-    flushInterval     = time.Second
-    logOutputMaxBytes = 1 << 20 // 1MiB
-    overheadPerLog    = 21      // found by testing
+    flushInterval    = time.Second
+    maxBytesPerBatch = 1 << 20 // 1MiB
+    overheadPerLog   = 21      // found by testing
+
+    // maxBytesQueued is the maximum length of logs we will queue in memory. The number is taken
+    // from dump.sql `max_logs_length` constraint, as there is no point queuing more logs than we'll
+    // accept in the database.
+    maxBytesQueued = 1048576
 )
 
 type logQueue struct {
@@ -33,6 +38,7 @@ type logSender struct {
     queues           map[uuid.UUID]*logQueue
     logger           slog.Logger
     exceededLogLimit bool
+    outputLen        int
 }
 
 type logDest interface {
@@ -47,6 +53,8 @@ func newLogSender(logger slog.Logger) *logSender {
     }
 }
 
+var MaxQueueExceededError = xerrors.New("maximum queued logs exceeded")
+
 func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
     logger := l.logger.With(slog.F("log_source_id", src))
     if len(logs) == 0 {
@@ -66,12 +74,25 @@ func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
         q = &logQueue{}
         l.queues[src] = q
     }
-    for _, log := range logs {
+    for k, log := range logs {
+        // Here we check the queue size before adding a log because we want to queue up slightly
+        // more logs than the database would store to ensure we trigger "logs truncated" at the
+        // database layer. Otherwise, the end user wouldn't know logs are truncated unless they
+        // examined the Coder agent logs.
+        if l.outputLen > maxBytesQueued {
+            logger.Warn(context.Background(), "log queue full; truncating new logs", slog.F("new_logs", k), slog.F("queued_logs", len(q.logs)))
+            return MaxQueueExceededError
+        }
         pl, err := agentsdk.ProtoFromLog(log)
         if err != nil {
             return xerrors.Errorf("failed to convert log: %w", err)
         }
+        if len(pl.Output) > maxBytesPerBatch {
+            logger.Warn(context.Background(), "dropping log line that exceeds our limit")
+            continue
+        }
         q.logs = append(q.logs, pl)
+        l.outputLen += len(pl.Output)
     }
     logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs)))
     return nil
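
For illustration, a minimal caller-side sketch — not part of this PR. The pushLogs wrapper and its wiring are hypothetical; only enqueue and MaxQueueExceededError come from the diff above, and the standard errors package is assumed imported. It shows why the sentinel can be treated as a soft failure: once the queue holds as many bytes as the database accepts, the overflow would be truncated server-side anyway.

    func pushLogs(sender *logSender, src uuid.UUID, logs []agentsdk.Log) error {
        err := sender.enqueue(src, logs...)
        if errors.Is(err, MaxQueueExceededError) {
            // The in-memory queue already holds as many bytes as the database
            // will accept; anything past this point would be truncated at the
            // database layer, so the remainder can safely be dropped.
            return nil
        }
        return err
    }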
@@ -140,21 +161,22 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
         req := &proto.BatchCreateLogsRequest{
             LogSourceId: src[:],
         }
-        o := 0
+
+        // outputToSend keeps track of the size of the protobuf message we send, while
+        // outputToRemove keeps track of the size of the output we'll remove from the queues on
+        // success. They are different because outputToSend also counts protocol message overheads.
+        outputToSend := 0
+        outputToRemove := 0
         n := 0
         for n < len(q.logs) {
             log := q.logs[n]
-            if len(log.Output) > logOutputMaxBytes {
-                logger.Warn(ctx, "dropping log line that exceeds our limit")
-                n++
-                continue
-            }
-            o += len(log.Output) + overheadPerLog
-            if o > logOutputMaxBytes {
+            outputToSend += len(log.Output) + overheadPerLog
+            if outputToSend > maxBytesPerBatch {
                 break
             }
             req.Logs = append(req.Logs, log)
             n++
+            outputToRemove += len(log.Output)
         }
 
         l.L.Unlock()
@@ -181,6 +203,7 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
             q.logs[i] = nil
         }
         q.logs = q.logs[n:]
+        l.outputLen -= outputToRemove
         if len(q.logs) == 0 {
             // no empty queues
             delete(l.queues, src)
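
To see how the two counters diverge, here is a small self-contained sketch (constants copied from the diff above; the 256 KiB payloads mirror the new test below). The wire-side counter charges overheadPerLog per entry, so four payloads that sum to exactly 1MiB need two batches:

    package main

    import "fmt"

    const (
        maxBytesPerBatch = 1 << 20 // 1MiB, as in agent/logs.go
        overheadPerLog   = 21
    )

    func main() {
        payload := (1 << 20) / 4 // four 256 KiB logs, exactly 1MiB of output
        outputToSend, outputToRemove, n := 0, 0, 0
        for n < 4 {
            // Same accounting as sendLoop: the wire size includes per-log
            // protocol overhead, the queue size does not.
            outputToSend += payload + overheadPerLog
            if outputToSend > maxBytesPerBatch {
                break
            }
            n++
            outputToRemove += payload
        }
        fmt.Println(n, outputToRemove) // 3 786432: the fourth log waits for the next batch
    }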
68 changes: 65 additions & 3 deletions agent/logs_internal_test.go
@@ -176,7 +176,7 @@ func TestLogSender_SkipHugeLog(t *testing.T) {
 
     t0 := dbtime.Now()
     ls1 := uuid.UUID{0x11}
-    hugeLog := make([]byte, logOutputMaxBytes+1)
+    hugeLog := make([]byte, maxBytesPerBatch+1)
     for i := range hugeLog {
         hugeLog[i] = 'q'
     }
@@ -246,14 +246,14 @@ func TestLogSender_Batch(t *testing.T) {
     gotLogs += len(req.Logs)
     wire, err := protobuf.Marshal(req)
     require.NoError(t, err)
-    require.Less(t, len(wire), logOutputMaxBytes, "wire should not exceed 1MiB")
+    require.Less(t, len(wire), maxBytesPerBatch, "wire should not exceed 1MiB")
     testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
     req = testutil.RequireRecvCtx(ctx, t, fDest.reqs)
     require.NotNil(t, req)
     gotLogs += len(req.Logs)
     wire, err = protobuf.Marshal(req)
     require.NoError(t, err)
-    require.Less(t, len(wire), logOutputMaxBytes, "wire should not exceed 1MiB")
+    require.Less(t, len(wire), maxBytesPerBatch, "wire should not exceed 1MiB")
     require.Equal(t, 60000, gotLogs)
     testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
 
@@ -262,6 +262,68 @@
     require.NoError(t, err)
 }
 
+func TestLogSender_MaxQueuedLogs(t *testing.T) {
+    t.Parallel()
+    testCtx := testutil.Context(t, testutil.WaitShort)
+    ctx, cancel := context.WithCancel(testCtx)
+    logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+    fDest := newFakeLogDest()
+    uut := newLogSender(logger)
+
+    t0 := dbtime.Now()
+    ls1 := uuid.UUID{0x11}
+    n := 4
+    hugeLog := make([]byte, maxBytesQueued/n)
+    for i := range hugeLog {
+        hugeLog[i] = 'q'
+    }
+    var logs []agentsdk.Log
+    for i := 0; i < n; i++ {
+        logs = append(logs, agentsdk.Log{
+            CreatedAt: t0,
+            Output:    string(hugeLog),
+            Level:     codersdk.LogLevelInfo,
+        })
+    }
+    err := uut.enqueue(ls1, logs...)
+    require.NoError(t, err)
+
+    // we're now right at the limit of output
+    require.Equal(t, maxBytesQueued, uut.outputLen)
+
+    // adding more logs should error...
+    ls2 := uuid.UUID{0x22}
+    err = uut.enqueue(ls2, logs...)
+    require.ErrorIs(t, err, MaxQueueExceededError)
+
+    loopErr := make(chan error, 1)
+    go func() {
+        err := uut.sendLoop(ctx, fDest)
+        loopErr <- err
+    }()
+
+    // ...but, it should still queue up one log from source #2, so that we would exceed the database
+    // limit. These come over a total of 3 updates, because due to overhead, the n logs from source
+    // #1 come in 2 updates, plus 1 update for source #2.
+    logsBySource := make(map[uuid.UUID]int)
+    for i := 0; i < 3; i++ {
+        req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+        require.NotNil(t, req)
+        srcID, err := uuid.FromBytes(req.LogSourceId)
+        require.NoError(t, err)
+        logsBySource[srcID] += len(req.Logs)
+        testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
+    }
+    require.Equal(t, map[uuid.UUID]int{
+        ls1: n,
+        ls2: 1,
+    }, logsBySource)
+
+    cancel()
+    err = testutil.RequireRecvCtx(testCtx, t, loopErr)
+    require.NoError(t, err)
+}
+
 type fakeLogDest struct {
     reqs  chan *proto.BatchCreateLogsRequest
     resps chan *proto.BatchCreateLogsResponse
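
The expected counts in TestLogSender_MaxQueuedLogs follow from the guard in enqueue: it runs before the append and uses a strict '>', so a queue sitting exactly at maxBytesQueued still admits one more log. A standalone sketch of that arithmetic (constant copied from agent/logs.go; the loop is illustrative, not the real enqueue):

    package main

    import "fmt"

    const maxBytesQueued = 1048576 // as in agent/logs.go

    func main() {
        payload := maxBytesQueued / 4 // same 256 KiB payloads as the test
        outputLen, admitted := 0, 0
        for i := 0; i < 6; i++ {
            // Mirrors the check in enqueue: strict '>' before appending.
            if outputLen > maxBytesQueued {
                break
            }
            admitted++
            outputLen += payload
        }
        // 5 logs total: n=4 from source #1 fill the queue exactly, and one
        // more from source #2 is still admitted — the log that trips
        // "logs truncated" at the database layer.
        fmt.Println(admitted)
    }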