
Commit 1dc0874

Merge branch 'reland-activity-and-autostop-changes' into optional-external-auth-frontend
2 parents fea76b0 + ed8f526 commit 1dc0874

75 files changed: +2085 -495 lines changed


agent/logs.go

Lines changed: 239 additions & 0 deletions
@@ -0,0 +1,239 @@
package agent

import (
	"context"
	"sync"
	"time"

	"github.com/google/uuid"
	"golang.org/x/xerrors"

	"cdr.dev/slog"
	"github.com/coder/coder/v2/agent/proto"
	"github.com/coder/coder/v2/codersdk/agentsdk"
)

const (
	flushInterval    = time.Second
	maxBytesPerBatch = 1 << 20 // 1MiB
	overheadPerLog   = 21      // found by testing

	// maxBytesQueued is the maximum length of logs we will queue in memory. The number is taken
	// from dump.sql `max_logs_length` constraint, as there is no point queuing more logs than we'll
	// accept in the database.
	maxBytesQueued = 1048576
)

type logQueue struct {
	logs           []*proto.Log
	flushRequested bool
	lastFlush      time.Time
}

// logSender is a subcomponent of agent that handles enqueuing logs and then sending them over the
// agent API. Things that need to log call enqueue and flush. When the agent API becomes available,
// the agent calls sendLoop to send pending logs.
type logSender struct {
	*sync.Cond
	queues           map[uuid.UUID]*logQueue
	logger           slog.Logger
	exceededLogLimit bool
	outputLen        int
}

type logDest interface {
	BatchCreateLogs(ctx context.Context, request *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error)
}

func newLogSender(logger slog.Logger) *logSender {
	return &logSender{
		Cond:   sync.NewCond(&sync.Mutex{}),
		logger: logger,
		queues: make(map[uuid.UUID]*logQueue),
	}
}

// enqueue adds logs for the given log source to the in-memory queue. It never blocks on the
// network; logs are dropped if the server limit has already been exceeded or the queue is full.
func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) {
	logger := l.logger.With(slog.F("log_source_id", src))
	if len(logs) == 0 {
		logger.Debug(context.Background(), "enqueue called with no logs")
		return
	}
	l.L.Lock()
	defer l.L.Unlock()
	if l.exceededLogLimit {
		logger.Warn(context.Background(), "dropping enqueued logs because we have reached the server limit")
		// don't error, as we also write to file and don't want the overall write to fail
		return
	}
	defer l.Broadcast()
	q, ok := l.queues[src]
	if !ok {
		q = &logQueue{}
		l.queues[src] = q
	}
	for k, log := range logs {
		// Here we check the queue size before adding a log because we want to queue up slightly
		// more logs than the database would store to ensure we trigger "logs truncated" at the
		// database layer. Otherwise, the end user wouldn't know logs are truncated unless they
		// examined the Coder agent logs.
		if l.outputLen > maxBytesQueued {
			logger.Warn(context.Background(), "log queue full; truncating new logs", slog.F("new_logs", k), slog.F("queued_logs", len(q.logs)))
			return
		}
		pl, err := agentsdk.ProtoFromLog(log)
		if err != nil {
			logger.Critical(context.Background(), "failed to convert log", slog.Error(err))
			return
		}
		if len(pl.Output)+overheadPerLog > maxBytesPerBatch {
			logger.Warn(context.Background(), "dropping log line that exceeds our limit", slog.F("len", len(pl.Output)))
			continue
		}
		q.logs = append(q.logs, pl)
		l.outputLen += len(pl.Output)
	}
	logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs)))
}

// flush requests an immediate send of any queued logs for the given log source.
func (l *logSender) flush(src uuid.UUID) {
	l.L.Lock()
	defer l.L.Unlock()
	defer l.Broadcast()
	q, ok := l.queues[src]
	if ok {
		q.flushRequested = true
	}
	// queue might not exist because it's already been flushed and removed from
	// the map.
}

// sendLoop sends any pending logs until it hits an error or the context is canceled. It does not
// retry as it is expected that a higher layer retries establishing connection to the agent API and
// calls sendLoop again.
func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
	l.L.Lock()
	defer l.L.Unlock()
	if l.exceededLogLimit {
		l.logger.Debug(ctx, "aborting sendLoop because log limit is already exceeded")
		// no point in keeping this loop going if the log limit is exceeded, but don't return an
		// error because we've already handled it
		return nil
	}

	ctxDone := false
	defer l.logger.Debug(ctx, "sendLoop exiting")

	// wake 4 times per flush interval to check if anything needs to be flushed
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go func() {
		tkr := time.NewTicker(flushInterval / 4)
		defer tkr.Stop()
		for {
			select {
			// also monitor the context here, so we notice immediately, rather
			// than waiting for the next tick or logs
			case <-ctx.Done():
				l.L.Lock()
				ctxDone = true
				l.L.Unlock()
				l.Broadcast()
				return
			case <-tkr.C:
				l.Broadcast()
			}
		}
	}()

	for {
		for !ctxDone && !l.hasPendingWorkLocked() {
			l.Wait()
		}
		if ctxDone {
			return nil
		}

		src, q := l.getPendingWorkLocked()
		logger := l.logger.With(slog.F("log_source_id", src))
		q.flushRequested = false // clear flag since we're now flushing
		req := &proto.BatchCreateLogsRequest{
			LogSourceId: src[:],
		}

		// outputToSend keeps track of the size of the protobuf message we send, while
		// outputToRemove keeps track of the size of the output we'll remove from the queues on
		// success. They are different because outputToSend also counts protocol message overheads.
		outputToSend := 0
		outputToRemove := 0
		n := 0
		for n < len(q.logs) {
			log := q.logs[n]
			outputToSend += len(log.Output) + overheadPerLog
			if outputToSend > maxBytesPerBatch {
				break
			}
			req.Logs = append(req.Logs, log)
			n++
			outputToRemove += len(log.Output)
		}

		l.L.Unlock()
		logger.Debug(ctx, "sending logs to agent API", slog.F("num_logs", len(req.Logs)))
		resp, err := dest.BatchCreateLogs(ctx, req)
		l.L.Lock()
		if err != nil {
			return xerrors.Errorf("failed to upload logs: %w", err)
		}
		if resp.LogLimitExceeded {
			l.logger.Warn(ctx, "server log limit exceeded; logs truncated")
			l.exceededLogLimit = true
			// no point in keeping anything we have queued around, server will not accept them
			l.queues = make(map[uuid.UUID]*logQueue)
			// We've handled the error as best as we can. We don't want the server limit to grind
			// other things to a halt, so this is all we can do.
			return nil
		}

		// Since elsewhere we only append to the logs, here we can remove them
		// since we successfully sent them. First we nil the pointers though,
		// so that they can be gc'd.
		for i := 0; i < n; i++ {
			q.logs[i] = nil
		}
		q.logs = q.logs[n:]
		l.outputLen -= outputToRemove
		if len(q.logs) == 0 {
			// no empty queues
			delete(l.queues, src)
			continue
		}
		q.lastFlush = time.Now()
	}
}

func (l *logSender) hasPendingWorkLocked() bool {
	for _, q := range l.queues {
		if time.Since(q.lastFlush) > flushInterval {
			return true
		}
		if q.flushRequested {
			return true
		}
	}
	return false
}

func (l *logSender) getPendingWorkLocked() (src uuid.UUID, q *logQueue) {
	// take the one it's been the longest since we've flushed, so that we have some sense of
	// fairness across sources
	var earliestFlush time.Time
	for is, iq := range l.queues {
		if q == nil || iq.lastFlush.Before(earliestFlush) {
			src = is
			q = iq
			earliestFlush = iq.lastFlush
		}
	}
	return src, q
}
