Commit 27b55aa

feat: ensure that log batches don't exceed 1MiB in logSender
1 parent 0f99947 commit 27b55aa

File tree

2 files changed: +119 -7 lines changed

agent/logs.go (+24 -6)
@@ -6,15 +6,18 @@ import (
 	"time"

 	"github.com/google/uuid"
-	"golang.org/x/exp/slices"
 	"golang.org/x/xerrors"

 	"cdr.dev/slog"
 	"github.com/coder/coder/v2/agent/proto"
 	"github.com/coder/coder/v2/codersdk/agentsdk"
 )

-const flushInterval = time.Second
+const (
+	flushInterval     = time.Second
+	logOutputMaxBytes = 1 << 20 // 1MiB
+	overheadPerLog    = 21      // found by testing
+)

 type logQueue struct {
 	logs []*proto.Log
@@ -118,15 +121,30 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 			return nil
 		}
 		src, q := l.getPendingWorkLocked()
+		logger := l.logger.With(slog.F("log_source_id", src))
 		q.flushRequested = false // clear flag since we're now flushing
 		req := &proto.BatchCreateLogsRequest{
 			LogSourceId: src[:],
-			// when we release the lock, we don't want modifications to the slice to affect us
-			Logs: slices.Clone(q.logs),
+		}
+		o := 0
+		n := 0
+		for n < len(q.logs) {
+			log := q.logs[n]
+			if len(log.Output) > logOutputMaxBytes {
+				logger.Warn(ctx, "dropping log line that exceeds our limit")
+				n++
+				continue
+			}
+			o += len(log.Output) + overheadPerLog
+			if o > logOutputMaxBytes {
+				break
+			}
+			req.Logs = append(req.Logs, log)
+			n++
 		}

 		l.L.Unlock()
-		l.logger.Debug(ctx, "sending logs to agent API", slog.F("log_source_id", src), slog.F("num_logs", len(req.Logs)))
+		logger.Debug(ctx, "sending logs to agent API", slog.F("num_logs", len(req.Logs)))
 		_, err := dest.BatchCreateLogs(ctx, req)
 		l.L.Lock()
 		if err != nil {
@@ -135,7 +153,7 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {

 		// since elsewhere we only append to the logs, here we can remove them
 		// since we successfully sent them
-		q.logs = q.logs[len(req.Logs):]
+		q.logs = q.logs[n:]
 		if len(q.logs) == 0 {
 			// no empty queues
 			delete(l.queues, src)
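
The batching loop above walks the pending queue in order: a single log whose Output alone exceeds logOutputMaxBytes is dropped with a warning, since it could never fit in one request, and the batch is cut off as soon as the running estimate o, which charges each log its Output length plus overheadPerLog bytes of protobuf framing, would pass 1 MiB. The index n records how far the walk got, so once BatchCreateLogs succeeds the sent prefix is trimmed with q.logs = q.logs[n:] and the remainder is picked up on a later flush.

As a rough illustration of where a figure like overheadPerLog = 21 might come from, here is a minimal, standalone sketch (not part of this commit) that marshals a BatchCreateLogsRequest with and without one extra log and compares the encoded sizes. It assumes only Output and Level are set; real logs also carry a created-at timestamp, which is presumably why the value of 21 found by testing is larger than what this minimal example prints.

// overhead_sketch.go: a rough, standalone estimate of per-log protobuf framing.
// Assumption: only Output and Level are populated here.
package main

import (
	"fmt"

	protobuf "google.golang.org/protobuf/proto"

	"github.com/coder/coder/v2/agent/proto"
)

func main() {
	req := &proto.BatchCreateLogsRequest{}
	base, err := protobuf.Marshal(req)
	if err != nil {
		panic(err)
	}

	out := "hello"
	req.Logs = append(req.Logs, &proto.Log{Output: out, Level: proto.Log_INFO})
	withLog, err := protobuf.Marshal(req)
	if err != nil {
		panic(err)
	}

	// Per-log overhead is everything the extra entry adds beyond its Output bytes.
	fmt.Printf("per-log overhead: %d bytes\n", len(withLog)-len(base)-len(out))
}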

agent/logs_internal_test.go (+95 -1)
@@ -7,6 +7,7 @@ import (

 	"github.com/google/uuid"
 	"github.com/stretchr/testify/require"
+	protobuf "google.golang.org/protobuf/proto"

 	"cdr.dev/slog"
 	"cdr.dev/slog/sloggers/slogtest"
@@ -17,7 +18,7 @@ import (
 	"github.com/coder/coder/v2/testutil"
 )

-func TestLogSender(t *testing.T) {
+func TestLogSender_Mainline(t *testing.T) {
 	t.Parallel()
 	testCtx := testutil.Context(t, testutil.WaitShort)
 	ctx, cancel := context.WithCancel(testCtx)
@@ -116,6 +117,99 @@
 	require.NoError(t, err)
 }

+func TestLogSender_SkipHugeLog(t *testing.T) {
+	t.Parallel()
+	testCtx := testutil.Context(t, testutil.WaitShort)
+	ctx, cancel := context.WithCancel(testCtx)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	fDest := newFakeLogDest()
+	uut := newLogSender(logger)
+
+	t0 := dbtime.Now()
+	ls1 := uuid.UUID{0x11}
+	hugeLog := make([]byte, logOutputMaxBytes+1)
+	for i := range hugeLog {
+		hugeLog[i] = 'q'
+	}
+	err := uut.enqueue(ls1,
+		agentsdk.Log{
+			CreatedAt: t0,
+			Output:    string(hugeLog),
+			Level:     codersdk.LogLevelInfo,
+		},
+		agentsdk.Log{
+			CreatedAt: t0,
+			Output:    "test log 1, src 1",
+			Level:     codersdk.LogLevelInfo,
+		})
+	require.NoError(t, err)
+
+	loopErr := make(chan error, 1)
+	go func() {
+		err := uut.sendLoop(ctx, fDest)
+		loopErr <- err
+	}()
+
+	req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+	require.NotNil(t, req)
+	require.Len(t, req.Logs, 1, "it should skip the huge log")
+	require.Equal(t, "test log 1, src 1", req.Logs[0].GetOutput())
+	require.Equal(t, proto.Log_INFO, req.Logs[0].GetLevel())
+
+	cancel()
+	err = testutil.RequireRecvCtx(testCtx, t, loopErr)
+	require.NoError(t, err)
+}
+
+func TestLogSender_Batch(t *testing.T) {
+	t.Parallel()
+	testCtx := testutil.Context(t, testutil.WaitShort)
+	ctx, cancel := context.WithCancel(testCtx)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	fDest := newFakeLogDest()
+	uut := newLogSender(logger)
+
+	t0 := dbtime.Now()
+	ls1 := uuid.UUID{0x11}
+	var logs []agentsdk.Log
+	for i := 0; i < 60000; i++ {
+		logs = append(logs, agentsdk.Log{
+			CreatedAt: t0,
+			Output:    "r",
+			Level:     codersdk.LogLevelInfo,
+		})
+	}
+	err := uut.enqueue(ls1, logs...)
+	require.NoError(t, err)
+
+	loopErr := make(chan error, 1)
+	go func() {
+		err := uut.sendLoop(ctx, fDest)
+		loopErr <- err
+	}()
+
+	// with 60k logs, we should split into two updates to avoid going over 1MiB, since each log
+	// is about 21 bytes.
+	gotLogs := 0
+	req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+	require.NotNil(t, req)
+	gotLogs += len(req.Logs)
+	wire, err := protobuf.Marshal(req)
+	require.NoError(t, err)
+	require.Less(t, len(wire), logOutputMaxBytes, "wire should not exceed 1MiB")
+	req = testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+	require.NotNil(t, req)
+	gotLogs += len(req.Logs)
+	wire, err = protobuf.Marshal(req)
+	require.NoError(t, err)
+	require.Less(t, len(wire), logOutputMaxBytes, "wire should not exceed 1MiB")
+	require.Equal(t, 60000, gotLogs)
+
+	cancel()
+	err = testutil.RequireRecvCtx(testCtx, t, loopErr)
+	require.NoError(t, err)
+}
+
 type fakeLogDest struct {
 	reqs chan *proto.BatchCreateLogsRequest
 }
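
The arithmetic behind TestLogSender_Batch: 60,000 logs with a one-byte Output are estimated by the sender at 60,000 × (1 + 21) = 1,320,000 bytes, which exceeds the 1,048,576-byte (1 MiB) cap, so the queue has to be flushed in two BatchCreateLogs requests. The test marshals each request with protobuf.Marshal to confirm the actual wire size stays under the limit, and checks that the two batches together deliver all 60,000 logs.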

0 commit comments
