diff --git a/agent/logs.go b/agent/logs.go
new file mode 100644
index 0000000000000..e5241bf36b4a1
--- /dev/null
+++ b/agent/logs.go
@@ -0,0 +1,239 @@
+package agent
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/google/uuid"
+	"golang.org/x/xerrors"
+
+	"cdr.dev/slog"
+	"github.com/coder/coder/v2/agent/proto"
+	"github.com/coder/coder/v2/codersdk/agentsdk"
+)
+
+const (
+	// flushInterval is how long a queue may go without being flushed before it counts as pending
+	// work; sendLoop wakes four times per interval to check.
+	flushInterval = time.Second
+	// maxBytesPerBatch caps the estimated size (output plus overheadPerLog per log) of a single
+	// BatchCreateLogs request, and therefore also the size of any single log we accept.
+	maxBytesPerBatch = 1 << 20 // 1MiB
+	// overheadPerLog is the per-log byte overhead added to the output length when sizing a batch.
+	overheadPerLog = 21 // found by testing
+
+	// maxBytesQueued is the maximum length of logs we will queue in memory. The number is taken
+	// from dump.sql `max_logs_length` constraint, as there is no point queuing more logs than we'll
+	// accept in the database.
+	maxBytesQueued = 1048576
+)
+
+// logQueue holds the logs waiting to be sent for a single log source.
+type logQueue struct {
+	// logs are the queued logs, oldest first; sendLoop removes them from the front once sent.
+	logs []*proto.Log
+	// flushRequested is set by flush and cleared by sendLoop when it picks the queue up.
+	flushRequested bool
+	// lastFlush is when sendLoop last sent from this queue and left logs behind; it is the zero
+	// time for a queue that has never been flushed.
+	lastFlush time.Time
+}
+
+// logSender is a subcomponent of agent that handles enqueuing logs and then sending them over the
+// agent API. Things that need to log call enqueue and flush. When the agent API becomes available,
+// the agent calls sendLoop to send pending logs.
+type logSender struct {
+	// Cond guards every field below; it is broadcast whenever sendLoop may have new work.
+	*sync.Cond
+	// queues holds the pending logs, keyed by log source ID.
+	queues map[uuid.UUID]*logQueue
+	logger slog.Logger
+	// exceededLogLimit is set once the server reports the log limit is exceeded; after that
+	// enqueue drops everything and sendLoop returns immediately.
+	exceededLogLimit bool
+	// outputLen is the total length of the Output of all queued logs, across all sources.
+	outputLen int
+}
+
+// logDest is the part of the agent API that logSender needs in order to upload logs.
+type logDest interface {
+	BatchCreateLogs(ctx context.Context, request *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error)
+}
+
+// newLogSender returns a logSender with no queued logs that reports its activity to logger.
+func newLogSender(logger slog.Logger) *logSender {
+	return &logSender{
+		Cond:   sync.NewCond(&sync.Mutex{}),
+		logger: logger,
+		queues: make(map[uuid.UUID]*logQueue),
+	}
+}
+
+// enqueue converts logs to their protobuf form and appends them to the queue for src. It never
+// returns an error: logs are dropped (with a log message) once the server log limit has been
+// exceeded, when the in-memory queue is full, when a log fails to convert, or when a single log is
+// too large to fit in one batch.
+func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) {
+	logger := l.logger.With(slog.F("log_source_id", src))
+	if len(logs) == 0 {
+		logger.Debug(context.Background(), "enqueue called with no logs")
+		return
+	}
+	l.L.Lock()
+	defer l.L.Unlock()
+	if l.exceededLogLimit {
+		logger.Warn(context.Background(), "dropping enqueued logs because we have reached the server limit")
+		// don't error, as we also write to file and don't want the overall write to fail
+		return
+	}
+	defer l.Broadcast()
+	q, registered := l.queues[src]
+	if !registered {
+		// Don't add the queue to the map yet: if every log below gets dropped, an empty queue
+		// would look like pending work to sendLoop and trigger an empty upload.
+		q = &logQueue{}
+	}
+	accepted := 0
+	for k, log := range logs {
+		// Here we check the queue size before adding a log because we want to queue up slightly
+		// more logs than the database would store to ensure we trigger "logs truncated" at the
+		// database layer. Otherwise, the end user wouldn't know logs are truncated unless they
+		// examined the Coder agent logs.
+		if l.outputLen > maxBytesQueued {
+			logger.Warn(context.Background(), "log queue full; truncating new logs", slog.F("new_logs", k), slog.F("queued_logs", len(q.logs)))
+			return
+		}
+		pl, err := agentsdk.ProtoFromLog(log)
+		if err != nil {
+			logger.Critical(context.Background(), "failed to convert log", slog.Error(err))
+			return
+		}
+		if len(pl.Output)+overheadPerLog > maxBytesPerBatch {
+			logger.Warn(context.Background(), "dropping log line that exceeds our limit", slog.F("len", len(pl.Output)))
+			continue
+		}
+		if !registered {
+			l.queues[src] = q
+			registered = true
+		}
+		q.logs = append(q.logs, pl)
+		l.outputLen += len(pl.Output)
+		accepted++
+	}
+	// report what was actually queued, which is less than len(logs) if oversized lines were dropped
+	logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", accepted), slog.F("queued_logs", len(q.logs)))
+}
+
+// flush marks the queue for src as needing to be sent without waiting for the flush interval, and
+// wakes sendLoop.
+func (l *logSender) flush(src uuid.UUID) {
+	l.L.Lock()
+	defer l.L.Unlock()
+	defer l.Broadcast()
+	if q, ok := l.queues[src]; ok {
+		q.flushRequested = true
+	}
+	// queue might not exist because it's already been flushed and removed from
+	// the map.
+}
+
+// sendLoop sends any pending logs until it hits an error or the context is canceled. It does not
+// retry as it is expected that a higher layer retries establishing connection to the agent API and
+// calls sendLoop again.
+func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
+	l.L.Lock()
+	defer l.L.Unlock()
+	if l.exceededLogLimit {
+		l.logger.Debug(ctx, "aborting sendLoop because log limit is already exceeded")
+		// no point in keeping this loop going, if log limit is exceeded, but don't return an
+		// error because we've already handled it
+		return nil
+	}
+
+	ctxDone := false
+	defer l.logger.Debug(ctx, "sendLoop exiting")
+
+	// wake 4 times per flush interval to check if anything needs to be flushed
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	go func() {
+		tkr := time.NewTicker(flushInterval / 4)
+		defer tkr.Stop()
+		for {
+			select {
+			// also monitor the context here, so we notice immediately, rather
+			// than waiting for the next tick or logs
+			case <-ctx.Done():
+				l.L.Lock()
+				ctxDone = true
+				l.L.Unlock()
+				l.Broadcast()
+				return
+			case <-tkr.C:
+				l.Broadcast()
+			}
+		}
+	}()
+
+	for {
+		for !ctxDone && !l.hasPendingWorkLocked() {
+			l.Wait()
+		}
+		if ctxDone {
+			return nil
+		}
+
+		// hasPendingWorkLocked returned true and we still hold the lock, so a due queue exists
+		src, q := l.getPendingWorkLocked()
+		if len(q.logs) == 0 {
+			// enqueue can leave an empty queue behind when it drops every log it was given;
+			// there is nothing to upload, so just remove it rather than send an empty request
+			delete(l.queues, src)
+			continue
+		}
+		logger := l.logger.With(slog.F("log_source_id", src))
+		flushWasRequested := q.flushRequested
+		q.flushRequested = false // clear flag since we're now flushing
+		req := &proto.BatchCreateLogsRequest{
+			LogSourceId: src[:],
+		}
+
+		// outputToSend keeps track of the size of the protobuf message we send, while
+		// outputToRemove keeps track of the size of the output we'll remove from the queues on
+		// success. They are different because outputToSend also counts protocol message overheads.
+		outputToSend := 0
+		outputToRemove := 0
+		n := 0
+		for n < len(q.logs) {
+			log := q.logs[n]
+			outputToSend += len(log.Output) + overheadPerLog
+			if outputToSend > maxBytesPerBatch {
+				break
+			}
+			req.Logs = append(req.Logs, log)
+			n++
+			outputToRemove += len(log.Output)
+		}
+
+		// don't hold the lock during the network call, so that enqueue and flush aren't blocked
+		l.L.Unlock()
+		logger.Debug(ctx, "sending logs to agent API", slog.F("num_logs", len(req.Logs)))
+		resp, err := dest.BatchCreateLogs(ctx, req)
+		l.L.Lock()
+		if err != nil {
+			// nothing was sent, so the flush request still stands for the next sendLoop
+			if flushWasRequested {
+				q.flushRequested = true
+			}
+			return xerrors.Errorf("failed to upload logs: %w", err)
+		}
+		if resp.LogLimitExceeded {
+			l.logger.Warn(ctx, "server log limit exceeded; logs truncated")
+			l.exceededLogLimit = true
+			// no point in keeping anything we have queued around, server will not accept them
+			l.queues = make(map[uuid.UUID]*logQueue)
+			l.outputLen = 0
+			// We've handled the error as best as we can. We don't want the server limit to grind
+			// other things to a halt, so this is all we can do.
+			return nil
+		}
+
+		// Since elsewhere we only append to the logs, here we can remove them
+		// since we successfully sent them. First we nil the pointers though,
+		// so that they can be gc'd.
+		for i := 0; i < n; i++ {
+			q.logs[i] = nil
+		}
+		q.logs = q.logs[n:]
+		l.outputLen -= outputToRemove
+		if len(q.logs) == 0 {
+			// no empty queues
+			delete(l.queues, src)
+			continue
+		}
+		q.lastFlush = time.Now()
+	}
+}
+
+// flushDue reports whether the queue should be sent now: either a flush was explicitly requested,
+// or more than flushInterval has passed since it was last flushed.
+func (q *logQueue) flushDue(now time.Time) bool {
+	return q.flushRequested || now.Sub(q.lastFlush) > flushInterval
+}
+
+// hasPendingWorkLocked reports whether any queue is due to be sent. The caller must hold l.L.
+func (l *logSender) hasPendingWorkLocked() bool {
+	now := time.Now()
+	for _, q := range l.queues {
+		if q.flushDue(now) {
+			return true
+		}
+	}
+	return false
+}
+
+// getPendingWorkLocked returns the queue to send next, or a nil queue if none is due. The caller
+// must hold l.L.
+func (l *logSender) getPendingWorkLocked() (src uuid.UUID, q *logQueue) {
+	// Of the queues that are due, take the one it's been the longest since we've flushed, so that
+	// we have some sense of fairness across sources. Queues that are not due are skipped so that
+	// this agrees with hasPendingWorkLocked and a requested flush is not delayed by other sources.
+	now := time.Now()
+	for is, iq := range l.queues {
+		if !iq.flushDue(now) {
+			continue
+		}
+		if q == nil || iq.lastFlush.Before(q.lastFlush) {
+			src = is
+			q = iq
+		}
+	}
+	return src, q
+}
diff --git a/agent/logs_internal_test.go b/agent/logs_internal_test.go
new file mode 100644
index 0000000000000..d688ed4a8d468
--- /dev/null
+++ b/agent/logs_internal_test.go
@@ -0,0 +1,400 @@
+package agent
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"golang.org/x/xerrors"
+
+	"golang.org/x/exp/slices"
+
+	"github.com/google/uuid"
+	"github.com/stretchr/testify/require"
+	protobuf "google.golang.org/protobuf/proto"
+
+	"cdr.dev/slog"
+	"cdr.dev/slog/sloggers/slogtest"
+	"github.com/coder/coder/v2/agent/proto"
+	"github.com/coder/coder/v2/coderd/database/dbtime"
+	"github.com/coder/coder/v2/codersdk"
+	"github.com/coder/coder/v2/codersdk/agentsdk"
+	"github.com/coder/coder/v2/testutil"
+)
+
+func TestLogSender_Mainline(t *testing.T) {
+	t.Parallel()
+	testCtx := testutil.Context(t, testutil.WaitShort)
+	ctx, cancel := context.WithCancel(testCtx)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	fDest := newFakeLogDest()
+	uut := newLogSender(logger)
+
+	t0 := dbtime.Now()
+
+	ls1 := uuid.UUID{0x11}
+	uut.enqueue(ls1, agentsdk.Log{
+		CreatedAt: t0,
+		Output:    "test log 0, src 1",
+		Level:     codersdk.LogLevelInfo,
+	})
+
+	ls2 := uuid.UUID{0x22}
+	uut.enqueue(ls2,
+		agentsdk.Log{
+			CreatedAt: t0,
+			Output:    "test log 0, src 2",
+			Level:     codersdk.LogLevelError,
+		},
+		agentsdk.Log{
+			CreatedAt: t0,
+			Output:    "test log 1, src 2",
+			Level:     codersdk.LogLevelWarn,
+		},
+	)
+
+	loopErr := make(chan error, 1)
+	go func() {
+		err := uut.sendLoop(ctx, fDest)
+		loopErr <- err
+	}()
+
+	// since neither source has ever been flushed, it should immediately flush
+	// both, although the order is not controlled
+	var logReqs []*proto.BatchCreateLogsRequest
+	logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
+	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
+	logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
+	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
+	for _, req := range logReqs {
+		require.NotNil(t, req)
+		srcID, err := uuid.FromBytes(req.LogSourceId)
+		require.NoError(t, err)
+		switch srcID {
+		case ls1:
+			require.Len(t, req.Logs, 1)
+			require.Equal(t, "test log 0, src 1", req.Logs[0].GetOutput())
+			require.Equal(t, proto.Log_INFO, req.Logs[0].GetLevel())
+			require.Equal(t, t0, req.Logs[0].GetCreatedAt().AsTime())
+		case ls2:
+			require.Len(t, req.Logs, 2)
+			require.Equal(t, "test log 0, src 2", req.Logs[0].GetOutput())
+			require.Equal(t, proto.Log_ERROR, req.Logs[0].GetLevel())
+			require.Equal(t, t0, req.Logs[0].GetCreatedAt().AsTime())
+			require.Equal(t, "test log 1, src 2", req.Logs[1].GetOutput())
+			require.Equal(t, proto.Log_WARN, req.Logs[1].GetLevel())
+			require.Equal(t, t0, req.Logs[1].GetCreatedAt().AsTime())
+		default:
+			t.Fatal("unknown log source")
+		}
+	}
+
+	t1 := dbtime.Now()
+	uut.enqueue(ls1, agentsdk.Log{
+		CreatedAt: t1,
+		Output:    "test log 1, src 1",
+		Level:     codersdk.LogLevelDebug,
+	})
+	uut.flush(ls1)
+
+	req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
+	// give ourselves a 25% buffer if we're right on the cusp of a tick
+	require.LessOrEqual(t, time.Since(t1), flushInterval*5/4)
+	require.NotNil(t, req)
+	require.Len(t, req.Logs, 1)
+	require.Equal(t, "test log 1, src 1", req.Logs[0].GetOutput())
+	require.Equal(t, proto.Log_DEBUG, req.Logs[0].GetLevel())
+	require.Equal(t, t1, req.Logs[0].GetCreatedAt().AsTime())
+
+	cancel()
+	err := testutil.RequireRecvCtx(testCtx, t, loopErr)
+	require.NoError(t, err)
+
+	// we can still enqueue more logs after sendLoop returns
+	uut.enqueue(ls1, agentsdk.Log{
+		CreatedAt: t1,
+		Output:    "test log 2, src 1",
+		Level:     codersdk.LogLevelTrace,
+	})
+}
+
+func TestLogSender_LogLimitExceeded(t *testing.T) {
+	t.Parallel()
+	ctx := testutil.Context(t, testutil.WaitShort)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	fDest := newFakeLogDest()
+	uut := newLogSender(logger)
+
+	t0 := dbtime.Now()
+
+	ls1 := uuid.UUID{0x11}
+	uut.enqueue(ls1, agentsdk.Log{
+		CreatedAt: t0,
+		Output:    "test log 0, src 1",
+		Level:     codersdk.LogLevelInfo,
+	})
+
+	loopErr := make(chan error, 1)
+	go func() {
+		err := uut.sendLoop(ctx, fDest)
+		loopErr <- err
+	}()
+
+	req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+	require.NotNil(t, req)
+	testutil.RequireSendCtx(ctx, t, fDest.resps,
+		&proto.BatchCreateLogsResponse{LogLimitExceeded: true})
+
+	err := testutil.RequireRecvCtx(ctx, t, loopErr)
+	require.NoError(t, err)
+
+	// we can still enqueue more logs after sendLoop returns, but they don't
+	// actually get enqueued
+	uut.enqueue(ls1, agentsdk.Log{
+		CreatedAt: t0,
+		Output:    "test log 2, src 1",
+		Level:     codersdk.LogLevelTrace,
+	})
+	uut.L.Lock()
+	require.Len(t, uut.queues, 0)
+	uut.L.Unlock()
+
+	// Also, if we run sendLoop again, it should immediately exit.
+	go func() {
+		err := uut.sendLoop(ctx, fDest)
+		loopErr <- err
+	}()
+	err = testutil.RequireRecvCtx(ctx, t, loopErr)
+	require.NoError(t, err)
+}
+
+func TestLogSender_SkipHugeLog(t *testing.T) {
+	t.Parallel()
+	testCtx := testutil.Context(t, testutil.WaitShort)
+	ctx, cancel := context.WithCancel(testCtx)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	fDest := newFakeLogDest()
+	uut := newLogSender(logger)
+
+	t0 := dbtime.Now()
+	ls1 := uuid.UUID{0x11}
+	// since we add some overhead to the actual length of the output, a log just
+	// under the perBatch limit will not be accepted.
+	hugeLog := make([]byte, maxBytesPerBatch-1)
+	for i := range hugeLog {
+		hugeLog[i] = 'q'
+	}
+	uut.enqueue(ls1,
+		agentsdk.Log{
+			CreatedAt: t0,
+			Output:    string(hugeLog),
+			Level:     codersdk.LogLevelInfo,
+		},
+		agentsdk.Log{
+			CreatedAt: t0,
+			Output:    "test log 1, src 1",
+			Level:     codersdk.LogLevelInfo,
+		})
+
+	loopErr := make(chan error, 1)
+	go func() {
+		err := uut.sendLoop(ctx, fDest)
+		loopErr <- err
+	}()
+
+	req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+	require.NotNil(t, req)
+	require.Len(t, req.Logs, 1, "it should skip the huge log")
+	require.Equal(t, "test log 1, src 1", req.Logs[0].GetOutput())
+	require.Equal(t, proto.Log_INFO, req.Logs[0].GetLevel())
+	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
+
+	cancel()
+	err := testutil.RequireRecvCtx(testCtx, t, loopErr)
+	require.NoError(t, err)
+}
+
+func TestLogSender_Batch(t *testing.T) {
+	t.Parallel()
+	testCtx := testutil.Context(t, testutil.WaitShort)
+	ctx, cancel := context.WithCancel(testCtx)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	fDest := newFakeLogDest()
+	uut := newLogSender(logger)
+
+	t0 := dbtime.Now()
+	ls1 := uuid.UUID{0x11}
+	var logs []agentsdk.Log
+	for i := 0; i < 60000; i++ {
+		logs = append(logs, agentsdk.Log{
+			CreatedAt: t0,
+			Output:    "r",
+			Level:     codersdk.LogLevelInfo,
+		})
+	}
+	uut.enqueue(ls1, logs...)
+
+	loopErr := make(chan error, 1)
+	go func() {
+		err := uut.sendLoop(ctx, fDest)
+		loopErr <- err
+	}()
+
+	// with 60k logs, we should split into two updates to avoid going over 1MiB, since each log
+	// is about 21 bytes.
+	gotLogs := 0
+	req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+	require.NotNil(t, req)
+	gotLogs += len(req.Logs)
+	wire, err := protobuf.Marshal(req)
+	require.NoError(t, err)
+	require.Less(t, len(wire), maxBytesPerBatch, "wire should not exceed 1MiB")
+	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
+	req = testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+	require.NotNil(t, req)
+	gotLogs += len(req.Logs)
+	wire, err = protobuf.Marshal(req)
+	require.NoError(t, err)
+	require.Less(t, len(wire), maxBytesPerBatch, "wire should not exceed 1MiB")
+	require.Equal(t, 60000, gotLogs)
+	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
+
+	cancel()
+	err = testutil.RequireRecvCtx(testCtx, t, loopErr)
+	require.NoError(t, err)
+}
+
+func TestLogSender_MaxQueuedLogs(t *testing.T) {
+	t.Parallel()
+	testCtx := testutil.Context(t, testutil.WaitShort)
+	ctx, cancel := context.WithCancel(testCtx)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	fDest := newFakeLogDest()
+	uut := newLogSender(logger)
+
+	t0 := dbtime.Now()
+	ls1 := uuid.UUID{0x11}
+	n := 4
+	hugeLog := make([]byte, maxBytesQueued/n)
+	for i := range hugeLog {
+		hugeLog[i] = 'q'
+	}
+	var logs []agentsdk.Log
+	for i := 0; i < n; i++ {
+		logs = append(logs, agentsdk.Log{
+			CreatedAt: t0,
+			Output:    string(hugeLog),
+			Level:     codersdk.LogLevelInfo,
+		})
+	}
+	uut.enqueue(ls1, logs...)
+
+	// we're now right at the limit of output
+	require.Equal(t, maxBytesQueued, uut.outputLen)
+
+	// adding more logs should not error...
+	ls2 := uuid.UUID{0x22}
+	uut.enqueue(ls2, logs...)
+
+	loopErr := make(chan error, 1)
+	go func() {
+		err := uut.sendLoop(ctx, fDest)
+		loopErr <- err
+	}()
+
+	// It should still queue up one log from source #2, so that we would exceed the database
+	// limit. These come over a total of 3 updates, because due to overhead, the n logs from source
+	// #1 come in 2 updates, plus 1 update for source #2.
+	logsBySource := make(map[uuid.UUID]int)
+	for i := 0; i < 3; i++ {
+		req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+		require.NotNil(t, req)
+		srcID, err := uuid.FromBytes(req.LogSourceId)
+		require.NoError(t, err)
+		logsBySource[srcID] += len(req.Logs)
+		testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
+	}
+	require.Equal(t, map[uuid.UUID]int{
+		ls1: n,
+		ls2: 1,
+	}, logsBySource)
+
+	cancel()
+	err := testutil.RequireRecvCtx(testCtx, t, loopErr)
+	require.NoError(t, err)
+}
+
+func TestLogSender_SendError(t *testing.T) {
+	t.Parallel()
+	ctx := testutil.Context(t, testutil.WaitShort)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	fDest := newFakeLogDest()
+	expectedErr := xerrors.New("test")
+	fDest.err = expectedErr
+	uut := newLogSender(logger)
+
+	t0 := dbtime.Now()
+
+	ls1 := uuid.UUID{0x11}
+	uut.enqueue(ls1, agentsdk.Log{
+		CreatedAt: t0,
+		Output:    "test log 0, src 1",
+		Level:     codersdk.LogLevelInfo,
+	})
+
+	loopErr := make(chan error, 1)
+	go func() {
+		err := uut.sendLoop(ctx, fDest)
+		loopErr <- err
+	}()
+
+	req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+	require.NotNil(t, req)
+
+	err := testutil.RequireRecvCtx(ctx, t, loopErr)
+	require.ErrorIs(t, err, expectedErr)
+
+	// we can still enqueue more logs after sendLoop returns
+	uut.enqueue(ls1, agentsdk.Log{
+		CreatedAt: t0,
+		Output:    "test log 2, src 1",
+		Level:     codersdk.LogLevelTrace,
+	})
+	uut.L.Lock()
+	require.Len(t, uut.queues, 1)
+	uut.L.Unlock()
+}
+
+type fakeLogDest struct {
+	reqs  chan *proto.BatchCreateLogsRequest
+	resps chan *proto.BatchCreateLogsResponse
+	err   error
+}
+
+func (f fakeLogDest) BatchCreateLogs(ctx context.Context, req *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error) {
+	// clone the logs so that modifications the sender makes don't affect our tests. In production
+	// these would be serialized/deserialized so we don't have to worry too much.
+	req.Logs = slices.Clone(req.Logs)
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case f.reqs <- req:
+		if f.err != nil {
+			return nil, f.err
+		}
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case resp := <-f.resps:
+			return resp, nil
+		}
+	}
+}
+
+func newFakeLogDest() *fakeLogDest {
+	return &fakeLogDest{
+		reqs:  make(chan *proto.BatchCreateLogsRequest),
+		resps: make(chan *proto.BatchCreateLogsResponse),
+	}
+}
diff --git a/codersdk/agentsdk/convert.go b/codersdk/agentsdk/convert.go
index 584aa158f2970..9628f1d05eb49 100644
--- a/codersdk/agentsdk/convert.go
+++ b/codersdk/agentsdk/convert.go
@@ -7,6 +7,7 @@ import (
 	"github.com/google/uuid"
 	"golang.org/x/xerrors"
 	"google.golang.org/protobuf/types/known/durationpb"
+	"google.golang.org/protobuf/types/known/timestamppb"
 
 	"github.com/coder/coder/v2/agent/proto"
 	"github.com/coder/coder/v2/codersdk"
@@ -298,3 +299,17 @@ func ProtoFromAppHealthsRequest(req PostAppHealthsRequest) (*proto.BatchUpdateAp
 	}
 	return pReq, nil
 }
+
+// ProtoFromLog converts a Log into its protobuf representation. It returns an error if the log's
+// level, upper-cased, is not one of the names in proto.Log_Level_value.
+func ProtoFromLog(log Log) (*proto.Log, error) {
+	lvl, ok := proto.Log_Level_value[strings.ToUpper(string(log.Level))]
+	if !ok {
+		return nil, xerrors.Errorf("unknown log level: %s", log.Level)
+	}
+	return &proto.Log{
+		CreatedAt: timestamppb.New(log.CreatedAt),
+		Output:    log.Output,
+		Level:     proto.Log_Level(lvl),
+	}, nil
+}