
Commit 2aff014

feat: add logSender for sending logs on agent v2 API (coder#12046)
Adds a new subcomponent of the agent for queueing up logs until they can be sent over the Agent API. A subsequent PR will change the agent to use this instead of the HTTP API for posting logs. Relates to coder#10534.
1 parent 627232e commit 2aff014
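
As a rough sketch of the intended call pattern (all wiring here is hypothetical: logger, ctx, logSourceID, logs, and client are assumed to exist, with client being anything that satisfies the logDest interface in agent/logs.go below):

	sender := newLogSender(logger)

	// Producers enqueue logs as they are generated and request flushes.
	sender.enqueue(logSourceID, logs...)
	sender.flush(logSourceID)

	// Once the agent API is reachable, drain pending logs in the background;
	// sendLoop returns on error, and the caller reconnects and calls it again.
	go func() {
		if err := sender.sendLoop(ctx, client); err != nil {
			logger.Error(ctx, "send agent logs", slog.Error(err))
		}
	}()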

File tree

3 files changed: +652 −0 lines changed


agent/logs.go

Lines changed: 239 additions & 0 deletions

package agent

import (
	"context"
	"sync"
	"time"

	"github.com/google/uuid"
	"golang.org/x/xerrors"

	"cdr.dev/slog"
	"github.com/coder/coder/v2/agent/proto"
	"github.com/coder/coder/v2/codersdk/agentsdk"
)

const (
	flushInterval    = time.Second
	maxBytesPerBatch = 1 << 20 // 1 MiB
	overheadPerLog   = 21      // found by testing

	// maxBytesQueued is the maximum length of logs we will queue in memory. The number is taken
	// from the dump.sql `max_logs_length` constraint, as there is no point queuing more logs than
	// we'll accept in the database.
	maxBytesQueued = 1048576
)

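// Note that maxBytesQueued (1048576 bytes) is numerically the same as
// maxBytesPerBatch (1 << 20 = 1 MiB): the in-memory cap on queued output
// matches the cap on a single batch of output.
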
type logQueue struct {
	logs           []*proto.Log
	flushRequested bool
	lastFlush      time.Time
}

// logSender is a subcomponent of the agent that handles enqueuing logs and then sending them over
// the agent API. Things that need to log call enqueue and flush. When the agent API becomes
// available, the agent calls sendLoop to send pending logs.
type logSender struct {
	*sync.Cond
	queues           map[uuid.UUID]*logQueue
	logger           slog.Logger
	exceededLogLimit bool
	outputLen        int
}

// logDest is the destination to which batched logs are sent.
type logDest interface {
	BatchCreateLogs(ctx context.Context, request *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error)
}

func newLogSender(logger slog.Logger) *logSender {
	return &logSender{
		Cond:   sync.NewCond(&sync.Mutex{}),
		logger: logger,
		queues: make(map[uuid.UUID]*logQueue),
	}
}

// enqueue adds logs for a given log source to the queue to be sent over the agent API.
func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) {
	logger := l.logger.With(slog.F("log_source_id", src))
	if len(logs) == 0 {
		logger.Debug(context.Background(), "enqueue called with no logs")
		return
	}
	l.L.Lock()
	defer l.L.Unlock()
	if l.exceededLogLimit {
		logger.Warn(context.Background(), "dropping enqueued logs because we have reached the server limit")
		// don't error, as we also write to file and don't want the overall write to fail
		return
	}
	defer l.Broadcast()
	q, ok := l.queues[src]
	if !ok {
		q = &logQueue{}
		l.queues[src] = q
	}
	for k, log := range logs {
		// Here we check the queue size before adding a log because we want to queue up slightly
		// more logs than the database would store to ensure we trigger "logs truncated" at the
		// database layer. Otherwise, the end user wouldn't know logs are truncated unless they
		// examined the Coder agent logs.
		if l.outputLen > maxBytesQueued {
			logger.Warn(context.Background(), "log queue full; truncating new logs", slog.F("new_logs", k), slog.F("queued_logs", len(q.logs)))
			return
		}
		pl, err := agentsdk.ProtoFromLog(log)
		if err != nil {
			logger.Critical(context.Background(), "failed to convert log", slog.Error(err))
			return
		}
		if len(pl.Output)+overheadPerLog > maxBytesPerBatch {
			logger.Warn(context.Background(), "dropping log line that exceeds our limit", slog.F("len", len(pl.Output)))
			continue
		}
		q.logs = append(q.logs, pl)
		l.outputLen += len(pl.Output)
	}
	logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs)))
}

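// Illustrative only (not part of the original file; agentsdk.Log field names
// are per the codersdk): a producer with log source ID src might batch a
// couple of lines and then ask for a flush:
//
//	sender.enqueue(src,
//		agentsdk.Log{CreatedAt: time.Now(), Output: "starting", Level: codersdk.LogLevelInfo},
//		agentsdk.Log{CreatedAt: time.Now(), Output: "done", Level: codersdk.LogLevelInfo},
//	)
//	sender.flush(src)
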
// flush requests a flush of the queue for the given log source. The actual send happens
// asynchronously on the sendLoop goroutine.
func (l *logSender) flush(src uuid.UUID) {
	l.L.Lock()
	defer l.L.Unlock()
	defer l.Broadcast()
	q, ok := l.queues[src]
	if ok {
		q.flushRequested = true
	}
	// queue might not exist because it's already been flushed and removed from
	// the map.
}

// sendLoop sends any pending logs until it hits an error or the context is canceled. It does not
// retry as it is expected that a higher layer retries establishing a connection to the agent API
// and calls sendLoop again.
func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
	l.L.Lock()
	defer l.L.Unlock()
	if l.exceededLogLimit {
		l.logger.Debug(ctx, "aborting sendLoop because log limit is already exceeded")
		// no point in keeping this loop going if the log limit is exceeded, but don't return an
		// error because we've already handled it
		return nil
	}

	ctxDone := false
	defer l.logger.Debug(ctx, "sendLoop exiting")

	// wake 4 times per flush interval to check if anything needs to be flushed
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go func() {
		tkr := time.NewTicker(flushInterval / 4)
		defer tkr.Stop()
		for {
			select {
			// also monitor the context here, so we notice immediately, rather
			// than waiting for the next tick or logs
			case <-ctx.Done():
				l.L.Lock()
				ctxDone = true
				l.L.Unlock()
				l.Broadcast()
				return
			case <-tkr.C:
				l.Broadcast()
			}
		}
	}()

	for {
		for !ctxDone && !l.hasPendingWorkLocked() {
			l.Wait()
		}
		if ctxDone {
			return nil
		}

		src, q := l.getPendingWorkLocked()
		logger := l.logger.With(slog.F("log_source_id", src))
		q.flushRequested = false // clear flag since we're now flushing
		req := &proto.BatchCreateLogsRequest{
			LogSourceId: src[:],
		}

		// outputToSend keeps track of the size of the protobuf message we send, while
		// outputToRemove keeps track of the size of the output we'll remove from the queues on
		// success. They are different because outputToSend also counts protocol message overheads.
		outputToSend := 0
		outputToRemove := 0
		n := 0
		for n < len(q.logs) {
			log := q.logs[n]
			outputToSend += len(log.Output) + overheadPerLog
			if outputToSend > maxBytesPerBatch {
				break
			}
			req.Logs = append(req.Logs, log)
			n++
			outputToRemove += len(log.Output)
		}

		l.L.Unlock()
		logger.Debug(ctx, "sending logs to agent API", slog.F("num_logs", len(req.Logs)))
		resp, err := dest.BatchCreateLogs(ctx, req)
		l.L.Lock()
		if err != nil {
			return xerrors.Errorf("failed to upload logs: %w", err)
		}
		if resp.LogLimitExceeded {
			l.logger.Warn(ctx, "server log limit exceeded; logs truncated")
			l.exceededLogLimit = true
			// no point in keeping anything we have queued around, the server will not accept them
			l.queues = make(map[uuid.UUID]*logQueue)
			// We've handled the error as best we can. We don't want the server limit to grind
			// other things to a halt, so this is all we can do.
			return nil
		}

		// Since elsewhere we only append to the logs, here we can remove them
		// since we successfully sent them. First we nil the pointers though,
		// so that they can be gc'd.
		for i := 0; i < n; i++ {
			q.logs[i] = nil
		}
		q.logs = q.logs[n:]
		l.outputLen -= outputToRemove
		if len(q.logs) == 0 {
			// no empty queues
			delete(l.queues, src)
			continue
		}
		q.lastFlush = time.Now()
	}
}

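// Two properties of sendLoop worth noting. First, the mutex is released around
// the BatchCreateLogs call, so enqueue can keep making progress during the
// network round trip; anything enqueued meanwhile is picked up on a later
// iteration. Second, to make the batch limit concrete: 10,000 queued lines
// averaging 80 bytes of output cost 10,000 * (80 + overheadPerLog) = 1,010,000
// bytes, which fits in a single maxBytesPerBatch (1,048,576-byte) request; any
// excess waits for the next iteration of the loop.
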
// hasPendingWorkLocked returns true if any queue is due for a periodic flush or has an explicit
// flush requested. It must be called with the lock held.
func (l *logSender) hasPendingWorkLocked() bool {
	for _, q := range l.queues {
		if time.Since(q.lastFlush) > flushInterval {
			return true
		}
		if q.flushRequested {
			return true
		}
	}
	return false
}

func (l *logSender) getPendingWorkLocked() (src uuid.UUID, q *logQueue) {
	// pick the queue that has gone the longest since its last flush, so that we have some sense
	// of fairness across sources
	var earliestFlush time.Time
	for is, iq := range l.queues {
		if q == nil || iq.lastFlush.Before(earliestFlush) {
			src = is
			q = iq
			earliestFlush = iq.lastFlush
		}
	}
	return src, q
}
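
Because sendLoop deliberately gives up on the first error, the caller owns the retry policy. A minimal sketch of such a caller, assuming the github.com/coder/retry package and a hypothetical connectToAgentAPI helper that returns something satisfying logDest:

	func runLogSender(ctx context.Context, logger slog.Logger, sender *logSender) {
		for r := retry.New(time.Second, 30*time.Second); r.Wait(ctx); {
			client, err := connectToAgentAPI(ctx) // hypothetical reconnect helper
			if err != nil {
				logger.Warn(ctx, "connect to agent API", slog.Error(err))
				continue
			}
			err = sender.sendLoop(ctx, client)
			if err == nil {
				// sendLoop returned cleanly: either the context was canceled or
				// the server log limit was hit; nothing left to do either way.
				return
			}
			logger.Warn(ctx, "sendLoop failed, reconnecting", slog.Error(err))
		}
	}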
