package agent

import (
	"context"
	"sync"
	"time"

	"github.com/google/uuid"
	"golang.org/x/xerrors"

	"cdr.dev/slog"
	"github.com/coder/coder/v2/agent/proto"
	"github.com/coder/coder/v2/codersdk/agentsdk"
)

const (
	flushInterval    = time.Second
	maxBytesPerBatch = 1 << 20 // 1MiB
	overheadPerLog   = 21      // found by testing

	// maxBytesQueued is the maximum length of logs we will queue in memory. The number is taken
	// from the dump.sql `max_logs_length` constraint, as there is no point in queuing more logs
	// than the database will accept.
	maxBytesQueued = 1048576
)

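// logQueue holds the pending logs for a single log source, along with the bookkeeping needed to
// decide when the queue is due to be sent.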
type logQueue struct {
	logs           []*proto.Log
	flushRequested bool
	lastFlush      time.Time
}

// logSender is a subcomponent of the agent that handles enqueuing logs and then sending them over
// the agent API. Things that need to log call enqueue and flush. When the agent API becomes
// available, the agent calls sendLoop to send pending logs.
type logSender struct {
	*sync.Cond
	queues           map[uuid.UUID]*logQueue
	logger           slog.Logger
	exceededLogLimit bool
	outputLen        int
}

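// logDest is the subset of the agent API that logSender requires: the ability to upload a batch
// of logs.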
type logDest interface {
	BatchCreateLogs(ctx context.Context, request *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error)
}

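// newLogSender returns a logSender with an empty set of queues. Logs can be enqueued immediately,
// but nothing is uploaded until sendLoop is run with a logDest.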
func newLogSender(logger slog.Logger) *logSender {
	return &logSender{
		Cond:   sync.NewCond(&sync.Mutex{}),
		logger: logger,
		queues: make(map[uuid.UUID]*logQueue),
	}
}

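// enqueue adds logs for the given source to the in-memory queue and wakes sendLoop. Logs are
// dropped, rather than returned as an error, if the server's log limit has already been exceeded,
// the queue is full, or a single line is too large to ever fit in a batch.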
func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) {
	logger := l.logger.With(slog.F("log_source_id", src))
	if len(logs) == 0 {
		logger.Debug(context.Background(), "enqueue called with no logs")
		return
	}
	l.L.Lock()
	defer l.L.Unlock()
	if l.exceededLogLimit {
		logger.Warn(context.Background(), "dropping enqueued logs because we have reached the server limit")
		// don't error, as we also write to file and don't want the overall write to fail
		return
	}
	defer l.Broadcast()
	q, ok := l.queues[src]
	if !ok {
		q = &logQueue{}
		l.queues[src] = q
	}
	for k, log := range logs {
		// Here we check the queue size before adding a log because we want to queue up slightly
		// more logs than the database would store to ensure we trigger "logs truncated" at the
		// database layer. Otherwise, the end user wouldn't know logs are truncated unless they
		// examined the Coder agent logs.
		if l.outputLen > maxBytesQueued {
			logger.Warn(context.Background(), "log queue full; truncating new logs", slog.F("new_logs", k), slog.F("queued_logs", len(q.logs)))
			return
		}
		pl, err := agentsdk.ProtoFromLog(log)
		if err != nil {
			logger.Critical(context.Background(), "failed to convert log", slog.Error(err))
			return
		}
		if len(pl.Output)+overheadPerLog > maxBytesPerBatch {
			logger.Warn(context.Background(), "dropping log line that exceeds our limit", slog.F("len", len(pl.Output)))
			continue
		}
		q.logs = append(q.logs, pl)
		l.outputLen += len(pl.Output)
	}
	logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs)))
}

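// flush marks the queue for the given source as needing to be sent on the next sendLoop
// iteration, rather than waiting for flushInterval to elapse since its last flush.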
func (l *logSender) flush(src uuid.UUID) {
	l.L.Lock()
	defer l.L.Unlock()
	defer l.Broadcast()
	q, ok := l.queues[src]
	if ok {
		q.flushRequested = true
	}
	// queue might not exist because it's already been flushed and removed from
	// the map.
}

// sendLoop sends any pending logs until it hits an error or the context is canceled. It does not
// retry, as it is expected that a higher layer retries establishing a connection to the agent API
// and calls sendLoop again.
func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
	l.L.Lock()
	defer l.L.Unlock()
	if l.exceededLogLimit {
		l.logger.Debug(ctx, "aborting sendLoop because log limit is already exceeded")
		// There is no point in keeping this loop going if the log limit is exceeded, but don't
		// return an error because we've already handled it.
		return nil
	}

	ctxDone := false
	defer l.logger.Debug(ctx, "sendLoop exiting")

	// wake 4 times per flush interval to check if anything needs to be flushed
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go func() {
		tkr := time.NewTicker(flushInterval / 4)
		defer tkr.Stop()
		for {
			select {
			// also monitor the context here, so we notice immediately, rather
			// than waiting for the next tick or logs
			case <-ctx.Done():
				l.L.Lock()
				ctxDone = true
				l.L.Unlock()
				l.Broadcast()
				return
			case <-tkr.C:
				l.Broadcast()
			}
		}
	}()

	for {
		for !ctxDone && !l.hasPendingWorkLocked() {
			l.Wait()
		}
		if ctxDone {
			return nil
		}

		src, q := l.getPendingWorkLocked()
		logger := l.logger.With(slog.F("log_source_id", src))
		q.flushRequested = false // clear flag since we're now flushing
		req := &proto.BatchCreateLogsRequest{
			LogSourceId: src[:],
		}

		// outputToSend keeps track of the size of the protobuf message we send, while
		// outputToRemove keeps track of the size of the output we'll remove from the queues on
		// success. They are different because outputToSend also counts protocol message overheads.
		outputToSend := 0
		outputToRemove := 0
		n := 0
		for n < len(q.logs) {
			log := q.logs[n]
			outputToSend += len(log.Output) + overheadPerLog
			if outputToSend > maxBytesPerBatch {
				break
			}
			req.Logs = append(req.Logs, log)
			n++
			outputToRemove += len(log.Output)
		}

		l.L.Unlock()
		logger.Debug(ctx, "sending logs to agent API", slog.F("num_logs", len(req.Logs)))
		resp, err := dest.BatchCreateLogs(ctx, req)
		l.L.Lock()
		if err != nil {
			return xerrors.Errorf("failed to upload logs: %w", err)
		}
		if resp.LogLimitExceeded {
			l.logger.Warn(ctx, "server log limit exceeded; logs truncated")
			l.exceededLogLimit = true
			// There is no point in keeping anything we have queued around; the server will not
			// accept it.
			l.queues = make(map[uuid.UUID]*logQueue)
			// We've handled the error as best we can. We don't want the server limit to grind
			// other things to a halt, so this is all we can do.
			return nil
		}


		// Since elsewhere we only append to the logs, here we can safely remove the ones we just
		// sent. First nil out the pointers so the entries they reference can be GC'd.
		for i := 0; i < n; i++ {
			q.logs[i] = nil
		}
		q.logs = q.logs[n:]
		l.outputLen -= outputToRemove
		if len(q.logs) == 0 {
			// don't keep empty queues around
			delete(l.queues, src)
			continue
		}
		q.lastFlush = time.Now()
	}
}

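// hasPendingWorkLocked returns true if any queue is due to be sent, either because a flush was
// requested or because more than flushInterval has elapsed since its last flush. The lock must be
// held when calling it.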
func (l *logSender) hasPendingWorkLocked() bool {
	for _, q := range l.queues {
		if time.Since(q.lastFlush) > flushInterval {
			return true
		}
		if q.flushRequested {
			return true
		}
	}
	return false
}

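// getPendingWorkLocked returns the source and queue to send next. The lock must be held when
// calling it.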
func (l *logSender) getPendingWorkLocked() (src uuid.UUID, q *logQueue) {
	// Pick the queue that has gone the longest without being flushed, so that we have some sense
	// of fairness across sources.
	var earliestFlush time.Time
	for is, iq := range l.queues {
		if q == nil || iq.lastFlush.Before(earliestFlush) {
			src = is
			q = iq
			earliestFlush = iq.lastFlush
		}
	}
	return src, q
}
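
// exampleLogSenderUsage is an illustrative sketch, not part of the agent wiring, showing how the
// pieces above are intended to fit together: producers enqueue logs and request flushes, and
// sendLoop is run against whatever logDest (e.g. an agent API client) is currently available.
// The function name and its parameters are hypothetical.
func exampleLogSenderUsage(ctx context.Context, logger slog.Logger, dest logDest, src uuid.UUID) error {
	ls := newLogSender(logger)

	// enqueue never touches the network; it only appends to the in-memory queue and wakes
	// sendLoop if it is running.
	ls.enqueue(src, agentsdk.Log{
		CreatedAt: time.Now(),
		Output:    "script started",
	})

	// flush asks sendLoop to send this source's queue on its next iteration instead of waiting
	// for flushInterval to elapse.
	ls.flush(src)

	// sendLoop returns when ctx is canceled, when the server reports that its log limit has been
	// exceeded, or when an upload fails; a higher layer is expected to reconnect and call it
	// again.
	return ls.sendLoop(ctx, dest)
}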