Skip to content

Commit 446cb12

Browse files
committed
feat: limit queued logs to database limit in agent
1 parent 743f5f3 commit 446cb12

File tree

2 files changed

+100
-15
lines changed

2 files changed

+100
-15
lines changed

agent/logs.go

+35-12
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,14 @@ import (
1414
)
1515

1616
const (
17-
flushInterval = time.Second
18-
logOutputMaxBytes = 1 << 20 // 1MiB
19-
overheadPerLog = 21 // found by testing
17+
flushInterval = time.Second
18+
maxBytesPerBatch = 1 << 20 // 1MiB
19+
overheadPerLog = 21 // found by testing
20+
21+
// maxBytesQueued is the maximum length of logs we will queue in memory. The number is taken
22+
// from dump.sql `max_logs_length` constraint, as there is no point queuing more logs than we'll
23+
// accept in the database.
24+
maxBytesQueued = 1048576
2025
)
2126

2227
type logQueue struct {
@@ -33,6 +38,7 @@ type logSender struct {
3338
queues map[uuid.UUID]*logQueue
3439
logger slog.Logger
3540
exceededLogLimit bool
41+
outputLen int
3642
}
3743

3844
type logDest interface {
@@ -47,6 +53,8 @@ func newLogSender(logger slog.Logger) *logSender {
4753
}
4854
}
4955

56+
// MaxQueueExceededError is returned by enqueue when the total queued log output
// already exceeds maxBytesQueued, so no further logs will be accepted.
// NOTE(review): Go convention would name this ErrMaxQueueExceeded (sentinel
// errors use the ErrXxx prefix); renaming would break existing callers, so it
// is left as-is — consider a deprecation alias in a follow-up.
var MaxQueueExceededError = xerrors.New("maximum queued logs exceeded")
57+
5058
func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
5159
logger := l.logger.With(slog.F("log_source_id", src))
5260
if len(logs) == 0 {
@@ -66,12 +74,25 @@ func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
6674
q = &logQueue{}
6775
l.queues[src] = q
6876
}
69-
for _, log := range logs {
77+
for k, log := range logs {
78+
// Here we check the queue size before adding a log because we want to queue up slightly
79+
// more logs than the database would store to ensure we trigger "logs truncated" at the
80+
// database layer. Otherwise, the end user wouldn't know logs are truncated unless they
81+
// examined the Coder agent logs.
82+
if l.outputLen > maxBytesQueued {
83+
logger.Warn(context.Background(), "log queue full; truncating new logs", slog.F("new_logs", k), slog.F("queued_logs", len(q.logs)))
84+
return MaxQueueExceededError
85+
}
7086
pl, err := agentsdk.ProtoFromLog(log)
7187
if err != nil {
7288
return xerrors.Errorf("failed to convert log: %w", err)
7389
}
90+
if len(pl.Output) > maxBytesPerBatch {
91+
logger.Warn(context.Background(), "dropping log line that exceeds our limit")
92+
continue
93+
}
7494
q.logs = append(q.logs, pl)
95+
l.outputLen += len(pl.Output)
7596
}
7697
logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs)))
7798
return nil
@@ -140,21 +161,22 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
140161
req := &proto.BatchCreateLogsRequest{
141162
LogSourceId: src[:],
142163
}
143-
o := 0
164+
165+
// outputToSend keeps track of the size of the protobuf message we send, while
166+
// outputToRemove keeps track of the size of the output we'll remove from the queues on
167+
// success. They are different because outputToSend also counts protocol message overheads.
168+
outputToSend := 0
169+
outputToRemove := 0
144170
n := 0
145171
for n < len(q.logs) {
146172
log := q.logs[n]
147-
if len(log.Output) > logOutputMaxBytes {
148-
logger.Warn(ctx, "dropping log line that exceeds our limit")
149-
n++
150-
continue
151-
}
152-
o += len(log.Output) + overheadPerLog
153-
if o > logOutputMaxBytes {
173+
outputToSend += len(log.Output) + overheadPerLog
174+
if outputToSend > maxBytesPerBatch {
154175
break
155176
}
156177
req.Logs = append(req.Logs, log)
157178
n++
179+
outputToRemove += len(log.Output)
158180
}
159181

160182
l.L.Unlock()
@@ -181,6 +203,7 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
181203
q.logs[i] = nil
182204
}
183205
q.logs = q.logs[n:]
206+
l.outputLen -= outputToRemove
184207
if len(q.logs) == 0 {
185208
// no empty queues
186209
delete(l.queues, src)

agent/logs_internal_test.go

+65-3
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func TestLogSender_SkipHugeLog(t *testing.T) {
176176

177177
t0 := dbtime.Now()
178178
ls1 := uuid.UUID{0x11}
179-
hugeLog := make([]byte, logOutputMaxBytes+1)
179+
hugeLog := make([]byte, maxBytesPerBatch+1)
180180
for i := range hugeLog {
181181
hugeLog[i] = 'q'
182182
}
@@ -246,14 +246,14 @@ func TestLogSender_Batch(t *testing.T) {
246246
gotLogs += len(req.Logs)
247247
wire, err := protobuf.Marshal(req)
248248
require.NoError(t, err)
249-
require.Less(t, len(wire), logOutputMaxBytes, "wire should not exceed 1MiB")
249+
require.Less(t, len(wire), maxBytesPerBatch, "wire should not exceed 1MiB")
250250
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
251251
req = testutil.RequireRecvCtx(ctx, t, fDest.reqs)
252252
require.NotNil(t, req)
253253
gotLogs += len(req.Logs)
254254
wire, err = protobuf.Marshal(req)
255255
require.NoError(t, err)
256-
require.Less(t, len(wire), logOutputMaxBytes, "wire should not exceed 1MiB")
256+
require.Less(t, len(wire), maxBytesPerBatch, "wire should not exceed 1MiB")
257257
require.Equal(t, 60000, gotLogs)
258258
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
259259

@@ -262,6 +262,68 @@ func TestLogSender_Batch(t *testing.T) {
262262
require.NoError(t, err)
263263
}
264264

265+
func TestLogSender_MaxQueuedLogs(t *testing.T) {
266+
t.Parallel()
267+
testCtx := testutil.Context(t, testutil.WaitShort)
268+
ctx, cancel := context.WithCancel(testCtx)
269+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
270+
fDest := newFakeLogDest()
271+
uut := newLogSender(logger)
272+
273+
t0 := dbtime.Now()
274+
ls1 := uuid.UUID{0x11}
275+
n := 4
276+
hugeLog := make([]byte, maxBytesQueued/n)
277+
for i := range hugeLog {
278+
hugeLog[i] = 'q'
279+
}
280+
var logs []agentsdk.Log
281+
for i := 0; i < n; i++ {
282+
logs = append(logs, agentsdk.Log{
283+
CreatedAt: t0,
284+
Output: string(hugeLog),
285+
Level: codersdk.LogLevelInfo,
286+
})
287+
}
288+
err := uut.enqueue(ls1, logs...)
289+
require.NoError(t, err)
290+
291+
// we're now right at the limit of output
292+
require.Equal(t, maxBytesQueued, uut.outputLen)
293+
294+
// adding more logs should error...
295+
ls2 := uuid.UUID{0x22}
296+
err = uut.enqueue(ls2, logs...)
297+
require.ErrorIs(t, err, MaxQueueExceededError)
298+
299+
loopErr := make(chan error, 1)
300+
go func() {
301+
err := uut.sendLoop(ctx, fDest)
302+
loopErr <- err
303+
}()
304+
305+
// ...but, it should still queue up one log from source #2, so that we would exceed the database
306+
// limit. These come over a total of 3 updates, because due to overhead, the n logs from source
307+
// #1 come in 2 updates, plus 1 update for source #2.
308+
logsBySource := make(map[uuid.UUID]int)
309+
for i := 0; i < 3; i++ {
310+
req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
311+
require.NotNil(t, req)
312+
srcID, err := uuid.FromBytes(req.LogSourceId)
313+
require.NoError(t, err)
314+
logsBySource[srcID] += len(req.Logs)
315+
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
316+
}
317+
require.Equal(t, map[uuid.UUID]int{
318+
ls1: n,
319+
ls2: 1,
320+
}, logsBySource)
321+
322+
cancel()
323+
err = testutil.RequireRecvCtx(testCtx, t, loopErr)
324+
require.NoError(t, err)
325+
}
326+
265327
type fakeLogDest struct {
266328
reqs chan *proto.BatchCreateLogsRequest
267329
resps chan *proto.BatchCreateLogsResponse

0 commit comments

Comments
 (0)