Skip to content

Commit c8e7282

Browse files
committed
feat: handle log limit exceeded in logSender
1 parent 3047485 commit c8e7282

File tree

2 files changed

+85
-8
lines changed

2 files changed

+85
-8
lines changed

agent/logs.go

+27-4
Original file line number | Diff line number | Diff line change
@@ -27,8 +27,9 @@ type logQueue struct {
2727
// the agent calls sendLoop to send pending logs.
2828
type logSender struct {
2929
*sync.Cond
30-
queues map[uuid.UUID]*logQueue
31-
logger slog.Logger
30+
queues map[uuid.UUID]*logQueue
31+
logger slog.Logger
32+
exceededLogLimit bool
3233
}
3334

3435
type logDest interface {
@@ -51,6 +52,11 @@ func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
5152
}
5253
l.L.Lock()
5354
defer l.L.Unlock()
55+
if l.exceededLogLimit {
56+
logger.Warn(context.Background(), "dropping enqueued logs because we have reached the server limit")
57+
// don't error, as we also write to file and don't want the overall write to fail
58+
return nil
59+
}
5460
defer l.Broadcast()
5561
q, ok := l.queues[src]
5662
if !ok {
@@ -89,6 +95,8 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
8995
defer l.logger.Debug(ctx, "sendLoop exiting")
9096

9197
// wake 4 times per flush interval to check if anything needs to be flushed
98+
ctx, cancel := context.WithCancel(ctx)
99+
defer cancel()
92100
go func() {
93101
tkr := time.NewTicker(flushInterval / 4)
94102
defer tkr.Stop()
@@ -111,12 +119,18 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
111119
l.L.Lock()
112120
defer l.L.Unlock()
113121
for {
114-
for !ctxDone && !l.hasPendingWorkLocked() {
122+
for !ctxDone && !l.exceededLogLimit && !l.hasPendingWorkLocked() {
115123
l.Wait()
116124
}
117125
if ctxDone {
118126
return nil
119127
}
128+
if l.exceededLogLimit {
129+
l.logger.Debug(ctx, "aborting sendLoop because log limit is already exceeded")
130+
// no point in keeping this loop going if the log limit is exceeded, but don't return an
131+
// error because we've already handled it
132+
return nil
133+
}
120134
src, q := l.getPendingWorkLocked()
121135
q.flushRequested = false // clear flag since we're now flushing
122136
req := &proto.BatchCreateLogsRequest{
@@ -127,11 +141,20 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
127141

128142
l.L.Unlock()
129143
l.logger.Debug(ctx, "sending logs to agent API", slog.F("log_source_id", src), slog.F("num_logs", len(req.Logs)))
130-
_, err := dest.BatchCreateLogs(ctx, req)
144+
resp, err := dest.BatchCreateLogs(ctx, req)
131145
l.L.Lock()
132146
if err != nil {
133147
return xerrors.Errorf("failed to upload logs: %w", err)
134148
}
149+
if resp.LogLimitExceeded {
150+
l.logger.Warn(ctx, "server log limit exceeded; logs truncated")
151+
l.exceededLogLimit = true
152+
// no point in keeping anything we have queued around, server will not accept them
153+
l.queues = make(map[uuid.UUID]*logQueue)
154+
// We've handled the error as best as we can. We don't want the server limit to grind
155+
// other things to a halt, so this is all we can do.
156+
return nil
157+
}
135158

136159
// since elsewhere we only append to the logs, here we can remove them
137160
// since we successfully sent them

agent/logs_internal_test.go

+58-4
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@ import (
1717
"github.com/coder/coder/v2/testutil"
1818
)
1919

20-
func TestLogSender(t *testing.T) {
20+
func TestLogSender_Mainline(t *testing.T) {
2121
t.Parallel()
2222
testCtx := testutil.Context(t, testutil.WaitShort)
2323
ctx, cancel := context.WithCancel(testCtx)
@@ -60,7 +60,9 @@ func TestLogSender(t *testing.T) {
6060
// both, although the order is not controlled
6161
var logReqs []*proto.BatchCreateLogsRequest
6262
logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
63+
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
6364
logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
65+
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
6466
for _, req := range logReqs {
6567
require.NotNil(t, req)
6668
srcID, err := uuid.FromBytes(req.LogSourceId)
@@ -95,6 +97,7 @@ func TestLogSender(t *testing.T) {
9597
require.NoError(t, err)
9698

9799
req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
100+
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
98101
// give ourselves a 25% buffer if we're right on the cusp of a tick
99102
require.LessOrEqual(t, time.Since(t1), flushInterval*5/4)
100103
require.NotNil(t, req)
@@ -116,21 +119,72 @@ func TestLogSender(t *testing.T) {
116119
require.NoError(t, err)
117120
}
118121

122+
func TestLogSender_LogLimitExceeded(t *testing.T) {
123+
t.Parallel()
124+
ctx := testutil.Context(t, testutil.WaitShort)
125+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
126+
fDest := newFakeLogDest()
127+
uut := newLogSender(logger)
128+
129+
t0 := dbtime.Now()
130+
131+
ls1 := uuid.UUID{0x11}
132+
err := uut.enqueue(ls1, agentsdk.Log{
133+
CreatedAt: t0,
134+
Output: "test log 0, src 1",
135+
Level: codersdk.LogLevelInfo,
136+
})
137+
require.NoError(t, err)
138+
139+
loopErr := make(chan error, 1)
140+
go func() {
141+
err := uut.sendLoop(ctx, fDest)
142+
loopErr <- err
143+
}()
144+
145+
req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
146+
require.NotNil(t, req)
147+
testutil.RequireSendCtx(ctx, t, fDest.resps,
148+
&proto.BatchCreateLogsResponse{LogLimitExceeded: true})
149+
150+
err = testutil.RequireRecvCtx(ctx, t, loopErr)
151+
require.NoError(t, err)
152+
153+
// enqueue still succeeds after sendLoop returns, but the logs are dropped
154+
// rather than added to a queue
155+
err = uut.enqueue(ls1, agentsdk.Log{
156+
CreatedAt: t0,
157+
Output: "test log 2, src 1",
158+
Level: codersdk.LogLevelTrace,
159+
})
160+
require.NoError(t, err)
161+
uut.L.Lock()
162+
defer uut.L.Unlock()
163+
require.Len(t, uut.queues, 0)
164+
}
165+
119166
type fakeLogDest struct {
120-
reqs chan *proto.BatchCreateLogsRequest
167+
reqs chan *proto.BatchCreateLogsRequest
168+
resps chan *proto.BatchCreateLogsResponse
121169
}
122170

123171
func (f fakeLogDest) BatchCreateLogs(ctx context.Context, req *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error) {
124172
select {
125173
case <-ctx.Done():
126174
return nil, ctx.Err()
127175
case f.reqs <- req:
128-
return &proto.BatchCreateLogsResponse{}, nil
176+
select {
177+
case <-ctx.Done():
178+
return nil, ctx.Err()
179+
case resp := <-f.resps:
180+
return resp, nil
181+
}
129182
}
130183
}
131184

132185
func newFakeLogDest() *fakeLogDest {
133186
return &fakeLogDest{
134-
reqs: make(chan *proto.BatchCreateLogsRequest),
187+
reqs: make(chan *proto.BatchCreateLogsRequest),
188+
resps: make(chan *proto.BatchCreateLogsResponse),
135189
}
136190
}

0 commit comments

Comments
 (0)