Skip to content

Commit 2014110

Browse files
committed
feat: limit queued logs to database limit in agent
1 parent ede6f2f commit 2014110

File tree

2 files changed

+100
-15
lines changed

2 files changed

+100
-15
lines changed

agent/logs.go

+35-12
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,14 @@ import (
1414
)
1515

1616
const (
17-
flushInterval = time.Second
18-
logOutputMaxBytes = 1 << 20 // 1MiB
19-
overheadPerLog = 21 // found by testing
17+
flushInterval = time.Second
18+
maxBytesPerBatch = 1 << 20 // 1MiB
19+
overheadPerLog = 21 // found by testing
20+
21+
// maxBytesQueued is the maximum length of logs we will queue in memory. The number is taken
22+
// from dump.sql `max_logs_length` constraint, as there is no point queuing more logs than we'll
23+
// accept in the database.
24+
maxBytesQueued = 1048576
2025
)
2126

2227
type logQueue struct {
@@ -33,6 +38,7 @@ type logSender struct {
3338
queues map[uuid.UUID]*logQueue
3439
logger slog.Logger
3540
exceededLogLimit bool
41+
outputLen int
3642
}
3743

3844
type logDest interface {
@@ -47,6 +53,8 @@ func newLogSender(logger slog.Logger) *logSender {
4753
}
4854
}
4955

56+
var MaxQueueExceededError = xerrors.New("maximum queued logs exceeded")
57+
5058
func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
5159
logger := l.logger.With(slog.F("log_source_id", src))
5260
if len(logs) == 0 {
@@ -66,12 +74,25 @@ func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
6674
q = &logQueue{}
6775
l.queues[src] = q
6876
}
69-
for _, log := range logs {
77+
for k, log := range logs {
78+
// Here we check the queue size before adding a log because we want to queue up slightly
79+
// more logs than the database would store to ensure we trigger "logs truncated" at the
80+
// database layer. Otherwise, the end user wouldn't know logs are truncated unless they
81+
// examined the Coder agent logs.
82+
if l.outputLen > maxBytesQueued {
83+
logger.Warn(context.Background(), "log queue full; truncating new logs", slog.F("new_logs", k), slog.F("queued_logs", len(q.logs)))
84+
return MaxQueueExceededError
85+
}
7086
pl, err := agentsdk.ProtoFromLog(log)
7187
if err != nil {
7288
return xerrors.Errorf("failed to convert log: %w", err)
7389
}
90+
if len(pl.Output) > maxBytesPerBatch {
91+
logger.Warn(context.Background(), "dropping log line that exceeds our limit")
92+
continue
93+
}
7494
q.logs = append(q.logs, pl)
95+
l.outputLen += len(pl.Output)
7596
}
7697
logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs)))
7798
return nil
@@ -140,21 +161,22 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
140161
req := &proto.BatchCreateLogsRequest{
141162
LogSourceId: src[:],
142163
}
143-
o := 0
164+
165+
// outputToSend keeps track of the size of the protobuf message we send, while
166+
// outputToRemove keeps track of the size of the output we'll remove from the queues on
167+
// success. They are different because outputToSend also counts protocol message overheads.
168+
outputToSend := 0
169+
outputToRemove := 0
144170
n := 0
145171
for n < len(q.logs) {
146172
log := q.logs[n]
147-
if len(log.Output) > logOutputMaxBytes {
148-
logger.Warn(ctx, "dropping log line that exceeds our limit")
149-
n++
150-
continue
151-
}
152-
o += len(log.Output) + overheadPerLog
153-
if o > logOutputMaxBytes {
173+
outputToSend += len(log.Output) + overheadPerLog
174+
if outputToSend > maxBytesPerBatch {
154175
break
155176
}
156177
req.Logs = append(req.Logs, log)
157178
n++
179+
outputToRemove += len(log.Output)
158180
}
159181

160182
l.L.Unlock()
@@ -177,6 +199,7 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
177199
// elsewhere we only append to the logs, so here it is safe to remove the
178200
// ones we successfully sent
179201
q.logs = q.logs[n:]
202+
l.outputLen -= outputToRemove
180203
if len(q.logs) == 0 {
181204
// no empty queues
182205
delete(l.queues, src)

agent/logs_internal_test.go

+65-3
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ func TestLogSender_SkipHugeLog(t *testing.T) {
174174

175175
t0 := dbtime.Now()
176176
ls1 := uuid.UUID{0x11}
177-
hugeLog := make([]byte, logOutputMaxBytes+1)
177+
hugeLog := make([]byte, maxBytesPerBatch+1)
178178
for i := range hugeLog {
179179
hugeLog[i] = 'q'
180180
}
@@ -244,14 +244,14 @@ func TestLogSender_Batch(t *testing.T) {
244244
gotLogs += len(req.Logs)
245245
wire, err := protobuf.Marshal(req)
246246
require.NoError(t, err)
247-
require.Less(t, len(wire), logOutputMaxBytes, "wire should not exceed 1MiB")
247+
require.Less(t, len(wire), maxBytesPerBatch, "wire should not exceed 1MiB")
248248
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
249249
req = testutil.RequireRecvCtx(ctx, t, fDest.reqs)
250250
require.NotNil(t, req)
251251
gotLogs += len(req.Logs)
252252
wire, err = protobuf.Marshal(req)
253253
require.NoError(t, err)
254-
require.Less(t, len(wire), logOutputMaxBytes, "wire should not exceed 1MiB")
254+
require.Less(t, len(wire), maxBytesPerBatch, "wire should not exceed 1MiB")
255255
require.Equal(t, 60000, gotLogs)
256256
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
257257

@@ -260,6 +260,68 @@ func TestLogSender_Batch(t *testing.T) {
260260
require.NoError(t, err)
261261
}
262262

263+
func TestLogSender_MaxQueuedLogs(t *testing.T) {
264+
t.Parallel()
265+
testCtx := testutil.Context(t, testutil.WaitShort)
266+
ctx, cancel := context.WithCancel(testCtx)
267+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
268+
fDest := newFakeLogDest()
269+
uut := newLogSender(logger)
270+
271+
t0 := dbtime.Now()
272+
ls1 := uuid.UUID{0x11}
273+
n := 4
274+
hugeLog := make([]byte, maxBytesQueued/n)
275+
for i := range hugeLog {
276+
hugeLog[i] = 'q'
277+
}
278+
var logs []agentsdk.Log
279+
for i := 0; i < n; i++ {
280+
logs = append(logs, agentsdk.Log{
281+
CreatedAt: t0,
282+
Output: string(hugeLog),
283+
Level: codersdk.LogLevelInfo,
284+
})
285+
}
286+
err := uut.enqueue(ls1, logs...)
287+
require.NoError(t, err)
288+
289+
// we're now right at the limit of output
290+
require.Equal(t, maxBytesQueued, uut.outputLen)
291+
292+
// adding more logs should error...
293+
ls2 := uuid.UUID{0x22}
294+
err = uut.enqueue(ls2, logs...)
295+
require.ErrorIs(t, err, MaxQueueExceededError)
296+
297+
loopErr := make(chan error, 1)
298+
go func() {
299+
err := uut.sendLoop(ctx, fDest)
300+
loopErr <- err
301+
}()
302+
303+
// ...but, it should still queue up one log from source #2, so that we would exceed the database
304+
// limit. These come over a total of 3 updates, because due to overhead, the n logs from source
305+
// #1 come in 2 updates, plus 1 update for source #2.
306+
logsBySource := make(map[uuid.UUID]int)
307+
for i := 0; i < 3; i++ {
308+
req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
309+
require.NotNil(t, req)
310+
srcID, err := uuid.FromBytes(req.LogSourceId)
311+
require.NoError(t, err)
312+
logsBySource[srcID] += len(req.Logs)
313+
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
314+
}
315+
require.Equal(t, map[uuid.UUID]int{
316+
ls1: n,
317+
ls2: 1,
318+
}, logsBySource)
319+
320+
cancel()
321+
err = testutil.RequireRecvCtx(testCtx, t, loopErr)
322+
require.NoError(t, err)
323+
}
324+
263325
type fakeLogDest struct {
264326
reqs chan *proto.BatchCreateLogsRequest
265327
resps chan *proto.BatchCreateLogsResponse

0 commit comments

Comments
 (0)