diff --git a/agent/logs.go b/agent/logs.go deleted file mode 100644 index e5241bf36b4a1..0000000000000 --- a/agent/logs.go +++ /dev/null @@ -1,239 +0,0 @@ -package agent - -import ( - "context" - "sync" - "time" - - "github.com/google/uuid" - "golang.org/x/xerrors" - - "cdr.dev/slog" - "github.com/coder/coder/v2/agent/proto" - "github.com/coder/coder/v2/codersdk/agentsdk" -) - -const ( - flushInterval = time.Second - maxBytesPerBatch = 1 << 20 // 1MiB - overheadPerLog = 21 // found by testing - - // maxBytesQueued is the maximum length of logs we will queue in memory. The number is taken - // from dump.sql `max_logs_length` constraint, as there is no point queuing more logs than we'll - // accept in the database. - maxBytesQueued = 1048576 -) - -type logQueue struct { - logs []*proto.Log - flushRequested bool - lastFlush time.Time -} - -// logSender is a subcomponent of agent that handles enqueuing logs and then sending them over the -// agent API. Things that need to log call enqueue and flush. When the agent API becomes available, -// the agent calls sendLoop to send pending logs. -type logSender struct { - *sync.Cond - queues map[uuid.UUID]*logQueue - logger slog.Logger - exceededLogLimit bool - outputLen int -} - -type logDest interface { - BatchCreateLogs(ctx context.Context, request *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error) -} - -func newLogSender(logger slog.Logger) *logSender { - return &logSender{ - Cond: sync.NewCond(&sync.Mutex{}), - logger: logger, - queues: make(map[uuid.UUID]*logQueue), - } -} - -func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) { - logger := l.logger.With(slog.F("log_source_id", src)) - if len(logs) == 0 { - logger.Debug(context.Background(), "enqueue called with no logs") - return - } - l.L.Lock() - defer l.L.Unlock() - if l.exceededLogLimit { - logger.Warn(context.Background(), "dropping enqueued logs because we have reached the server limit") - // don't error, as we also write to file and don't want the overall write to fail - return - } - defer l.Broadcast() - q, ok := l.queues[src] - if !ok { - q = &logQueue{} - l.queues[src] = q - } - for k, log := range logs { - // Here we check the queue size before adding a log because we want to queue up slightly - // more logs than the database would store to ensure we trigger "logs truncated" at the - // database layer. Otherwise, the end user wouldn't know logs are truncated unless they - // examined the Coder agent logs. - if l.outputLen > maxBytesQueued { - logger.Warn(context.Background(), "log queue full; truncating new logs", slog.F("new_logs", k), slog.F("queued_logs", len(q.logs))) - return - } - pl, err := agentsdk.ProtoFromLog(log) - if err != nil { - logger.Critical(context.Background(), "failed to convert log", slog.Error(err)) - return - } - if len(pl.Output)+overheadPerLog > maxBytesPerBatch { - logger.Warn(context.Background(), "dropping log line that exceeds our limit", slog.F("len", len(pl.Output))) - continue - } - q.logs = append(q.logs, pl) - l.outputLen += len(pl.Output) - } - logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs))) -} - -func (l *logSender) flush(src uuid.UUID) { - l.L.Lock() - defer l.L.Unlock() - defer l.Broadcast() - q, ok := l.queues[src] - if ok { - q.flushRequested = true - } - // queue might not exist because it's already been flushed and removed from - // the map. -} - -// sendLoop sends any pending logs until it hits an error or the context is canceled. It does not -// retry as it is expected that a higher layer retries establishing connection to the agent API and -// calls sendLoop again. -func (l *logSender) sendLoop(ctx context.Context, dest logDest) error { - l.L.Lock() - defer l.L.Unlock() - if l.exceededLogLimit { - l.logger.Debug(ctx, "aborting sendLoop because log limit is already exceeded") - // no point in keeping this loop going, if log limit is exceeded, but don't return an - // error because we're already handled it - return nil - } - - ctxDone := false - defer l.logger.Debug(ctx, "sendLoop exiting") - - // wake 4 times per flush interval to check if anything needs to be flushed - ctx, cancel := context.WithCancel(ctx) - defer cancel() - go func() { - tkr := time.NewTicker(flushInterval / 4) - defer tkr.Stop() - for { - select { - // also monitor the context here, so we notice immediately, rather - // than waiting for the next tick or logs - case <-ctx.Done(): - l.L.Lock() - ctxDone = true - l.L.Unlock() - l.Broadcast() - return - case <-tkr.C: - l.Broadcast() - } - } - }() - - for { - for !ctxDone && !l.hasPendingWorkLocked() { - l.Wait() - } - if ctxDone { - return nil - } - - src, q := l.getPendingWorkLocked() - logger := l.logger.With(slog.F("log_source_id", src)) - q.flushRequested = false // clear flag since we're now flushing - req := &proto.BatchCreateLogsRequest{ - LogSourceId: src[:], - } - - // outputToSend keeps track of the size of the protobuf message we send, while - // outputToRemove keeps track of the size of the output we'll remove from the queues on - // success. They are different because outputToSend also counts protocol message overheads. - outputToSend := 0 - outputToRemove := 0 - n := 0 - for n < len(q.logs) { - log := q.logs[n] - outputToSend += len(log.Output) + overheadPerLog - if outputToSend > maxBytesPerBatch { - break - } - req.Logs = append(req.Logs, log) - n++ - outputToRemove += len(log.Output) - } - - l.L.Unlock() - logger.Debug(ctx, "sending logs to agent API", slog.F("num_logs", len(req.Logs))) - resp, err := dest.BatchCreateLogs(ctx, req) - l.L.Lock() - if err != nil { - return xerrors.Errorf("failed to upload logs: %w", err) - } - if resp.LogLimitExceeded { - l.logger.Warn(ctx, "server log limit exceeded; logs truncated") - l.exceededLogLimit = true - // no point in keeping anything we have queued around, server will not accept them - l.queues = make(map[uuid.UUID]*logQueue) - // We've handled the error as best as we can. We don't want the server limit to grind - // other things to a halt, so this is all we can do. - return nil - } - - // Since elsewhere we only append to the logs, here we can remove them - // since we successfully sent them. First we nil the pointers though, - // so that they can be gc'd. - for i := 0; i < n; i++ { - q.logs[i] = nil - } - q.logs = q.logs[n:] - l.outputLen -= outputToRemove - if len(q.logs) == 0 { - // no empty queues - delete(l.queues, src) - continue - } - q.lastFlush = time.Now() - } -} - -func (l *logSender) hasPendingWorkLocked() bool { - for _, q := range l.queues { - if time.Since(q.lastFlush) > flushInterval { - return true - } - if q.flushRequested { - return true - } - } - return false -} - -func (l *logSender) getPendingWorkLocked() (src uuid.UUID, q *logQueue) { - // take the one it's been the longest since we've flushed, so that we have some sense of - // fairness across sources - var earliestFlush time.Time - for is, iq := range l.queues { - if q == nil || iq.lastFlush.Before(earliestFlush) { - src = is - q = iq - earliestFlush = iq.lastFlush - } - } - return src, q -} diff --git a/codersdk/agentsdk/logs.go b/codersdk/agentsdk/logs.go index e7b86194cd1cb..a28aede7bd177 100644 --- a/codersdk/agentsdk/logs.go +++ b/codersdk/agentsdk/logs.go @@ -6,17 +6,31 @@ import ( "errors" "io" "net/http" + "sync" "time" - "golang.org/x/xerrors" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/google/uuid" + "golang.org/x/xerrors" "cdr.dev/slog" + "github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/codersdk" "github.com/coder/retry" ) +const ( + flushInterval = time.Second + maxBytesPerBatch = 1 << 20 // 1MiB + overheadPerLog = 21 // found by testing + + // maxBytesQueued is the maximum length of logs we will queue in memory. The number is taken + // from dump.sql `max_logs_length` constraint, as there is no point queuing more logs than we'll + // accept in the database. + maxBytesQueued = 1048576 +) + type startupLogsWriter struct { buf bytes.Buffer // Buffer to track partial lines. ctx context.Context @@ -107,6 +121,8 @@ type logsSenderOptions struct { // has been called. Calling sendLog concurrently is not supported. If // the context passed to flushAndClose is canceled, any remaining logs // will be discarded. +// +// Deprecated: Use NewLogSender instead, based on the v2 Agent API. func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req PatchLogs) error, logger slog.Logger, opts ...func(*logsSenderOptions)) (sendLog func(ctx context.Context, log ...Log) error, flushAndClose func(context.Context) error) { o := logsSenderOptions{ flushTimeout: 250 * time.Millisecond, @@ -250,3 +266,248 @@ func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req Patc } return sendLog, flushAndClose } + +type logQueue struct { + logs []*proto.Log + flushRequested bool + lastFlush time.Time +} + +// LogSender is a component that handles enqueuing logs and then sending them over the agent API. +// Things that need to log call Enqueue and Flush. When the agent API becomes available, call +// SendLoop to send pending logs. +type LogSender struct { + *sync.Cond + queues map[uuid.UUID]*logQueue + logger slog.Logger + exceededLogLimit bool + outputLen int +} + +type logDest interface { + BatchCreateLogs(ctx context.Context, request *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error) +} + +func NewLogSender(logger slog.Logger) *LogSender { + return &LogSender{ + Cond: sync.NewCond(&sync.Mutex{}), + logger: logger, + queues: make(map[uuid.UUID]*logQueue), + } +} + +func (l *LogSender) Enqueue(src uuid.UUID, logs ...Log) { + logger := l.logger.With(slog.F("log_source_id", src)) + if len(logs) == 0 { + logger.Debug(context.Background(), "enqueue called with no logs") + return + } + l.L.Lock() + defer l.L.Unlock() + if l.exceededLogLimit { + logger.Warn(context.Background(), "dropping enqueued logs because we have reached the server limit") + // don't error, as we also write to file and don't want the overall write to fail + return + } + defer l.Broadcast() + q, ok := l.queues[src] + if !ok { + q = &logQueue{} + l.queues[src] = q + } + for k, log := range logs { + // Here we check the queue size before adding a log because we want to queue up slightly + // more logs than the database would store to ensure we trigger "logs truncated" at the + // database layer. Otherwise, the end user wouldn't know logs are truncated unless they + // examined the Coder agent logs. + if l.outputLen > maxBytesQueued { + logger.Warn(context.Background(), "log queue full; truncating new logs", slog.F("new_logs", k), slog.F("queued_logs", len(q.logs))) + return + } + pl, err := ProtoFromLog(log) + if err != nil { + logger.Critical(context.Background(), "failed to convert log", slog.Error(err)) + pl = &proto.Log{ + CreatedAt: timestamppb.Now(), + Level: proto.Log_ERROR, + Output: "**Coder Internal Error**: Failed to convert log", + } + } + if len(pl.Output)+overheadPerLog > maxBytesPerBatch { + logger.Warn(context.Background(), "dropping log line that exceeds our limit", slog.F("len", len(pl.Output))) + continue + } + q.logs = append(q.logs, pl) + l.outputLen += len(pl.Output) + } + logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs))) +} + +func (l *LogSender) Flush(src uuid.UUID) { + l.L.Lock() + defer l.L.Unlock() + defer l.Broadcast() + q, ok := l.queues[src] + if ok { + q.flushRequested = true + } + // queue might not exist because it's already been flushed and removed from + // the map. +} + +var LogLimitExceededError = xerrors.New("Log limit exceeded") + +// SendLoop sends any pending logs until it hits an error or the context is canceled. It does not +// retry as it is expected that a higher layer retries establishing connection to the agent API and +// calls SendLoop again. +func (l *LogSender) SendLoop(ctx context.Context, dest logDest) error { + l.L.Lock() + defer l.L.Unlock() + if l.exceededLogLimit { + l.logger.Debug(ctx, "aborting SendLoop because log limit is already exceeded") + return LogLimitExceededError + } + + ctxDone := false + defer l.logger.Debug(ctx, "log sender send loop exiting") + + // wake 4 times per Flush interval to check if anything needs to be flushed + ctx, cancel := context.WithCancel(ctx) + defer cancel() + go func() { + tkr := time.NewTicker(flushInterval / 4) + defer tkr.Stop() + for { + select { + // also monitor the context here, so we notice immediately, rather + // than waiting for the next tick or logs + case <-ctx.Done(): + l.L.Lock() + ctxDone = true + l.L.Unlock() + l.Broadcast() + return + case <-tkr.C: + l.Broadcast() + } + } + }() + + for { + for !ctxDone && !l.hasPendingWorkLocked() { + l.Wait() + } + if ctxDone { + return ctx.Err() + } + + src, q := l.getPendingWorkLocked() + logger := l.logger.With(slog.F("log_source_id", src)) + q.flushRequested = false // clear flag since we're now flushing + req := &proto.BatchCreateLogsRequest{ + LogSourceId: src[:], + } + + // outputToSend keeps track of the size of the protobuf message we send, while + // outputToRemove keeps track of the size of the output we'll remove from the queues on + // success. They are different because outputToSend also counts protocol message overheads. + outputToSend := 0 + outputToRemove := 0 + n := 0 + for n < len(q.logs) { + log := q.logs[n] + outputToSend += len(log.Output) + overheadPerLog + if outputToSend > maxBytesPerBatch { + break + } + req.Logs = append(req.Logs, log) + n++ + outputToRemove += len(log.Output) + } + + l.L.Unlock() + logger.Debug(ctx, "sending logs to agent API", slog.F("num_logs", len(req.Logs))) + resp, err := dest.BatchCreateLogs(ctx, req) + l.L.Lock() + if err != nil { + return xerrors.Errorf("failed to upload logs: %w", err) + } + if resp.LogLimitExceeded { + l.logger.Warn(ctx, "server log limit exceeded; logs truncated") + l.exceededLogLimit = true + // no point in keeping anything we have queued around, server will not accept them + l.queues = make(map[uuid.UUID]*logQueue) + return LogLimitExceededError + } + + // Since elsewhere we only append to the logs, here we can remove them + // since we successfully sent them. First we nil the pointers though, + // so that they can be gc'd. + for i := 0; i < n; i++ { + q.logs[i] = nil + } + q.logs = q.logs[n:] + l.outputLen -= outputToRemove + if len(q.logs) == 0 { + // no empty queues + delete(l.queues, src) + continue + } + q.lastFlush = time.Now() + } +} + +func (l *LogSender) hasPendingWorkLocked() bool { + for _, q := range l.queues { + if time.Since(q.lastFlush) > flushInterval { + return true + } + if q.flushRequested { + return true + } + } + return false +} + +func (l *LogSender) getPendingWorkLocked() (src uuid.UUID, q *logQueue) { + // take the one it's been the longest since we've flushed, so that we have some sense of + // fairness across sources + var earliestFlush time.Time + for is, iq := range l.queues { + if q == nil || iq.lastFlush.Before(earliestFlush) { + src = is + q = iq + earliestFlush = iq.lastFlush + } + } + return src, q +} + +func (l *LogSender) GetScriptLogger(logSourceID uuid.UUID) ScriptLogger { + return ScriptLogger{srcID: logSourceID, sender: l} +} + +type ScriptLogger struct { + sender *LogSender + srcID uuid.UUID +} + +func (s ScriptLogger) Send(ctx context.Context, log ...Log) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + s.sender.Enqueue(s.srcID, log...) + return nil + } +} + +func (s ScriptLogger) Flush(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + s.sender.Flush(s.srcID) + return nil + } +} diff --git a/agent/logs_internal_test.go b/codersdk/agentsdk/logs_internal_test.go similarity index 87% rename from agent/logs_internal_test.go rename to codersdk/agentsdk/logs_internal_test.go index d688ed4a8d468..527ebbea81f64 100644 --- a/agent/logs_internal_test.go +++ b/codersdk/agentsdk/logs_internal_test.go @@ -1,16 +1,14 @@ -package agent +package agentsdk import ( "context" "testing" "time" - "golang.org/x/xerrors" - - "golang.org/x/exp/slices" - "github.com/google/uuid" "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" + "golang.org/x/xerrors" protobuf "google.golang.org/protobuf/proto" "cdr.dev/slog" @@ -18,7 +16,6 @@ import ( "github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/coderd/database/dbtime" "github.com/coder/coder/v2/codersdk" - "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/testutil" ) @@ -28,25 +25,25 @@ func TestLogSender_Mainline(t *testing.T) { ctx, cancel := context.WithCancel(testCtx) logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) fDest := newFakeLogDest() - uut := newLogSender(logger) + uut := NewLogSender(logger) t0 := dbtime.Now() ls1 := uuid.UUID{0x11} - uut.enqueue(ls1, agentsdk.Log{ + uut.Enqueue(ls1, Log{ CreatedAt: t0, Output: "test log 0, src 1", Level: codersdk.LogLevelInfo, }) ls2 := uuid.UUID{0x22} - uut.enqueue(ls2, - agentsdk.Log{ + uut.Enqueue(ls2, + Log{ CreatedAt: t0, Output: "test log 0, src 2", Level: codersdk.LogLevelError, }, - agentsdk.Log{ + Log{ CreatedAt: t0, Output: "test log 1, src 2", Level: codersdk.LogLevelWarn, @@ -55,11 +52,11 @@ func TestLogSender_Mainline(t *testing.T) { loopErr := make(chan error, 1) go func() { - err := uut.sendLoop(ctx, fDest) + err := uut.SendLoop(ctx, fDest) loopErr <- err }() - // since neither source has even been flushed, it should immediately flush + // since neither source has even been flushed, it should immediately Flush // both, although the order is not controlled var logReqs []*proto.BatchCreateLogsRequest logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs)) @@ -90,12 +87,12 @@ func TestLogSender_Mainline(t *testing.T) { } t1 := dbtime.Now() - uut.enqueue(ls1, agentsdk.Log{ + uut.Enqueue(ls1, Log{ CreatedAt: t1, Output: "test log 1, src 1", Level: codersdk.LogLevelDebug, }) - uut.flush(ls1) + uut.Flush(ls1) req := testutil.RequireRecvCtx(ctx, t, fDest.reqs) testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{}) @@ -109,10 +106,10 @@ func TestLogSender_Mainline(t *testing.T) { cancel() err := testutil.RequireRecvCtx(testCtx, t, loopErr) - require.NoError(t, err) + require.ErrorIs(t, err, context.Canceled) - // we can still enqueue more logs after sendLoop returns - uut.enqueue(ls1, agentsdk.Log{ + // we can still enqueue more logs after SendLoop returns + uut.Enqueue(ls1, Log{ CreatedAt: t1, Output: "test log 2, src 1", Level: codersdk.LogLevelTrace, @@ -124,12 +121,12 @@ func TestLogSender_LogLimitExceeded(t *testing.T) { ctx := testutil.Context(t, testutil.WaitShort) logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) fDest := newFakeLogDest() - uut := newLogSender(logger) + uut := NewLogSender(logger) t0 := dbtime.Now() ls1 := uuid.UUID{0x11} - uut.enqueue(ls1, agentsdk.Log{ + uut.Enqueue(ls1, Log{ CreatedAt: t0, Output: "test log 0, src 1", Level: codersdk.LogLevelInfo, @@ -137,7 +134,7 @@ func TestLogSender_LogLimitExceeded(t *testing.T) { loopErr := make(chan error, 1) go func() { - err := uut.sendLoop(ctx, fDest) + err := uut.SendLoop(ctx, fDest) loopErr <- err }() @@ -147,11 +144,11 @@ func TestLogSender_LogLimitExceeded(t *testing.T) { &proto.BatchCreateLogsResponse{LogLimitExceeded: true}) err := testutil.RequireRecvCtx(ctx, t, loopErr) - require.NoError(t, err) + require.ErrorIs(t, err, LogLimitExceededError) - // we can still enqueue more logs after sendLoop returns, but they don't + // we can still enqueue more logs after SendLoop returns, but they don't // actually get enqueued - uut.enqueue(ls1, agentsdk.Log{ + uut.Enqueue(ls1, Log{ CreatedAt: t0, Output: "test log 2, src 1", Level: codersdk.LogLevelTrace, @@ -160,13 +157,13 @@ func TestLogSender_LogLimitExceeded(t *testing.T) { require.Len(t, uut.queues, 0) uut.L.Unlock() - // Also, if we run sendLoop again, it should immediately exit. + // Also, if we run SendLoop again, it should immediately exit. go func() { - err := uut.sendLoop(ctx, fDest) + err := uut.SendLoop(ctx, fDest) loopErr <- err }() err = testutil.RequireRecvCtx(ctx, t, loopErr) - require.NoError(t, err) + require.ErrorIs(t, err, LogLimitExceededError) } func TestLogSender_SkipHugeLog(t *testing.T) { @@ -175,7 +172,7 @@ func TestLogSender_SkipHugeLog(t *testing.T) { ctx, cancel := context.WithCancel(testCtx) logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) fDest := newFakeLogDest() - uut := newLogSender(logger) + uut := NewLogSender(logger) t0 := dbtime.Now() ls1 := uuid.UUID{0x11} @@ -185,13 +182,13 @@ func TestLogSender_SkipHugeLog(t *testing.T) { for i := range hugeLog { hugeLog[i] = 'q' } - uut.enqueue(ls1, - agentsdk.Log{ + uut.Enqueue(ls1, + Log{ CreatedAt: t0, Output: string(hugeLog), Level: codersdk.LogLevelInfo, }, - agentsdk.Log{ + Log{ CreatedAt: t0, Output: "test log 1, src 1", Level: codersdk.LogLevelInfo, @@ -199,7 +196,7 @@ func TestLogSender_SkipHugeLog(t *testing.T) { loopErr := make(chan error, 1) go func() { - err := uut.sendLoop(ctx, fDest) + err := uut.SendLoop(ctx, fDest) loopErr <- err }() @@ -212,7 +209,7 @@ func TestLogSender_SkipHugeLog(t *testing.T) { cancel() err := testutil.RequireRecvCtx(testCtx, t, loopErr) - require.NoError(t, err) + require.ErrorIs(t, err, context.Canceled) } func TestLogSender_Batch(t *testing.T) { @@ -221,23 +218,23 @@ func TestLogSender_Batch(t *testing.T) { ctx, cancel := context.WithCancel(testCtx) logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) fDest := newFakeLogDest() - uut := newLogSender(logger) + uut := NewLogSender(logger) t0 := dbtime.Now() ls1 := uuid.UUID{0x11} - var logs []agentsdk.Log + var logs []Log for i := 0; i < 60000; i++ { - logs = append(logs, agentsdk.Log{ + logs = append(logs, Log{ CreatedAt: t0, Output: "r", Level: codersdk.LogLevelInfo, }) } - uut.enqueue(ls1, logs...) + uut.Enqueue(ls1, logs...) loopErr := make(chan error, 1) go func() { - err := uut.sendLoop(ctx, fDest) + err := uut.SendLoop(ctx, fDest) loopErr <- err }() @@ -262,7 +259,7 @@ func TestLogSender_Batch(t *testing.T) { cancel() err = testutil.RequireRecvCtx(testCtx, t, loopErr) - require.NoError(t, err) + require.ErrorIs(t, err, context.Canceled) } func TestLogSender_MaxQueuedLogs(t *testing.T) { @@ -271,7 +268,7 @@ func TestLogSender_MaxQueuedLogs(t *testing.T) { ctx, cancel := context.WithCancel(testCtx) logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) fDest := newFakeLogDest() - uut := newLogSender(logger) + uut := NewLogSender(logger) t0 := dbtime.Now() ls1 := uuid.UUID{0x11} @@ -280,26 +277,26 @@ func TestLogSender_MaxQueuedLogs(t *testing.T) { for i := range hugeLog { hugeLog[i] = 'q' } - var logs []agentsdk.Log + var logs []Log for i := 0; i < n; i++ { - logs = append(logs, agentsdk.Log{ + logs = append(logs, Log{ CreatedAt: t0, Output: string(hugeLog), Level: codersdk.LogLevelInfo, }) } - uut.enqueue(ls1, logs...) + uut.Enqueue(ls1, logs...) // we're now right at the limit of output require.Equal(t, maxBytesQueued, uut.outputLen) // adding more logs should not error... ls2 := uuid.UUID{0x22} - uut.enqueue(ls2, logs...) + uut.Enqueue(ls2, logs...) loopErr := make(chan error, 1) go func() { - err := uut.sendLoop(ctx, fDest) + err := uut.SendLoop(ctx, fDest) loopErr <- err }() @@ -322,7 +319,7 @@ func TestLogSender_MaxQueuedLogs(t *testing.T) { cancel() err := testutil.RequireRecvCtx(testCtx, t, loopErr) - require.NoError(t, err) + require.ErrorIs(t, err, context.Canceled) } func TestLogSender_SendError(t *testing.T) { @@ -332,12 +329,12 @@ func TestLogSender_SendError(t *testing.T) { fDest := newFakeLogDest() expectedErr := xerrors.New("test") fDest.err = expectedErr - uut := newLogSender(logger) + uut := NewLogSender(logger) t0 := dbtime.Now() ls1 := uuid.UUID{0x11} - uut.enqueue(ls1, agentsdk.Log{ + uut.Enqueue(ls1, Log{ CreatedAt: t0, Output: "test log 0, src 1", Level: codersdk.LogLevelInfo, @@ -345,7 +342,7 @@ func TestLogSender_SendError(t *testing.T) { loopErr := make(chan error, 1) go func() { - err := uut.sendLoop(ctx, fDest) + err := uut.SendLoop(ctx, fDest) loopErr <- err }() @@ -355,8 +352,8 @@ func TestLogSender_SendError(t *testing.T) { err := testutil.RequireRecvCtx(ctx, t, loopErr) require.ErrorIs(t, err, expectedErr) - // we can still enqueue more logs after sendLoop returns - uut.enqueue(ls1, agentsdk.Log{ + // we can still enqueue more logs after SendLoop returns + uut.Enqueue(ls1, Log{ CreatedAt: t0, Output: "test log 2, src 1", Level: codersdk.LogLevelTrace,