
Commit ede6f2f

feat: ensure that log batches don't exceed 1MiB in logSender
1 parent c8e7282 commit ede6f2f

2 files changed (+121 −6)

agent/logs.go (+24 −6)
@@ -6,15 +6,18 @@ import (
     "time"
 
     "github.com/google/uuid"
-    "golang.org/x/exp/slices"
     "golang.org/x/xerrors"
 
     "cdr.dev/slog"
     "github.com/coder/coder/v2/agent/proto"
     "github.com/coder/coder/v2/codersdk/agentsdk"
 )
 
-const flushInterval = time.Second
+const (
+    flushInterval     = time.Second
+    logOutputMaxBytes = 1 << 20 // 1MiB
+    overheadPerLog    = 21      // found by testing
+)
 
 type logQueue struct {
     logs []*proto.Log
@@ -132,15 +135,30 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
             return nil
         }
         src, q := l.getPendingWorkLocked()
+        logger := l.logger.With(slog.F("log_source_id", src))
         q.flushRequested = false // clear flag since we're now flushing
         req := &proto.BatchCreateLogsRequest{
             LogSourceId: src[:],
-            // when we release the lock, we don't want modifications to the slice to affect us
-            Logs: slices.Clone(q.logs),
+        }
+        o := 0
+        n := 0
+        for n < len(q.logs) {
+            log := q.logs[n]
+            if len(log.Output) > logOutputMaxBytes {
+                logger.Warn(ctx, "dropping log line that exceeds our limit")
+                n++
+                continue
+            }
+            o += len(log.Output) + overheadPerLog
+            if o > logOutputMaxBytes {
+                break
+            }
+            req.Logs = append(req.Logs, log)
+            n++
         }
 
         l.L.Unlock()
-        l.logger.Debug(ctx, "sending logs to agent API", slog.F("log_source_id", src), slog.F("num_logs", len(req.Logs)))
+        logger.Debug(ctx, "sending logs to agent API", slog.F("num_logs", len(req.Logs)))
        resp, err := dest.BatchCreateLogs(ctx, req)
         l.L.Lock()
         if err != nil {
@@ -158,7 +176,7 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 
         // since elsewhere we only append to the logs, here we can remove them
         // since we successfully sent them
-        q.logs = q.logs[len(req.Logs):]
+        q.logs = q.logs[n:]
         if len(q.logs) == 0 {
             // no empty queues
             delete(l.queues, src)
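
In short, the new sendLoop path builds each request by walking the queue: any single log whose output alone exceeds the cap is dropped with a warning, and the batch is cut as soon as the running estimate (output bytes plus a fixed per-log overhead) would pass 1 MiB; everything from index n onward stays queued for the next iteration. A minimal standalone sketch of that idea, using a hypothetical item type and helper rather than the real proto.Log and sendLoop:

// Sketch only: size-capped batching with oversize items dropped.
package main

import "fmt"

const (
    maxBatchBytes   = 1 << 20 // 1MiB cap, mirroring logOutputMaxBytes
    overheadPerItem = 21      // assumed fixed per-item protobuf overhead
)

type item struct{ payload string }

// nextBatch returns the items to send now and the index of the first item
// left in the queue (mirroring how sendLoop advances q.logs by n).
func nextBatch(queue []item) (batch []item, n int) {
    size := 0
    for n < len(queue) {
        it := queue[n]
        if len(it.payload) > maxBatchBytes {
            // an oversized item can never fit in any batch; skip it entirely
            n++
            continue
        }
        size += len(it.payload) + overheadPerItem
        if size > maxBatchBytes {
            break // send what we have; the rest goes in the next batch
        }
        batch = append(batch, it)
        n++
    }
    return batch, n
}

func main() {
    queue := make([]item, 60000)
    for i := range queue {
        queue[i] = item{payload: "r"}
    }
    // 60000 * (1 + 21) bytes ≈ 1.26 MiB, so the first batch stops short of 60000.
    batch, n := nextBatch(queue)
    fmt.Println(len(batch), n)
}

Tracking n separately from the batch length matters because dropped oversize items must also be consumed from the queue, which is why the diff advances q.logs by n rather than by len(req.Logs).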

agent/logs_internal_test.go (+97)
@@ -7,6 +7,7 @@ import (
 
     "github.com/google/uuid"
     "github.com/stretchr/testify/require"
+    protobuf "google.golang.org/protobuf/proto"
 
     "cdr.dev/slog"
     "cdr.dev/slog/sloggers/slogtest"
@@ -163,6 +164,102 @@ func TestLogSender_LogLimitExceeded(t *testing.T) {
     require.Len(t, uut.queues, 0)
 }
 
+func TestLogSender_SkipHugeLog(t *testing.T) {
+    t.Parallel()
+    testCtx := testutil.Context(t, testutil.WaitShort)
+    ctx, cancel := context.WithCancel(testCtx)
+    logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+    fDest := newFakeLogDest()
+    uut := newLogSender(logger)
+
+    t0 := dbtime.Now()
+    ls1 := uuid.UUID{0x11}
+    hugeLog := make([]byte, logOutputMaxBytes+1)
+    for i := range hugeLog {
+        hugeLog[i] = 'q'
+    }
+    err := uut.enqueue(ls1,
+        agentsdk.Log{
+            CreatedAt: t0,
+            Output:    string(hugeLog),
+            Level:     codersdk.LogLevelInfo,
+        },
+        agentsdk.Log{
+            CreatedAt: t0,
+            Output:    "test log 1, src 1",
+            Level:     codersdk.LogLevelInfo,
+        })
+    require.NoError(t, err)
+
+    loopErr := make(chan error, 1)
+    go func() {
+        err := uut.sendLoop(ctx, fDest)
+        loopErr <- err
+    }()
+
+    req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+    require.NotNil(t, req)
+    require.Len(t, req.Logs, 1, "it should skip the huge log")
+    require.Equal(t, "test log 1, src 1", req.Logs[0].GetOutput())
+    require.Equal(t, proto.Log_INFO, req.Logs[0].GetLevel())
+    testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
+
+    cancel()
+    err = testutil.RequireRecvCtx(testCtx, t, loopErr)
+    require.NoError(t, err)
+}
+
+func TestLogSender_Batch(t *testing.T) {
+    t.Parallel()
+    testCtx := testutil.Context(t, testutil.WaitShort)
+    ctx, cancel := context.WithCancel(testCtx)
+    logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+    fDest := newFakeLogDest()
+    uut := newLogSender(logger)
+
+    t0 := dbtime.Now()
+    ls1 := uuid.UUID{0x11}
+    var logs []agentsdk.Log
+    for i := 0; i < 60000; i++ {
+        logs = append(logs, agentsdk.Log{
+            CreatedAt: t0,
+            Output:    "r",
+            Level:     codersdk.LogLevelInfo,
+        })
+    }
+    err := uut.enqueue(ls1, logs...)
+    require.NoError(t, err)
+
+    loopErr := make(chan error, 1)
+    go func() {
+        err := uut.sendLoop(ctx, fDest)
+        loopErr <- err
+    }()
+
+    // with 60k logs, we should split into two updates to avoid going over 1MiB, since each log
+    // is about 21 bytes.
+    gotLogs := 0
+    req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+    require.NotNil(t, req)
+    gotLogs += len(req.Logs)
+    wire, err := protobuf.Marshal(req)
+    require.NoError(t, err)
+    require.Less(t, len(wire), logOutputMaxBytes, "wire should not exceed 1MiB")
+    testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
+    req = testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+    require.NotNil(t, req)
+    gotLogs += len(req.Logs)
+    wire, err = protobuf.Marshal(req)
+    require.NoError(t, err)
+    require.Less(t, len(wire), logOutputMaxBytes, "wire should not exceed 1MiB")
+    require.Equal(t, 60000, gotLogs)
+    testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
+
+    cancel()
+    err = testutil.RequireRecvCtx(testCtx, t, loopErr)
+    require.NoError(t, err)
+}
+
 type fakeLogDest struct {
     reqs  chan *proto.BatchCreateLogsRequest
     resps chan *proto.BatchCreateLogsResponse
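
The batching test bounds the marshalled size of each request with protobuf.Marshal. The fixed overheadPerLog value of 21 ("found by testing") can be sanity-checked the same way with protobuf.Size; a rough sketch, under the assumption that proto.Log exposes a CreatedAt timestamp alongside the Level and Output fields visible in this diff:

// Sketch only: estimate the per-log protobuf overhead relative to an empty request.
package main

import (
    "fmt"

    "github.com/coder/coder/v2/agent/proto"
    protobuf "google.golang.org/protobuf/proto"
    "google.golang.org/protobuf/types/known/timestamppb"
)

func main() {
    output := "test log 1, src 1"
    req := &proto.BatchCreateLogsRequest{
        Logs: []*proto.Log{{
            CreatedAt: timestamppb.Now(), // assumed field name, mirroring agentsdk.Log.CreatedAt
            Level:     proto.Log_INFO,
            Output:    output,
        }},
    }
    perLog := protobuf.Size(req) - protobuf.Size(&proto.BatchCreateLogsRequest{}) - len(output)
    fmt.Println("per-log overhead:", perLog)
}

For short outputs the result should land near 21 bytes: the timestamp submessage dominates, plus the level varint and the tag/length prefixes for Output and the repeated Logs entry.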
