Skip to content

Commit 0515169

Browse files
committed
feat: handle log limit exceeded in logSender
1 parent ad9d94a commit 0515169

File tree

2 files changed

+85
-8
lines changed

2 files changed

+85
-8
lines changed

agent/logs.go

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ type logQueue struct {
2626
// the agent calls sendLoop to send pending logs.
2727
type logSender struct {
2828
*sync.Cond
29-
queues map[uuid.UUID]*logQueue
30-
logger slog.Logger
29+
queues map[uuid.UUID]*logQueue
30+
logger slog.Logger
31+
exceededLogLimit bool
3132
}
3233

3334
type logDest interface {
@@ -50,6 +51,11 @@ func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
5051
}
5152
l.L.Lock()
5253
defer l.L.Unlock()
54+
if l.exceededLogLimit {
55+
logger.Warn(context.Background(), "dropping enqueued logs because we have reached the server limit")
56+
// don't error, as we also write to file and don't want the overall write to fail
57+
return nil
58+
}
5359
defer l.Broadcast()
5460
q, ok := l.queues[src]
5561
if !ok {
@@ -88,6 +94,8 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
8894
defer l.logger.Debug(ctx, "sendLoop exiting")
8995

9096
// wake 4 times per flush interval to check if anything needs to be flushed
97+
ctx, cancel := context.WithCancel(ctx)
98+
defer cancel()
9199
go func() {
92100
tkr := time.NewTicker(flushInterval / 4)
93101
defer tkr.Stop()
@@ -110,12 +118,18 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
110118
l.L.Lock()
111119
defer l.L.Unlock()
112120
for {
113-
for !ctxDone && !l.hasPendingWorkLocked() {
121+
for !ctxDone && !l.exceededLogLimit && !l.hasPendingWorkLocked() {
114122
l.Wait()
115123
}
116124
if ctxDone {
117125
return nil
118126
}
127+
if l.exceededLogLimit {
128+
l.logger.Debug(ctx, "aborting sendLoop because log limit is already exceeded")
129+
// no point in keeping this loop going, if log limit is exceeded, but don't return an
130+
// error because we've already handled it
131+
return nil
132+
}
119133
src, q := l.getPendingWorkLocked()
120134
q.flushRequested = false // clear flag since we're now flushing
121135
req := &proto.BatchCreateLogsRequest{
@@ -125,11 +139,20 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
125139

126140
l.L.Unlock()
127141
l.logger.Debug(ctx, "sending logs to agent API", slog.F("log_source_id", src), slog.F("num_logs", len(req.Logs)))
128-
_, err := dest.BatchCreateLogs(ctx, req)
142+
resp, err := dest.BatchCreateLogs(ctx, req)
129143
l.L.Lock()
130144
if err != nil {
131145
return xerrors.Errorf("failed to upload logs: %w", err)
132146
}
147+
if resp.LogLimitExceeded {
148+
l.logger.Warn(ctx, "server log limit exceeded; logs truncated")
149+
l.exceededLogLimit = true
150+
// no point in keeping anything we have queued around, server will not accept them
151+
l.queues = make(map[uuid.UUID]*logQueue)
152+
// We've handled the error as best as we can. We don't want the server limit to grind
153+
// other things to a halt, so this is all we can do.
154+
return nil
155+
}
133156

134157
// Since elsewhere we only append to the logs, here we can remove them
135158
// since we successfully sent them. First we nil the pointers though,

agent/logs_internal_test.go

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919
"github.com/coder/coder/v2/testutil"
2020
)
2121

22-
func TestLogSender(t *testing.T) {
22+
func TestLogSender_Mainline(t *testing.T) {
2323
t.Parallel()
2424
testCtx := testutil.Context(t, testutil.WaitShort)
2525
ctx, cancel := context.WithCancel(testCtx)
@@ -62,7 +62,9 @@ func TestLogSender(t *testing.T) {
6262
// both, although the order is not controlled
6363
var logReqs []*proto.BatchCreateLogsRequest
6464
logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
65+
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
6566
logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
67+
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
6668
for _, req := range logReqs {
6769
require.NotNil(t, req)
6870
srcID, err := uuid.FromBytes(req.LogSourceId)
@@ -97,6 +99,7 @@ func TestLogSender(t *testing.T) {
9799
require.NoError(t, err)
98100

99101
req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
102+
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
100103
// give ourselves a 25% buffer if we're right on the cusp of a tick
101104
require.LessOrEqual(t, time.Since(t1), flushInterval*5/4)
102105
require.NotNil(t, req)
@@ -118,8 +121,53 @@ func TestLogSender(t *testing.T) {
118121
require.NoError(t, err)
119122
}
120123

124+
func TestLogSender_LogLimitExceeded(t *testing.T) {
125+
t.Parallel()
126+
ctx := testutil.Context(t, testutil.WaitShort)
127+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
128+
fDest := newFakeLogDest()
129+
uut := newLogSender(logger)
130+
131+
t0 := dbtime.Now()
132+
133+
ls1 := uuid.UUID{0x11}
134+
err := uut.enqueue(ls1, agentsdk.Log{
135+
CreatedAt: t0,
136+
Output: "test log 0, src 1",
137+
Level: codersdk.LogLevelInfo,
138+
})
139+
require.NoError(t, err)
140+
141+
loopErr := make(chan error, 1)
142+
go func() {
143+
err := uut.sendLoop(ctx, fDest)
144+
loopErr <- err
145+
}()
146+
147+
req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
148+
require.NotNil(t, req)
149+
testutil.RequireSendCtx(ctx, t, fDest.resps,
150+
&proto.BatchCreateLogsResponse{LogLimitExceeded: true})
151+
152+
err = testutil.RequireRecvCtx(ctx, t, loopErr)
153+
require.NoError(t, err)
154+
155+
// enqueue still succeeds after sendLoop returns, but the logs are dropped
156+
// rather than queued
157+
err = uut.enqueue(ls1, agentsdk.Log{
158+
CreatedAt: t0,
159+
Output: "test log 2, src 1",
160+
Level: codersdk.LogLevelTrace,
161+
})
162+
require.NoError(t, err)
163+
uut.L.Lock()
164+
defer uut.L.Unlock()
165+
require.Len(t, uut.queues, 0)
166+
}
167+
121168
type fakeLogDest struct {
122-
reqs chan *proto.BatchCreateLogsRequest
169+
reqs chan *proto.BatchCreateLogsRequest
170+
resps chan *proto.BatchCreateLogsResponse
123171
}
124172

125173
func (f fakeLogDest) BatchCreateLogs(ctx context.Context, req *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error) {
@@ -130,12 +178,18 @@ func (f fakeLogDest) BatchCreateLogs(ctx context.Context, req *proto.BatchCreate
130178
case <-ctx.Done():
131179
return nil, ctx.Err()
132180
case f.reqs <- req:
133-
return &proto.BatchCreateLogsResponse{}, nil
181+
select {
182+
case <-ctx.Done():
183+
return nil, ctx.Err()
184+
case resp := <-f.resps:
185+
return resp, nil
186+
}
134187
}
135188
}
136189

137190
func newFakeLogDest() *fakeLogDest {
138191
return &fakeLogDest{
139-
reqs: make(chan *proto.BatchCreateLogsRequest),
192+
reqs: make(chan *proto.BatchCreateLogsRequest),
193+
resps: make(chan *proto.BatchCreateLogsResponse),
140194
}
141195
}

0 commit comments

Comments
 (0)