Commit ad9d94a

feat: add logSender for sending logs on agent v2 API
1 parent 92b2e26 commit ad9d94a

File tree: 3 files changed, +328 -0 lines

agent/logs.go (new file, +174 lines)

package agent

import (
	"context"
	"sync"
	"time"

	"github.com/google/uuid"
	"golang.org/x/xerrors"

	"cdr.dev/slog"
	"github.com/coder/coder/v2/agent/proto"
	"github.com/coder/coder/v2/codersdk/agentsdk"
)

const flushInterval = time.Second

type logQueue struct {
	logs           []*proto.Log
	flushRequested bool
	lastFlush      time.Time
}

// logSender is a subcomponent of agent that handles enqueuing logs and then sending them over the
// agent API. Things that need to log call enqueue and flush. When the agent API becomes available,
// the agent calls sendLoop to send pending logs.
type logSender struct {
	*sync.Cond
	queues map[uuid.UUID]*logQueue
	logger slog.Logger
}

type logDest interface {
	BatchCreateLogs(ctx context.Context, request *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error)
}

func newLogSender(logger slog.Logger) *logSender {
	return &logSender{
		Cond:   sync.NewCond(&sync.Mutex{}),
		logger: logger,
		queues: make(map[uuid.UUID]*logQueue),
	}
}

func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
	logger := l.logger.With(slog.F("log_source_id", src))
	if len(logs) == 0 {
		logger.Debug(context.Background(), "enqueue called with no logs")
		return nil
	}
	l.L.Lock()
	defer l.L.Unlock()
	defer l.Broadcast()
	q, ok := l.queues[src]
	if !ok {
		q = &logQueue{}
		l.queues[src] = q
	}
	for _, log := range logs {
		pl, err := agentsdk.ProtoFromLog(log)
		if err != nil {
			return xerrors.Errorf("failed to convert log: %w", err)
		}
		q.logs = append(q.logs, pl)
	}
	logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs)))
	return nil
}

func (l *logSender) flush(src uuid.UUID) error {
	l.L.Lock()
	defer l.L.Unlock()
	defer l.Broadcast()
	q, ok := l.queues[src]
	if ok {
		q.flushRequested = true
	}
	// queue might not exist because it's already been flushed and removed from
	// the map.
	return nil
}

// sendLoop sends any pending logs until it hits an error or the context is canceled. It does not
// retry as it is expected that a higher layer retries establishing connection to the agent API and
// calls sendLoop again.
func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
	ctxDone := false
	defer l.logger.Debug(ctx, "sendLoop exiting")

	// wake 4 times per flush interval to check if anything needs to be flushed
	go func() {
		tkr := time.NewTicker(flushInterval / 4)
		defer tkr.Stop()
		for {
			select {
			// also monitor the context here, so we notice immediately, rather
			// than waiting for the next tick or logs
			case <-ctx.Done():
				l.L.Lock()
				ctxDone = true
				l.L.Unlock()
				l.Broadcast()
				return
			case <-tkr.C:
				l.Broadcast()
			}
		}
	}()

	l.L.Lock()
	defer l.L.Unlock()
	for {
		for !ctxDone && !l.hasPendingWorkLocked() {
			l.Wait()
		}
		if ctxDone {
			return nil
		}
		src, q := l.getPendingWorkLocked()
		q.flushRequested = false // clear flag since we're now flushing
		req := &proto.BatchCreateLogsRequest{
			LogSourceId: src[:],
			Logs:        q.logs[:],
		}

		l.L.Unlock()
		l.logger.Debug(ctx, "sending logs to agent API", slog.F("log_source_id", src), slog.F("num_logs", len(req.Logs)))
		_, err := dest.BatchCreateLogs(ctx, req)
		l.L.Lock()
		if err != nil {
			return xerrors.Errorf("failed to upload logs: %w", err)
		}

		// Since elsewhere we only append to the logs, here we can remove them
		// since we successfully sent them. First we nil the pointers though,
		// so that they can be gc'd.
		for i := 0; i < len(req.Logs); i++ {
			q.logs[i] = nil
		}
		q.logs = q.logs[len(req.Logs):]
		if len(q.logs) == 0 {
			// no empty queues
			delete(l.queues, src)
			continue
		}
		q.lastFlush = time.Now()
	}
}

func (l *logSender) hasPendingWorkLocked() bool {
	for _, q := range l.queues {
		if time.Since(q.lastFlush) > flushInterval {
			return true
		}
		if q.flushRequested {
			return true
		}
	}
	return false
}

func (l *logSender) getPendingWorkLocked() (src uuid.UUID, q *logQueue) {
	// take the one it's been the longest since we've flushed, so that we have some sense of
	// fairness across sources
	var earliestFlush time.Time
	for is, iq := range l.queues {
		if q == nil || iq.lastFlush.Before(earliestFlush) {
			src = is
			q = iq
			earliestFlush = iq.lastFlush
		}
	}
	return src, q
}
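The doc comment above describes the intended call pattern: producers call enqueue and flush, and the agent runs sendLoop whenever a connection to the agent API is available. Below is a minimal sketch (not part of this commit) of how a caller inside the agent package might drive that API; the exampleLogSenderUsage function, the scriptSource UUID, and the client argument (anything implementing logDest) are illustrative assumptions.

package agent

import (
	"context"
	"time"

	"github.com/google/uuid"

	"cdr.dev/slog"
	"github.com/coder/coder/v2/codersdk"
	"github.com/coder/coder/v2/codersdk/agentsdk"
)

// exampleLogSenderUsage sketches how the agent is expected to drive the
// logSender: producers enqueue and flush at any time, while sendLoop is run
// (and re-run) whenever a connection to the agent API is available.
func exampleLogSenderUsage(ctx context.Context, logger slog.Logger, client logDest) error {
	sender := newLogSender(logger)

	// A producer (e.g. a startup script runner) enqueues logs under its own
	// log source ID; the UUID here is a made-up placeholder.
	scriptSource := uuid.New()
	err := sender.enqueue(scriptSource, agentsdk.Log{
		CreatedAt: time.Now(),
		Output:    "starting startup script",
		Level:     codersdk.LogLevelInfo,
	})
	if err != nil {
		return err
	}

	// flush asks sendLoop to send this queue on its next pass instead of
	// waiting for the one-second flushInterval to elapse.
	if err := sender.flush(scriptSource); err != nil {
		return err
	}

	// sendLoop blocks until ctx is canceled or an RPC fails; on error, a
	// higher layer reconnects to the agent API and calls sendLoop again.
	return sender.sendLoop(ctx, client)
}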

agent/logs_internal_test.go (new file, +141 lines)

package agent

import (
	"context"
	"testing"
	"time"

	"golang.org/x/exp/slices"

	"github.com/google/uuid"
	"github.com/stretchr/testify/require"

	"cdr.dev/slog"
	"cdr.dev/slog/sloggers/slogtest"
	"github.com/coder/coder/v2/agent/proto"
	"github.com/coder/coder/v2/coderd/database/dbtime"
	"github.com/coder/coder/v2/codersdk"
	"github.com/coder/coder/v2/codersdk/agentsdk"
	"github.com/coder/coder/v2/testutil"
)

func TestLogSender(t *testing.T) {
	t.Parallel()
	testCtx := testutil.Context(t, testutil.WaitShort)
	ctx, cancel := context.WithCancel(testCtx)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	fDest := newFakeLogDest()
	uut := newLogSender(logger)

	t0 := dbtime.Now()

	ls1 := uuid.UUID{0x11}
	err := uut.enqueue(ls1, agentsdk.Log{
		CreatedAt: t0,
		Output:    "test log 0, src 1",
		Level:     codersdk.LogLevelInfo,
	})
	require.NoError(t, err)

	ls2 := uuid.UUID{0x22}
	err = uut.enqueue(ls2,
		agentsdk.Log{
			CreatedAt: t0,
			Output:    "test log 0, src 2",
			Level:     codersdk.LogLevelError,
		},
		agentsdk.Log{
			CreatedAt: t0,
			Output:    "test log 1, src 2",
			Level:     codersdk.LogLevelWarn,
		},
	)
	require.NoError(t, err)

	loopErr := make(chan error, 1)
	go func() {
		err := uut.sendLoop(ctx, fDest)
		loopErr <- err
	}()

	// since neither source has even been flushed, it should immediately flush
	// both, although the order is not controlled
	var logReqs []*proto.BatchCreateLogsRequest
	logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
	logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
	for _, req := range logReqs {
		require.NotNil(t, req)
		srcID, err := uuid.FromBytes(req.LogSourceId)
		require.NoError(t, err)
		switch srcID {
		case ls1:
			require.Len(t, req.Logs, 1)
			require.Equal(t, "test log 0, src 1", req.Logs[0].GetOutput())
			require.Equal(t, proto.Log_INFO, req.Logs[0].GetLevel())
			require.Equal(t, t0, req.Logs[0].GetCreatedAt().AsTime())
		case ls2:
			require.Len(t, req.Logs, 2)
			require.Equal(t, "test log 0, src 2", req.Logs[0].GetOutput())
			require.Equal(t, proto.Log_ERROR, req.Logs[0].GetLevel())
			require.Equal(t, t0, req.Logs[0].GetCreatedAt().AsTime())
			require.Equal(t, "test log 1, src 2", req.Logs[1].GetOutput())
			require.Equal(t, proto.Log_WARN, req.Logs[1].GetLevel())
			require.Equal(t, t0, req.Logs[1].GetCreatedAt().AsTime())
		default:
			t.Fatal("unknown log source")
		}
	}

	t1 := dbtime.Now()
	err = uut.enqueue(ls1, agentsdk.Log{
		CreatedAt: t1,
		Output:    "test log 1, src 1",
		Level:     codersdk.LogLevelDebug,
	})
	require.NoError(t, err)
	err = uut.flush(ls1)
	require.NoError(t, err)

	req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
	// give ourselves a 25% buffer if we're right on the cusp of a tick
	require.LessOrEqual(t, time.Since(t1), flushInterval*5/4)
	require.NotNil(t, req)
	require.Len(t, req.Logs, 1)
	require.Equal(t, "test log 1, src 1", req.Logs[0].GetOutput())
	require.Equal(t, proto.Log_DEBUG, req.Logs[0].GetLevel())
	require.Equal(t, t1, req.Logs[0].GetCreatedAt().AsTime())

	cancel()
	err = testutil.RequireRecvCtx(testCtx, t, loopErr)
	require.NoError(t, err)

	// we can still enqueue more logs after sendLoop returns
	err = uut.enqueue(ls1, agentsdk.Log{
		CreatedAt: t1,
		Output:    "test log 2, src 1",
		Level:     codersdk.LogLevelTrace,
	})
	require.NoError(t, err)
}

type fakeLogDest struct {
	reqs chan *proto.BatchCreateLogsRequest
}

func (f fakeLogDest) BatchCreateLogs(ctx context.Context, req *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error) {
	// clone the logs so that modifications the sender makes don't affect our tests. In production
	// these would be serialized/deserialized so we don't have to worry too much.
	req.Logs = slices.Clone(req.Logs)
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case f.reqs <- req:
		return &proto.BatchCreateLogsResponse{}, nil
	}
}

func newFakeLogDest() *fakeLogDest {
	return &fakeLogDest{
		reqs: make(chan *proto.BatchCreateLogsRequest),
	}
}

codersdk/agentsdk/convert.go (+13 lines)

@@ -7,6 +7,7 @@ import (
 	"github.com/google/uuid"
 	"golang.org/x/xerrors"
 	"google.golang.org/protobuf/types/known/durationpb"
+	"google.golang.org/protobuf/types/known/timestamppb"

 	"github.com/coder/coder/v2/agent/proto"
 	"github.com/coder/coder/v2/codersdk"
@@ -293,3 +294,15 @@ func ProtoFromAppHealthsRequest(req PostAppHealthsRequest) (*proto.BatchUpdateAp
 	}
 	return pReq, nil
 }
+
+func ProtoFromLog(log Log) (*proto.Log, error) {
+	lvl, ok := proto.Log_Level_value[strings.ToUpper(string(log.Level))]
+	if !ok {
+		return nil, xerrors.Errorf("unknown log level: %s", log.Level)
+	}
+	return &proto.Log{
+		CreatedAt: timestamppb.New(log.CreatedAt),
+		Output:    log.Output,
+		Level:     proto.Log_Level(lvl),
+	}, nil
+}
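ProtoFromLog maps the SDK's lower-case log level (for example "warn") to the protobuf enum by upper-casing it and looking it up in proto.Log_Level_value, and converts the timestamp with timestamppb.New. Below is a small illustrative sketch of that conversion, written as a hypothetical external example test; the example function and its literal values are not part of the commit.

package agentsdk_test

import (
	"fmt"
	"time"

	"github.com/coder/coder/v2/codersdk"
	"github.com/coder/coder/v2/codersdk/agentsdk"
)

func ExampleProtoFromLog() {
	// "warn" is upper-cased to "WARN" and resolved via proto.Log_Level_value,
	// so the resulting proto.Log carries the WARN enum value.
	pl, err := agentsdk.ProtoFromLog(agentsdk.Log{
		CreatedAt: time.Date(2024, 2, 1, 12, 0, 0, 0, time.UTC),
		Output:    "installing dependencies",
		Level:     codersdk.LogLevelWarn,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(pl.GetLevel(), pl.GetOutput())
	// A level string that the Log_Level enum does not define would instead
	// produce an "unknown log level" error.
}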
