feat: handle log limit exceeded in logSender #12079

Merged
31 changes: 27 additions & 4 deletions agent/logs.go
@@ -26,8 +26,9 @@ type logQueue struct {
 // the agent calls sendLoop to send pending logs.
 type logSender struct {
 	*sync.Cond
-	queues map[uuid.UUID]*logQueue
-	logger slog.Logger
+	queues           map[uuid.UUID]*logQueue
+	logger           slog.Logger
+	exceededLogLimit bool
 }
 
 type logDest interface {
@@ -50,6 +51,11 @@ func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
 	}
 	l.L.Lock()
 	defer l.L.Unlock()
+	if l.exceededLogLimit {
+		logger.Warn(context.Background(), "dropping enqueued logs because we have reached the server limit")
+		// don't error, as we also write to file and don't want the overall write to fail
+		return nil
+	}
 	defer l.Broadcast()
 	q, ok := l.queues[src]
 	if !ok {
@@ -88,6 +94,8 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 	defer l.logger.Debug(ctx, "sendLoop exiting")
 
 	// wake 4 times per flush interval to check if anything needs to be flushed
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
 	go func() {
 		tkr := time.NewTicker(flushInterval / 4)
 		defer tkr.Stop()
@@ -110,12 +118,18 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 	l.L.Lock()
 	defer l.L.Unlock()
 	for {
-		for !ctxDone && !l.hasPendingWorkLocked() {
+		for !ctxDone && !l.exceededLogLimit && !l.hasPendingWorkLocked() {
 			l.Wait()
 		}
 		if ctxDone {
 			return nil
 		}
+		if l.exceededLogLimit {
+			l.logger.Debug(ctx, "aborting sendLoop because log limit is already exceeded")
+			// no point in keeping this loop going if the log limit is exceeded, but don't return an
+			// error because we've already handled it
+			return nil
+		}
 		src, q := l.getPendingWorkLocked()
 		q.flushRequested = false // clear flag since we're now flushing
 		req := &proto.BatchCreateLogsRequest{
@@ -125,11 +139,20 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 
 		l.L.Unlock()
 		l.logger.Debug(ctx, "sending logs to agent API", slog.F("log_source_id", src), slog.F("num_logs", len(req.Logs)))
-		_, err := dest.BatchCreateLogs(ctx, req)
+		resp, err := dest.BatchCreateLogs(ctx, req)
 		l.L.Lock()
 		if err != nil {
 			return xerrors.Errorf("failed to upload logs: %w", err)
 		}
+		if resp.LogLimitExceeded {
+			l.logger.Warn(ctx, "server log limit exceeded; logs truncated")
+			l.exceededLogLimit = true
+			// no point in keeping anything we have queued around, the server will not accept them
+			l.queues = make(map[uuid.UUID]*logQueue)
+			// We've handled the error as best we can. We don't want the server limit to grind
+			// other things to a halt, so this is all we can do.
+			return nil
+		}
 
 		// Since elsewhere we only append to the logs, here we can remove them
 		// since we successfully sent them. First we nil the pointers though,
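
Taken together, the logs.go changes make the server's log limit a terminal but non-fatal condition: sendLoop records it, drops anything still queued, and returns nil, after which enqueue silently discards new logs so writes to the local log file keep working. The sketch below illustrates that behavior from a caller's point of view. It is not part of this PR; it assumes it sits in the agent package next to logs.go with the same imports (context, time, github.com/google/uuid, cdr.dev/slog, the agent proto package, codersdk, and agentsdk), and the alwaysFullDest stub and exampleLimitExceeded function are hypothetical names.

// alwaysFullDest is a hypothetical stub that satisfies the logDest interface
// by always reporting the server-side limit as exceeded.
type alwaysFullDest struct{}

func (alwaysFullDest) BatchCreateLogs(_ context.Context, _ *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error) {
	return &proto.BatchCreateLogsResponse{LogLimitExceeded: true}, nil
}

// exampleLimitExceeded enqueues one log and runs sendLoop against the stub.
// sendLoop returns nil rather than an error once the limit is reported, and
// any later enqueue calls are dropped without failing the caller.
func exampleLimitExceeded(ctx context.Context, logger slog.Logger) error {
	sender := newLogSender(logger)
	_ = sender.enqueue(uuid.New(), agentsdk.Log{
		CreatedAt: time.Now(),
		Output:    "hello",
		Level:     codersdk.LogLevelInfo,
	})
	return sender.sendLoop(ctx, alwaysFullDest{})
}

TestLogSender_LogLimitExceeded in the test file below exercises the same path through the paired request/response channels of fakeLogDest.
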
62 changes: 58 additions & 4 deletions agent/logs_internal_test.go
@@ -19,7 +19,7 @@ import (
 	"github.com/coder/coder/v2/testutil"
 )
 
-func TestLogSender(t *testing.T) {
+func TestLogSender_Mainline(t *testing.T) {
 	t.Parallel()
 	testCtx := testutil.Context(t, testutil.WaitShort)
 	ctx, cancel := context.WithCancel(testCtx)
@@ -62,7 +62,9 @@ func TestLogSender(t *testing.T) {
 	// both, although the order is not controlled
 	var logReqs []*proto.BatchCreateLogsRequest
 	logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
+	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
 	logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
+	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
 	for _, req := range logReqs {
 		require.NotNil(t, req)
 		srcID, err := uuid.FromBytes(req.LogSourceId)
@@ -97,6 +99,7 @@ func TestLogSender(t *testing.T) {
 	require.NoError(t, err)
 
 	req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
 	// give ourselves a 25% buffer if we're right on the cusp of a tick
 	require.LessOrEqual(t, time.Since(t1), flushInterval*5/4)
 	require.NotNil(t, req)
@@ -118,8 +121,53 @@ func TestLogSender(t *testing.T) {
 	require.NoError(t, err)
 }
 
+func TestLogSender_LogLimitExceeded(t *testing.T) {
+	t.Parallel()
+	ctx := testutil.Context(t, testutil.WaitShort)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	fDest := newFakeLogDest()
+	uut := newLogSender(logger)
+
+	t0 := dbtime.Now()
+
+	ls1 := uuid.UUID{0x11}
+	err := uut.enqueue(ls1, agentsdk.Log{
+		CreatedAt: t0,
+		Output:    "test log 0, src 1",
+		Level:     codersdk.LogLevelInfo,
+	})
+	require.NoError(t, err)
+
+	loopErr := make(chan error, 1)
+	go func() {
+		err := uut.sendLoop(ctx, fDest)
+		loopErr <- err
+	}()
+
+	req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+	require.NotNil(t, req)
+	testutil.RequireSendCtx(ctx, t, fDest.resps,
+		&proto.BatchCreateLogsResponse{LogLimitExceeded: true})
+
+	err = testutil.RequireRecvCtx(ctx, t, loopErr)
+	require.NoError(t, err)
+
+	// we can still call enqueue after sendLoop returns, but the logs are
+	// dropped rather than queued
+	err = uut.enqueue(ls1, agentsdk.Log{
+		CreatedAt: t0,
+		Output:    "test log 2, src 1",
+		Level:     codersdk.LogLevelTrace,
+	})
+	require.NoError(t, err)
+	uut.L.Lock()
+	defer uut.L.Unlock()
+	require.Len(t, uut.queues, 0)
+}
+
 type fakeLogDest struct {
-	reqs chan *proto.BatchCreateLogsRequest
+	reqs  chan *proto.BatchCreateLogsRequest
+	resps chan *proto.BatchCreateLogsResponse
 }
 
 func (f fakeLogDest) BatchCreateLogs(ctx context.Context, req *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error) {
@@ -130,12 +178,18 @@ func (f fakeLogDest) BatchCreateLogs(ctx context.Context, req *proto.BatchCreate
 	case <-ctx.Done():
 		return nil, ctx.Err()
 	case f.reqs <- req:
-		return &proto.BatchCreateLogsResponse{}, nil
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case resp := <-f.resps:
+			return resp, nil
+		}
 	}
 }
 
 func newFakeLogDest() *fakeLogDest {
	return &fakeLogDest{
-		reqs: make(chan *proto.BatchCreateLogsRequest),
+		reqs:  make(chan *proto.BatchCreateLogsRequest),
+		resps: make(chan *proto.BatchCreateLogsResponse),
 	}
 }
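
For contrast with the limit handling above: ordinary transport failures are still surfaced, because sendLoop wraps and returns the error from BatchCreateLogs (the xerrors.Errorf call in logs.go). A hypothetical fake in the same style, not part of this PR, that a test could use to drive that error path:

// failingLogDest satisfies logDest but rejects every upload. A test using it
// should see sendLoop return a wrapped "failed to upload logs" error instead
// of the nil result produced when the server reports LogLimitExceeded.
type failingLogDest struct{}

func (failingLogDest) BatchCreateLogs(_ context.Context, _ *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error) {
	return nil, xerrors.New("upload rejected")
}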