From 900e32adb6b0f376f54c000c04f4417d78bd7492 Mon Sep 17 00:00:00 2001
From: Spike Curtis <spike@coder.com>
Date: Thu, 15 Feb 2024 11:36:58 +0400
Subject: [PATCH] chore: move LogSender to agentsdk

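Move the log sender out of the agent package and into codersdk/agentsdk,
exporting it as LogSender (NewLogSender, Enqueue, Flush, SendLoop) plus a
small ScriptLogger wrapper bound to a single log source. Behavior changes
along the way: SendLoop now returns ctx.Err() when its context is canceled
and the sentinel LogLimitExceededError when the server reports the log
limit was exceeded, rather than returning nil in both cases, and a log
that fails proto conversion is replaced with a placeholder error log
instead of aborting the rest of the batch. The HTTP-based LogsSender is
marked deprecated in favor of the new v2 Agent API sender.

A rough usage sketch (illustrative only; ctx, logger, logSourceID, and
client are assumed to exist, and client can be any agent API client with
a matching BatchCreateLogs method):

    sender := agentsdk.NewLogSender(logger)

    // Producers enqueue logs for their source and request flushes.
    sl := sender.GetScriptLogger(logSourceID)
    _ = sl.Send(ctx, agentsdk.Log{
        CreatedAt: time.Now(),
        Output:    "script output",
        Level:     codersdk.LogLevelInfo,
    })
    _ = sl.Flush(ctx)

    // Once the agent API is connected, pump queued logs until the
    // context is canceled, a send fails, or the server stops accepting
    // logs.
    err := sender.SendLoop(ctx, client)
    if xerrors.Is(err, agentsdk.LogLimitExceededError) {
        // Do not retry; the server will reject further logs.
    }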
---
 agent/logs.go                                 | 239 ----------------
 codersdk/agentsdk/logs.go                     | 263 +++++++++++++++++-
 .../agentsdk}/logs_internal_test.go           |  99 ++++---
 3 files changed, 310 insertions(+), 291 deletions(-)
 delete mode 100644 agent/logs.go
 rename {agent => codersdk/agentsdk}/logs_internal_test.go (87%)

diff --git a/agent/logs.go b/agent/logs.go
deleted file mode 100644
index e5241bf36b4a1..0000000000000
--- a/agent/logs.go
+++ /dev/null
@@ -1,239 +0,0 @@
-package agent
-
-import (
-	"context"
-	"sync"
-	"time"
-
-	"github.com/google/uuid"
-	"golang.org/x/xerrors"
-
-	"cdr.dev/slog"
-	"github.com/coder/coder/v2/agent/proto"
-	"github.com/coder/coder/v2/codersdk/agentsdk"
-)
-
-const (
-	flushInterval    = time.Second
-	maxBytesPerBatch = 1 << 20 // 1MiB
-	overheadPerLog   = 21      // found by testing
-
-	// maxBytesQueued is the maximum length of logs we will queue in memory.  The number is taken
-	// from dump.sql `max_logs_length` constraint, as there is no point queuing more logs than we'll
-	// accept in the database.
-	maxBytesQueued = 1048576
-)
-
-type logQueue struct {
-	logs           []*proto.Log
-	flushRequested bool
-	lastFlush      time.Time
-}
-
-// logSender is a subcomponent of agent that handles enqueuing logs and then sending them over the
-// agent API.  Things that need to log call enqueue and flush.  When the agent API becomes available,
-// the agent calls sendLoop to send pending logs.
-type logSender struct {
-	*sync.Cond
-	queues           map[uuid.UUID]*logQueue
-	logger           slog.Logger
-	exceededLogLimit bool
-	outputLen        int
-}
-
-type logDest interface {
-	BatchCreateLogs(ctx context.Context, request *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error)
-}
-
-func newLogSender(logger slog.Logger) *logSender {
-	return &logSender{
-		Cond:   sync.NewCond(&sync.Mutex{}),
-		logger: logger,
-		queues: make(map[uuid.UUID]*logQueue),
-	}
-}
-
-func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) {
-	logger := l.logger.With(slog.F("log_source_id", src))
-	if len(logs) == 0 {
-		logger.Debug(context.Background(), "enqueue called with no logs")
-		return
-	}
-	l.L.Lock()
-	defer l.L.Unlock()
-	if l.exceededLogLimit {
-		logger.Warn(context.Background(), "dropping enqueued logs because we have reached the server limit")
-		// don't error, as we also write to file and don't want the overall write to fail
-		return
-	}
-	defer l.Broadcast()
-	q, ok := l.queues[src]
-	if !ok {
-		q = &logQueue{}
-		l.queues[src] = q
-	}
-	for k, log := range logs {
-		// Here we check the queue size before adding a log because we want to queue up slightly
-		// more logs than the database would store to ensure we trigger "logs truncated" at the
-		// database layer.  Otherwise, the end user wouldn't know logs are truncated unless they
-		// examined the Coder agent logs.
-		if l.outputLen > maxBytesQueued {
-			logger.Warn(context.Background(), "log queue full; truncating new logs", slog.F("new_logs", k), slog.F("queued_logs", len(q.logs)))
-			return
-		}
-		pl, err := agentsdk.ProtoFromLog(log)
-		if err != nil {
-			logger.Critical(context.Background(), "failed to convert log", slog.Error(err))
-			return
-		}
-		if len(pl.Output)+overheadPerLog > maxBytesPerBatch {
-			logger.Warn(context.Background(), "dropping log line that exceeds our limit", slog.F("len", len(pl.Output)))
-			continue
-		}
-		q.logs = append(q.logs, pl)
-		l.outputLen += len(pl.Output)
-	}
-	logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs)))
-}
-
-func (l *logSender) flush(src uuid.UUID) {
-	l.L.Lock()
-	defer l.L.Unlock()
-	defer l.Broadcast()
-	q, ok := l.queues[src]
-	if ok {
-		q.flushRequested = true
-	}
-	// queue might not exist because it's already been flushed and removed from
-	// the map.
-}
-
-// sendLoop sends any pending logs until it hits an error or the context is canceled.  It does not
-// retry as it is expected that a higher layer retries establishing connection to the agent API and
-// calls sendLoop again.
-func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
-	l.L.Lock()
-	defer l.L.Unlock()
-	if l.exceededLogLimit {
-		l.logger.Debug(ctx, "aborting sendLoop because log limit is already exceeded")
-		// no point in keeping this loop going, if log limit is exceeded, but don't return an
-		// error because we're already handled it
-		return nil
-	}
-
-	ctxDone := false
-	defer l.logger.Debug(ctx, "sendLoop exiting")
-
-	// wake 4 times per flush interval to check if anything needs to be flushed
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-	go func() {
-		tkr := time.NewTicker(flushInterval / 4)
-		defer tkr.Stop()
-		for {
-			select {
-			// also monitor the context here, so we notice immediately, rather
-			// than waiting for the next tick or logs
-			case <-ctx.Done():
-				l.L.Lock()
-				ctxDone = true
-				l.L.Unlock()
-				l.Broadcast()
-				return
-			case <-tkr.C:
-				l.Broadcast()
-			}
-		}
-	}()
-
-	for {
-		for !ctxDone && !l.hasPendingWorkLocked() {
-			l.Wait()
-		}
-		if ctxDone {
-			return nil
-		}
-
-		src, q := l.getPendingWorkLocked()
-		logger := l.logger.With(slog.F("log_source_id", src))
-		q.flushRequested = false // clear flag since we're now flushing
-		req := &proto.BatchCreateLogsRequest{
-			LogSourceId: src[:],
-		}
-
-		// outputToSend keeps track of the size of the protobuf message we send, while
-		// outputToRemove keeps track of the size of the output we'll remove from the queues on
-		// success.  They are different because outputToSend also counts protocol message overheads.
-		outputToSend := 0
-		outputToRemove := 0
-		n := 0
-		for n < len(q.logs) {
-			log := q.logs[n]
-			outputToSend += len(log.Output) + overheadPerLog
-			if outputToSend > maxBytesPerBatch {
-				break
-			}
-			req.Logs = append(req.Logs, log)
-			n++
-			outputToRemove += len(log.Output)
-		}
-
-		l.L.Unlock()
-		logger.Debug(ctx, "sending logs to agent API", slog.F("num_logs", len(req.Logs)))
-		resp, err := dest.BatchCreateLogs(ctx, req)
-		l.L.Lock()
-		if err != nil {
-			return xerrors.Errorf("failed to upload logs: %w", err)
-		}
-		if resp.LogLimitExceeded {
-			l.logger.Warn(ctx, "server log limit exceeded; logs truncated")
-			l.exceededLogLimit = true
-			// no point in keeping anything we have queued around, server will not accept them
-			l.queues = make(map[uuid.UUID]*logQueue)
-			// We've handled the error as best as we can. We don't want the server limit to grind
-			// other things to a halt, so this is all we can do.
-			return nil
-		}
-
-		// Since elsewhere we only append to the logs, here we can remove them
-		// since we successfully sent them.  First we nil the pointers though,
-		// so that they can be gc'd.
-		for i := 0; i < n; i++ {
-			q.logs[i] = nil
-		}
-		q.logs = q.logs[n:]
-		l.outputLen -= outputToRemove
-		if len(q.logs) == 0 {
-			// no empty queues
-			delete(l.queues, src)
-			continue
-		}
-		q.lastFlush = time.Now()
-	}
-}
-
-func (l *logSender) hasPendingWorkLocked() bool {
-	for _, q := range l.queues {
-		if time.Since(q.lastFlush) > flushInterval {
-			return true
-		}
-		if q.flushRequested {
-			return true
-		}
-	}
-	return false
-}
-
-func (l *logSender) getPendingWorkLocked() (src uuid.UUID, q *logQueue) {
-	// take the one it's been the longest since we've flushed, so that we have some sense of
-	// fairness across sources
-	var earliestFlush time.Time
-	for is, iq := range l.queues {
-		if q == nil || iq.lastFlush.Before(earliestFlush) {
-			src = is
-			q = iq
-			earliestFlush = iq.lastFlush
-		}
-	}
-	return src, q
-}
diff --git a/codersdk/agentsdk/logs.go b/codersdk/agentsdk/logs.go
index e7b86194cd1cb..a28aede7bd177 100644
--- a/codersdk/agentsdk/logs.go
+++ b/codersdk/agentsdk/logs.go
@@ -6,17 +6,31 @@ import (
 	"errors"
 	"io"
 	"net/http"
+	"sync"
 	"time"
 
-	"golang.org/x/xerrors"
+	"google.golang.org/protobuf/types/known/timestamppb"
 
 	"github.com/google/uuid"
+	"golang.org/x/xerrors"
 
 	"cdr.dev/slog"
+	"github.com/coder/coder/v2/agent/proto"
 	"github.com/coder/coder/v2/codersdk"
 	"github.com/coder/retry"
 )
 
+const (
+	flushInterval    = time.Second
+	maxBytesPerBatch = 1 << 20 // 1MiB
+	overheadPerLog   = 21      // found by testing
+
+	// maxBytesQueued is the maximum length of logs we will queue in memory.  The number is taken
+	// from dump.sql `max_logs_length` constraint, as there is no point queuing more logs than we'll
+	// accept in the database.
+	maxBytesQueued = 1048576
+)
+
 type startupLogsWriter struct {
 	buf    bytes.Buffer // Buffer to track partial lines.
 	ctx    context.Context
@@ -107,6 +121,8 @@ type logsSenderOptions struct {
 // has been called. Calling sendLog concurrently is not supported. If
 // the context passed to flushAndClose is canceled, any remaining logs
 // will be discarded.
+//
+// Deprecated: Use NewLogSender instead, based on the v2 Agent API.
 func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req PatchLogs) error, logger slog.Logger, opts ...func(*logsSenderOptions)) (sendLog func(ctx context.Context, log ...Log) error, flushAndClose func(context.Context) error) {
 	o := logsSenderOptions{
 		flushTimeout: 250 * time.Millisecond,
@@ -250,3 +266,248 @@ func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req Patc
 	}
 	return sendLog, flushAndClose
 }
+
+type logQueue struct {
+	logs           []*proto.Log
+	flushRequested bool
+	lastFlush      time.Time
+}
+
+// LogSender is a component that handles enqueuing logs and then sending them over the agent API.
+// Things that need to log call Enqueue and Flush.  When the agent API becomes available, call
+// SendLoop to send pending logs.
+type LogSender struct {
+	*sync.Cond
+	queues           map[uuid.UUID]*logQueue
+	logger           slog.Logger
+	exceededLogLimit bool
+	outputLen        int
+}
+
+type logDest interface {
+	BatchCreateLogs(ctx context.Context, request *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error)
+}
+
+func NewLogSender(logger slog.Logger) *LogSender {
+	return &LogSender{
+		Cond:   sync.NewCond(&sync.Mutex{}),
+		logger: logger,
+		queues: make(map[uuid.UUID]*logQueue),
+	}
+}
+
+func (l *LogSender) Enqueue(src uuid.UUID, logs ...Log) {
+	logger := l.logger.With(slog.F("log_source_id", src))
+	if len(logs) == 0 {
+		logger.Debug(context.Background(), "enqueue called with no logs")
+		return
+	}
+	l.L.Lock()
+	defer l.L.Unlock()
+	if l.exceededLogLimit {
+		logger.Warn(context.Background(), "dropping enqueued logs because we have reached the server limit")
+		// don't error, as we also write to file and don't want the overall write to fail
+		return
+	}
+	defer l.Broadcast()
+	q, ok := l.queues[src]
+	if !ok {
+		q = &logQueue{}
+		l.queues[src] = q
+	}
+	for k, log := range logs {
+		// Here we check the queue size before adding a log because we want to queue up slightly
+		// more logs than the database would store to ensure we trigger "logs truncated" at the
+		// database layer.  Otherwise, the end user wouldn't know logs are truncated unless they
+		// examined the Coder agent logs.
+		if l.outputLen > maxBytesQueued {
+			logger.Warn(context.Background(), "log queue full; truncating new logs", slog.F("new_logs", k), slog.F("queued_logs", len(q.logs)))
+			return
+		}
+		pl, err := ProtoFromLog(log)
+		if err != nil {
+			logger.Critical(context.Background(), "failed to convert log", slog.Error(err))
+			pl = &proto.Log{
+				CreatedAt: timestamppb.Now(),
+				Level:     proto.Log_ERROR,
+				Output:    "**Coder Internal Error**: Failed to convert log",
+			}
+		}
+		if len(pl.Output)+overheadPerLog > maxBytesPerBatch {
+			logger.Warn(context.Background(), "dropping log line that exceeds our limit", slog.F("len", len(pl.Output)))
+			continue
+		}
+		q.logs = append(q.logs, pl)
+		l.outputLen += len(pl.Output)
+	}
+	logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs)))
+}
+
+func (l *LogSender) Flush(src uuid.UUID) {
+	l.L.Lock()
+	defer l.L.Unlock()
+	defer l.Broadcast()
+	q, ok := l.queues[src]
+	if ok {
+		q.flushRequested = true
+	}
+	// queue might not exist because it's already been flushed and removed from
+	// the map.
+}
+
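+// LogLimitExceededError is returned by SendLoop when the server rejects logs because the log
+// limit has been exceeded.  Once this happens, queued logs are dropped and further calls to
+// SendLoop fail fast with this error.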
+var LogLimitExceededError = xerrors.New("Log limit exceeded")
+
+// SendLoop sends any pending logs until it hits an error or the context is canceled.  It does not
+// retry as it is expected that a higher layer retries establishing a connection to the agent API and
+// calls SendLoop again.
+func (l *LogSender) SendLoop(ctx context.Context, dest logDest) error {
+	l.L.Lock()
+	defer l.L.Unlock()
+	if l.exceededLogLimit {
+		l.logger.Debug(ctx, "aborting SendLoop because log limit is already exceeded")
+		return LogLimitExceededError
+	}
+
+	ctxDone := false
+	defer l.logger.Debug(ctx, "log sender send loop exiting")
+
+	// wake 4 times per flush interval to check if anything needs to be flushed
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	go func() {
+		tkr := time.NewTicker(flushInterval / 4)
+		defer tkr.Stop()
+		for {
+			select {
+			// also monitor the context here, so we notice immediately, rather
+			// than waiting for the next tick or logs
+			case <-ctx.Done():
+				l.L.Lock()
+				ctxDone = true
+				l.L.Unlock()
+				l.Broadcast()
+				return
+			case <-tkr.C:
+				l.Broadcast()
+			}
+		}
+	}()
+
+	for {
+		for !ctxDone && !l.hasPendingWorkLocked() {
+			l.Wait()
+		}
+		if ctxDone {
+			return ctx.Err()
+		}
+
+		src, q := l.getPendingWorkLocked()
+		logger := l.logger.With(slog.F("log_source_id", src))
+		q.flushRequested = false // clear flag since we're now flushing
+		req := &proto.BatchCreateLogsRequest{
+			LogSourceId: src[:],
+		}
+
+		// outputToSend keeps track of the size of the protobuf message we send, while
+		// outputToRemove keeps track of the size of the output we'll remove from the queues on
+		// success.  They are different because outputToSend also counts protocol message overheads.
+		outputToSend := 0
+		outputToRemove := 0
+		n := 0
+		for n < len(q.logs) {
+			log := q.logs[n]
+			outputToSend += len(log.Output) + overheadPerLog
+			if outputToSend > maxBytesPerBatch {
+				break
+			}
+			req.Logs = append(req.Logs, log)
+			n++
+			outputToRemove += len(log.Output)
+		}
+
+		l.L.Unlock()
+		logger.Debug(ctx, "sending logs to agent API", slog.F("num_logs", len(req.Logs)))
+		resp, err := dest.BatchCreateLogs(ctx, req)
+		l.L.Lock()
+		if err != nil {
+			return xerrors.Errorf("failed to upload logs: %w", err)
+		}
+		if resp.LogLimitExceeded {
+			l.logger.Warn(ctx, "server log limit exceeded; logs truncated")
+			l.exceededLogLimit = true
+			// no point in keeping anything we have queued around; the server will not accept more logs
+			l.queues = make(map[uuid.UUID]*logQueue)
+			return LogLimitExceededError
+		}
+
+		// Since elsewhere we only append to the logs, here we can safely remove
+		// the ones we successfully sent.  First we nil the pointers though, so
+		// that they can be gc'd.
+		for i := 0; i < n; i++ {
+			q.logs[i] = nil
+		}
+		q.logs = q.logs[n:]
+		l.outputLen -= outputToRemove
+		if len(q.logs) == 0 {
+			// no empty queues
+			delete(l.queues, src)
+			continue
+		}
+		q.lastFlush = time.Now()
+	}
+}
+
+func (l *LogSender) hasPendingWorkLocked() bool {
+	for _, q := range l.queues {
+		if time.Since(q.lastFlush) > flushInterval {
+			return true
+		}
+		if q.flushRequested {
+			return true
+		}
+	}
+	return false
+}
+
+func (l *LogSender) getPendingWorkLocked() (src uuid.UUID, q *logQueue) {
+	// take the queue that has gone the longest without being flushed, so that we have some
+	// sense of fairness across sources
+	var earliestFlush time.Time
+	for is, iq := range l.queues {
+		if q == nil || iq.lastFlush.Before(earliestFlush) {
+			src = is
+			q = iq
+			earliestFlush = iq.lastFlush
+		}
+	}
+	return src, q
+}
+
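+// GetScriptLogger returns a ScriptLogger that sends logs via this sender under the given log
+// source ID.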
+func (l *LogSender) GetScriptLogger(logSourceID uuid.UUID) ScriptLogger {
+	return ScriptLogger{srcID: logSourceID, sender: l}
+}
+
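+// ScriptLogger is a thin wrapper around a LogSender bound to a single log source ID.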
+type ScriptLogger struct {
+	sender *LogSender
+	srcID  uuid.UUID
+}
+
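+// Send enqueues the logs for sending.  It returns the context error if ctx is already done, but
+// otherwise does not wait for the logs to be sent.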
+func (s ScriptLogger) Send(ctx context.Context, log ...Log) error {
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	default:
+		s.sender.Enqueue(s.srcID, log...)
+		return nil
+	}
+}
+
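+// Flush requests a flush of the script's log source.  It returns the context error if ctx is
+// already done.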
+func (s ScriptLogger) Flush(ctx context.Context) error {
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	default:
+		s.sender.Flush(s.srcID)
+		return nil
+	}
+}
diff --git a/agent/logs_internal_test.go b/codersdk/agentsdk/logs_internal_test.go
similarity index 87%
rename from agent/logs_internal_test.go
rename to codersdk/agentsdk/logs_internal_test.go
index d688ed4a8d468..527ebbea81f64 100644
--- a/agent/logs_internal_test.go
+++ b/codersdk/agentsdk/logs_internal_test.go
@@ -1,16 +1,14 @@
-package agent
+package agentsdk
 
 import (
 	"context"
 	"testing"
 	"time"
 
-	"golang.org/x/xerrors"
-
-	"golang.org/x/exp/slices"
-
 	"github.com/google/uuid"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/exp/slices"
+	"golang.org/x/xerrors"
 	protobuf "google.golang.org/protobuf/proto"
 
 	"cdr.dev/slog"
@@ -18,7 +16,6 @@ import (
 	"github.com/coder/coder/v2/agent/proto"
 	"github.com/coder/coder/v2/coderd/database/dbtime"
 	"github.com/coder/coder/v2/codersdk"
-	"github.com/coder/coder/v2/codersdk/agentsdk"
 	"github.com/coder/coder/v2/testutil"
 )
 
@@ -28,25 +25,25 @@ func TestLogSender_Mainline(t *testing.T) {
 	ctx, cancel := context.WithCancel(testCtx)
 	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
 	fDest := newFakeLogDest()
-	uut := newLogSender(logger)
+	uut := NewLogSender(logger)
 
 	t0 := dbtime.Now()
 
 	ls1 := uuid.UUID{0x11}
-	uut.enqueue(ls1, agentsdk.Log{
+	uut.Enqueue(ls1, Log{
 		CreatedAt: t0,
 		Output:    "test log 0, src 1",
 		Level:     codersdk.LogLevelInfo,
 	})
 
 	ls2 := uuid.UUID{0x22}
-	uut.enqueue(ls2,
-		agentsdk.Log{
+	uut.Enqueue(ls2,
+		Log{
 			CreatedAt: t0,
 			Output:    "test log 0, src 2",
 			Level:     codersdk.LogLevelError,
 		},
-		agentsdk.Log{
+		Log{
 			CreatedAt: t0,
 			Output:    "test log 1, src 2",
 			Level:     codersdk.LogLevelWarn,
@@ -55,11 +52,11 @@ func TestLogSender_Mainline(t *testing.T) {
 
 	loopErr := make(chan error, 1)
 	go func() {
-		err := uut.sendLoop(ctx, fDest)
+		err := uut.SendLoop(ctx, fDest)
 		loopErr <- err
 	}()
 
-	// since neither source has even been flushed, it should immediately flush
+	// since neither source has ever been flushed, it should immediately flush
 	// both, although the order is not controlled
 	var logReqs []*proto.BatchCreateLogsRequest
 	logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
@@ -90,12 +87,12 @@ func TestLogSender_Mainline(t *testing.T) {
 	}
 
 	t1 := dbtime.Now()
-	uut.enqueue(ls1, agentsdk.Log{
+	uut.Enqueue(ls1, Log{
 		CreatedAt: t1,
 		Output:    "test log 1, src 1",
 		Level:     codersdk.LogLevelDebug,
 	})
-	uut.flush(ls1)
+	uut.Flush(ls1)
 
 	req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
 	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
@@ -109,10 +106,10 @@ func TestLogSender_Mainline(t *testing.T) {
 
 	cancel()
 	err := testutil.RequireRecvCtx(testCtx, t, loopErr)
-	require.NoError(t, err)
+	require.ErrorIs(t, err, context.Canceled)
 
-	// we can still enqueue more logs after sendLoop returns
-	uut.enqueue(ls1, agentsdk.Log{
+	// we can still enqueue more logs after SendLoop returns
+	uut.Enqueue(ls1, Log{
 		CreatedAt: t1,
 		Output:    "test log 2, src 1",
 		Level:     codersdk.LogLevelTrace,
@@ -124,12 +121,12 @@ func TestLogSender_LogLimitExceeded(t *testing.T) {
 	ctx := testutil.Context(t, testutil.WaitShort)
 	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
 	fDest := newFakeLogDest()
-	uut := newLogSender(logger)
+	uut := NewLogSender(logger)
 
 	t0 := dbtime.Now()
 
 	ls1 := uuid.UUID{0x11}
-	uut.enqueue(ls1, agentsdk.Log{
+	uut.Enqueue(ls1, Log{
 		CreatedAt: t0,
 		Output:    "test log 0, src 1",
 		Level:     codersdk.LogLevelInfo,
@@ -137,7 +134,7 @@ func TestLogSender_LogLimitExceeded(t *testing.T) {
 
 	loopErr := make(chan error, 1)
 	go func() {
-		err := uut.sendLoop(ctx, fDest)
+		err := uut.SendLoop(ctx, fDest)
 		loopErr <- err
 	}()
 
@@ -147,11 +144,11 @@ func TestLogSender_LogLimitExceeded(t *testing.T) {
 		&proto.BatchCreateLogsResponse{LogLimitExceeded: true})
 
 	err := testutil.RequireRecvCtx(ctx, t, loopErr)
-	require.NoError(t, err)
+	require.ErrorIs(t, err, LogLimitExceededError)
 
-	// we can still enqueue more logs after sendLoop returns, but they don't
+	// we can still enqueue more logs after SendLoop returns, but they don't
 	// actually get enqueued
-	uut.enqueue(ls1, agentsdk.Log{
+	uut.Enqueue(ls1, Log{
 		CreatedAt: t0,
 		Output:    "test log 2, src 1",
 		Level:     codersdk.LogLevelTrace,
@@ -160,13 +157,13 @@ func TestLogSender_LogLimitExceeded(t *testing.T) {
 	require.Len(t, uut.queues, 0)
 	uut.L.Unlock()
 
-	// Also, if we run sendLoop again, it should immediately exit.
+	// Also, if we run SendLoop again, it should immediately exit.
 	go func() {
-		err := uut.sendLoop(ctx, fDest)
+		err := uut.SendLoop(ctx, fDest)
 		loopErr <- err
 	}()
 	err = testutil.RequireRecvCtx(ctx, t, loopErr)
-	require.NoError(t, err)
+	require.ErrorIs(t, err, LogLimitExceededError)
 }
 
 func TestLogSender_SkipHugeLog(t *testing.T) {
@@ -175,7 +172,7 @@ func TestLogSender_SkipHugeLog(t *testing.T) {
 	ctx, cancel := context.WithCancel(testCtx)
 	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
 	fDest := newFakeLogDest()
-	uut := newLogSender(logger)
+	uut := NewLogSender(logger)
 
 	t0 := dbtime.Now()
 	ls1 := uuid.UUID{0x11}
@@ -185,13 +182,13 @@ func TestLogSender_SkipHugeLog(t *testing.T) {
 	for i := range hugeLog {
 		hugeLog[i] = 'q'
 	}
-	uut.enqueue(ls1,
-		agentsdk.Log{
+	uut.Enqueue(ls1,
+		Log{
 			CreatedAt: t0,
 			Output:    string(hugeLog),
 			Level:     codersdk.LogLevelInfo,
 		},
-		agentsdk.Log{
+		Log{
 			CreatedAt: t0,
 			Output:    "test log 1, src 1",
 			Level:     codersdk.LogLevelInfo,
@@ -199,7 +196,7 @@ func TestLogSender_SkipHugeLog(t *testing.T) {
 
 	loopErr := make(chan error, 1)
 	go func() {
-		err := uut.sendLoop(ctx, fDest)
+		err := uut.SendLoop(ctx, fDest)
 		loopErr <- err
 	}()
 
@@ -212,7 +209,7 @@ func TestLogSender_SkipHugeLog(t *testing.T) {
 
 	cancel()
 	err := testutil.RequireRecvCtx(testCtx, t, loopErr)
-	require.NoError(t, err)
+	require.ErrorIs(t, err, context.Canceled)
 }
 
 func TestLogSender_Batch(t *testing.T) {
@@ -221,23 +218,23 @@ func TestLogSender_Batch(t *testing.T) {
 	ctx, cancel := context.WithCancel(testCtx)
 	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
 	fDest := newFakeLogDest()
-	uut := newLogSender(logger)
+	uut := NewLogSender(logger)
 
 	t0 := dbtime.Now()
 	ls1 := uuid.UUID{0x11}
-	var logs []agentsdk.Log
+	var logs []Log
 	for i := 0; i < 60000; i++ {
-		logs = append(logs, agentsdk.Log{
+		logs = append(logs, Log{
 			CreatedAt: t0,
 			Output:    "r",
 			Level:     codersdk.LogLevelInfo,
 		})
 	}
-	uut.enqueue(ls1, logs...)
+	uut.Enqueue(ls1, logs...)
 
 	loopErr := make(chan error, 1)
 	go func() {
-		err := uut.sendLoop(ctx, fDest)
+		err := uut.SendLoop(ctx, fDest)
 		loopErr <- err
 	}()
 
@@ -262,7 +259,7 @@ func TestLogSender_Batch(t *testing.T) {
 
 	cancel()
 	err = testutil.RequireRecvCtx(testCtx, t, loopErr)
-	require.NoError(t, err)
+	require.ErrorIs(t, err, context.Canceled)
 }
 
 func TestLogSender_MaxQueuedLogs(t *testing.T) {
@@ -271,7 +268,7 @@ func TestLogSender_MaxQueuedLogs(t *testing.T) {
 	ctx, cancel := context.WithCancel(testCtx)
 	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
 	fDest := newFakeLogDest()
-	uut := newLogSender(logger)
+	uut := NewLogSender(logger)
 
 	t0 := dbtime.Now()
 	ls1 := uuid.UUID{0x11}
@@ -280,26 +277,26 @@ func TestLogSender_MaxQueuedLogs(t *testing.T) {
 	for i := range hugeLog {
 		hugeLog[i] = 'q'
 	}
-	var logs []agentsdk.Log
+	var logs []Log
 	for i := 0; i < n; i++ {
-		logs = append(logs, agentsdk.Log{
+		logs = append(logs, Log{
 			CreatedAt: t0,
 			Output:    string(hugeLog),
 			Level:     codersdk.LogLevelInfo,
 		})
 	}
-	uut.enqueue(ls1, logs...)
+	uut.Enqueue(ls1, logs...)
 
 	// we're now right at the limit of output
 	require.Equal(t, maxBytesQueued, uut.outputLen)
 
 	// adding more logs should not error...
 	ls2 := uuid.UUID{0x22}
-	uut.enqueue(ls2, logs...)
+	uut.Enqueue(ls2, logs...)
 
 	loopErr := make(chan error, 1)
 	go func() {
-		err := uut.sendLoop(ctx, fDest)
+		err := uut.SendLoop(ctx, fDest)
 		loopErr <- err
 	}()
 
@@ -322,7 +319,7 @@ func TestLogSender_MaxQueuedLogs(t *testing.T) {
 
 	cancel()
 	err := testutil.RequireRecvCtx(testCtx, t, loopErr)
-	require.NoError(t, err)
+	require.ErrorIs(t, err, context.Canceled)
 }
 
 func TestLogSender_SendError(t *testing.T) {
@@ -332,12 +329,12 @@ func TestLogSender_SendError(t *testing.T) {
 	fDest := newFakeLogDest()
 	expectedErr := xerrors.New("test")
 	fDest.err = expectedErr
-	uut := newLogSender(logger)
+	uut := NewLogSender(logger)
 
 	t0 := dbtime.Now()
 
 	ls1 := uuid.UUID{0x11}
-	uut.enqueue(ls1, agentsdk.Log{
+	uut.Enqueue(ls1, Log{
 		CreatedAt: t0,
 		Output:    "test log 0, src 1",
 		Level:     codersdk.LogLevelInfo,
@@ -345,7 +342,7 @@ func TestLogSender_SendError(t *testing.T) {
 
 	loopErr := make(chan error, 1)
 	go func() {
-		err := uut.sendLoop(ctx, fDest)
+		err := uut.SendLoop(ctx, fDest)
 		loopErr <- err
 	}()
 
@@ -355,8 +352,8 @@ func TestLogSender_SendError(t *testing.T) {
 	err := testutil.RequireRecvCtx(ctx, t, loopErr)
 	require.ErrorIs(t, err, expectedErr)
 
-	// we can still enqueue more logs after sendLoop returns
-	uut.enqueue(ls1, agentsdk.Log{
+	// we can still enqueue more logs after SendLoop returns
+	uut.Enqueue(ls1, Log{
 		CreatedAt: t0,
 		Output:    "test log 2, src 1",
 		Level:     codersdk.LogLevelTrace,