Skip to content

Commit 0f99947

Browse files
committed
feat: add logSender for sending logs on agent v2 API
1 parent 151aaad commit 0f99947

File tree

3 files changed

+321
-0
lines changed

3 files changed

+321
-0
lines changed

agent/logs.go

+172
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
package agent
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/google/uuid"
9+
"golang.org/x/exp/slices"
10+
"golang.org/x/xerrors"
11+
12+
"cdr.dev/slog"
13+
"github.com/coder/coder/v2/agent/proto"
14+
"github.com/coder/coder/v2/codersdk/agentsdk"
15+
)
16+
17+
const flushInterval = time.Second
18+
19+
type logQueue struct {
20+
logs []*proto.Log
21+
flushRequested bool
22+
lastFlush time.Time
23+
}
24+
25+
// logSender is a subcomponent of agent that handles enqueuing logs and then sending them over the
26+
// agent API. Things that need to log call enqueue and flush. When the agent API becomes available,
27+
// the agent calls sendLoop to send pending logs.
28+
type logSender struct {
29+
*sync.Cond
30+
queues map[uuid.UUID]*logQueue
31+
logger slog.Logger
32+
}
33+
34+
type logDest interface {
35+
BatchCreateLogs(ctx context.Context, request *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error)
36+
}
37+
38+
func newLogSender(logger slog.Logger) *logSender {
39+
return &logSender{
40+
Cond: sync.NewCond(&sync.Mutex{}),
41+
logger: logger,
42+
queues: make(map[uuid.UUID]*logQueue),
43+
}
44+
}
45+
46+
func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
47+
logger := l.logger.With(slog.F("log_source_id", src))
48+
if len(logs) == 0 {
49+
logger.Debug(context.Background(), "enqueue called with no logs")
50+
return nil
51+
}
52+
l.L.Lock()
53+
defer l.L.Unlock()
54+
defer l.Broadcast()
55+
q, ok := l.queues[src]
56+
if !ok {
57+
q = &logQueue{}
58+
l.queues[src] = q
59+
}
60+
for _, log := range logs {
61+
pl, err := agentsdk.ProtoFromLog(log)
62+
if err != nil {
63+
return xerrors.Errorf("failed to convert log: %w", err)
64+
}
65+
q.logs = append(q.logs, pl)
66+
}
67+
logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs)))
68+
return nil
69+
}
70+
71+
func (l *logSender) flush(src uuid.UUID) error {
72+
l.L.Lock()
73+
defer l.L.Unlock()
74+
defer l.Broadcast()
75+
q, ok := l.queues[src]
76+
if ok {
77+
q.flushRequested = true
78+
}
79+
// queue might not exist because it's already been flushed and removed from
80+
// the map.
81+
return nil
82+
}
83+
84+
// sendLoop sends any pending logs until it hits an error or the context is canceled. It does not
85+
// retry as it is expected that a higher layer retries establishing connection to the agent API and
86+
// calls sendLoop again.
87+
func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
88+
ctxDone := false
89+
defer l.logger.Debug(ctx, "sendLoop exiting")
90+
91+
// wake 4 times per flush interval to check if anything needs to be flushed
92+
go func() {
93+
tkr := time.NewTicker(flushInterval / 4)
94+
defer tkr.Stop()
95+
for {
96+
select {
97+
// also monitor the context here, so we notice immediately, rather
98+
// than waiting for the next tick or logs
99+
case <-ctx.Done():
100+
l.L.Lock()
101+
ctxDone = true
102+
l.L.Unlock()
103+
l.Broadcast()
104+
return
105+
case <-tkr.C:
106+
l.Broadcast()
107+
}
108+
}
109+
}()
110+
111+
l.L.Lock()
112+
defer l.L.Unlock()
113+
for {
114+
for !ctxDone && !l.hasPendingWorkLocked() {
115+
l.Wait()
116+
}
117+
if ctxDone {
118+
return nil
119+
}
120+
src, q := l.getPendingWorkLocked()
121+
q.flushRequested = false // clear flag since we're now flushing
122+
req := &proto.BatchCreateLogsRequest{
123+
LogSourceId: src[:],
124+
// when we release the lock, we don't want modifications to the slice to affect us
125+
Logs: slices.Clone(q.logs),
126+
}
127+
128+
l.L.Unlock()
129+
l.logger.Debug(ctx, "sending logs to agent API", slog.F("log_source_id", src), slog.F("num_logs", len(req.Logs)))
130+
_, err := dest.BatchCreateLogs(ctx, req)
131+
l.L.Lock()
132+
if err != nil {
133+
return xerrors.Errorf("failed to upload logs: %w", err)
134+
}
135+
136+
// since elsewhere we only append to the logs, here we can remove them
137+
// since we successfully sent them
138+
q.logs = q.logs[len(req.Logs):]
139+
if len(q.logs) == 0 {
140+
// no empty queues
141+
delete(l.queues, src)
142+
continue
143+
}
144+
q.lastFlush = time.Now()
145+
}
146+
}
147+
148+
func (l *logSender) hasPendingWorkLocked() bool {
149+
for _, q := range l.queues {
150+
if q.flushRequested {
151+
return true
152+
}
153+
if time.Since(q.lastFlush) > flushInterval {
154+
return true
155+
}
156+
}
157+
return false
158+
}
159+
160+
func (l *logSender) getPendingWorkLocked() (src uuid.UUID, q *logQueue) {
161+
// take the one it's been the longest since we've flushed, so that we have some sense of
162+
// fairness across sources
163+
var earliestFlush time.Time
164+
for is, iq := range l.queues {
165+
if q == nil || iq.lastFlush.Before(earliestFlush) {
166+
src = is
167+
q = iq
168+
earliestFlush = iq.lastFlush
169+
}
170+
}
171+
return src, q
172+
}

agent/logs_internal_test.go

+136
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
package agent
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/google/uuid"
9+
"github.com/stretchr/testify/require"
10+
11+
"cdr.dev/slog"
12+
"cdr.dev/slog/sloggers/slogtest"
13+
"github.com/coder/coder/v2/agent/proto"
14+
"github.com/coder/coder/v2/coderd/database/dbtime"
15+
"github.com/coder/coder/v2/codersdk"
16+
"github.com/coder/coder/v2/codersdk/agentsdk"
17+
"github.com/coder/coder/v2/testutil"
18+
)
19+
20+
func TestLogSender(t *testing.T) {
21+
t.Parallel()
22+
testCtx := testutil.Context(t, testutil.WaitShort)
23+
ctx, cancel := context.WithCancel(testCtx)
24+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
25+
fDest := newFakeLogDest()
26+
uut := newLogSender(logger)
27+
28+
t0 := dbtime.Now()
29+
30+
ls1 := uuid.UUID{0x11}
31+
err := uut.enqueue(ls1, agentsdk.Log{
32+
CreatedAt: t0,
33+
Output: "test log 0, src 1",
34+
Level: codersdk.LogLevelInfo,
35+
})
36+
require.NoError(t, err)
37+
38+
ls2 := uuid.UUID{0x22}
39+
err = uut.enqueue(ls2,
40+
agentsdk.Log{
41+
CreatedAt: t0,
42+
Output: "test log 0, src 2",
43+
Level: codersdk.LogLevelError,
44+
},
45+
agentsdk.Log{
46+
CreatedAt: t0,
47+
Output: "test log 1, src 2",
48+
Level: codersdk.LogLevelWarn,
49+
},
50+
)
51+
require.NoError(t, err)
52+
53+
loopErr := make(chan error, 1)
54+
go func() {
55+
err := uut.sendLoop(ctx, fDest)
56+
loopErr <- err
57+
}()
58+
59+
// since neither source has even been flushed, it should immediately flush
60+
// both, although the order is not controlled
61+
var logReqs []*proto.BatchCreateLogsRequest
62+
logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
63+
logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
64+
for _, req := range logReqs {
65+
require.NotNil(t, req)
66+
srcID, err := uuid.FromBytes(req.LogSourceId)
67+
require.NoError(t, err)
68+
switch srcID {
69+
case ls1:
70+
require.Len(t, req.Logs, 1)
71+
require.Equal(t, "test log 0, src 1", req.Logs[0].GetOutput())
72+
require.Equal(t, proto.Log_INFO, req.Logs[0].GetLevel())
73+
require.Equal(t, t0, req.Logs[0].GetCreatedAt().AsTime())
74+
case ls2:
75+
require.Len(t, req.Logs, 2)
76+
require.Equal(t, "test log 0, src 2", req.Logs[0].GetOutput())
77+
require.Equal(t, proto.Log_ERROR, req.Logs[0].GetLevel())
78+
require.Equal(t, t0, req.Logs[0].GetCreatedAt().AsTime())
79+
require.Equal(t, "test log 1, src 2", req.Logs[1].GetOutput())
80+
require.Equal(t, proto.Log_WARN, req.Logs[1].GetLevel())
81+
require.Equal(t, t0, req.Logs[1].GetCreatedAt().AsTime())
82+
default:
83+
t.Fatal("unknown log source")
84+
}
85+
}
86+
87+
t1 := dbtime.Now()
88+
err = uut.enqueue(ls1, agentsdk.Log{
89+
CreatedAt: t1,
90+
Output: "test log 1, src 1",
91+
Level: codersdk.LogLevelDebug,
92+
})
93+
require.NoError(t, err)
94+
err = uut.flush(ls1)
95+
require.NoError(t, err)
96+
97+
req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
98+
// give ourselves a 25% buffer if we're right on the cusp of a tick
99+
require.LessOrEqual(t, time.Since(t1), flushInterval*5/4)
100+
require.NotNil(t, req)
101+
require.Len(t, req.Logs, 1)
102+
require.Equal(t, "test log 1, src 1", req.Logs[0].GetOutput())
103+
require.Equal(t, proto.Log_DEBUG, req.Logs[0].GetLevel())
104+
require.Equal(t, t1, req.Logs[0].GetCreatedAt().AsTime())
105+
106+
cancel()
107+
err = testutil.RequireRecvCtx(testCtx, t, loopErr)
108+
require.NoError(t, err)
109+
110+
// we can still enqueue more logs after sendLoop returns
111+
err = uut.enqueue(ls1, agentsdk.Log{
112+
CreatedAt: t1,
113+
Output: "test log 2, src 1",
114+
Level: codersdk.LogLevelTrace,
115+
})
116+
require.NoError(t, err)
117+
}
118+
119+
type fakeLogDest struct {
120+
reqs chan *proto.BatchCreateLogsRequest
121+
}
122+
123+
func (f fakeLogDest) BatchCreateLogs(ctx context.Context, req *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error) {
124+
select {
125+
case <-ctx.Done():
126+
return nil, ctx.Err()
127+
case f.reqs <- req:
128+
return &proto.BatchCreateLogsResponse{}, nil
129+
}
130+
}
131+
132+
func newFakeLogDest() *fakeLogDest {
133+
return &fakeLogDest{
134+
reqs: make(chan *proto.BatchCreateLogsRequest),
135+
}
136+
}

codersdk/agentsdk/convert.go

+13
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/google/uuid"
88
"golang.org/x/xerrors"
99
"google.golang.org/protobuf/types/known/durationpb"
10+
"google.golang.org/protobuf/types/known/timestamppb"
1011

1112
"github.com/coder/coder/v2/agent/proto"
1213
"github.com/coder/coder/v2/codersdk"
@@ -293,3 +294,15 @@ func ProtoFromAppHealthsRequest(req PostAppHealthsRequest) (*proto.BatchUpdateAp
293294
}
294295
return pReq, nil
295296
}
297+
298+
func ProtoFromLog(log Log) (*proto.Log, error) {
299+
lvl, ok := proto.Log_Level_value[strings.ToUpper(string(log.Level))]
300+
if !ok {
301+
return nil, xerrors.Errorf("unknown log level: %s", log.Level)
302+
}
303+
return &proto.Log{
304+
CreatedAt: timestamppb.New(log.CreatedAt),
305+
Output: log.Output,
306+
Level: proto.Log_Level(lvl),
307+
}, nil
308+
}

0 commit comments

Comments
 (0)