Skip to content

Commit ab4cb66

Browse files
authored
feat: add WaitUntilEmpty to LogSender (#12159)
We'll need this to be able to tell when all outstanding logs have been sent, as part of graceful shutdown.
1 parent 081e37d commit ab4cb66

File tree

2 files changed

+77
-1
lines changed

2 files changed

+77
-1
lines changed

codersdk/agentsdk/logs.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,7 @@ func (l *LogSender) SendLoop(ctx context.Context, dest logDest) error {
437437
l.exceededLogLimit = true
438438
// no point in keeping anything we have queued around, server will not accept them
439439
l.queues = make(map[uuid.UUID]*logQueue)
440+
l.Broadcast() // might unblock WaitUntilEmpty
440441
return LogLimitExceededError
441442
}
442443

@@ -451,6 +452,7 @@ func (l *LogSender) SendLoop(ctx context.Context, dest logDest) error {
451452
if len(q.logs) == 0 {
452453
// no empty queues
453454
delete(l.queues, src)
455+
l.Broadcast() // might unblock WaitUntilEmpty
454456
continue
455457
}
456458
q.lastFlush = time.Now()
@@ -487,6 +489,34 @@ func (l *LogSender) GetScriptLogger(logSourceID uuid.UUID) ScriptLogger {
487489
return ScriptLogger{srcID: logSourceID, sender: l}
488490
}
489491

492+
// WaitUntilEmpty waits until the LogSender's queues are empty or the given context expires.
493+
func (l *LogSender) WaitUntilEmpty(ctx context.Context) error {
494+
ctxDone := false
495+
nevermind := make(chan struct{})
496+
defer close(nevermind)
497+
go func() {
498+
select {
499+
case <-ctx.Done():
500+
l.L.Lock()
501+
defer l.L.Unlock()
502+
ctxDone = true
503+
l.Broadcast()
504+
return
505+
case <-nevermind:
506+
return
507+
}
508+
}()
509+
l.L.Lock()
510+
defer l.L.Unlock()
511+
for len(l.queues) != 0 && !ctxDone {
512+
l.Wait()
513+
}
514+
if len(l.queues) == 0 {
515+
return nil
516+
}
517+
return ctx.Err()
518+
}
519+
490520
type ScriptLogger struct {
491521
sender *LogSender
492522
srcID uuid.UUID

codersdk/agentsdk/logs_internal_test.go

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ func TestLogSender_Mainline(t *testing.T) {
5656
loopErr <- err
5757
}()
5858

59+
empty := make(chan error, 1)
60+
go func() {
61+
err := uut.WaitUntilEmpty(ctx)
62+
empty <- err
63+
}()
64+
5965
// since neither source has even been flushed, it should immediately Flush
6066
// both, although the order is not controlled
6167
var logReqs []*proto.BatchCreateLogsRequest
@@ -104,8 +110,11 @@ func TestLogSender_Mainline(t *testing.T) {
104110
require.Equal(t, proto.Log_DEBUG, req.Logs[0].GetLevel())
105111
require.Equal(t, t1, req.Logs[0].GetCreatedAt().AsTime())
106112

113+
err := testutil.RequireRecvCtx(ctx, t, empty)
114+
require.NoError(t, err)
115+
107116
cancel()
108-
err := testutil.RequireRecvCtx(testCtx, t, loopErr)
117+
err = testutil.RequireRecvCtx(testCtx, t, loopErr)
109118
require.ErrorIs(t, err, context.Canceled)
110119

111120
// we can still enqueue more logs after SendLoop returns
@@ -132,6 +141,12 @@ func TestLogSender_LogLimitExceeded(t *testing.T) {
132141
Level: codersdk.LogLevelInfo,
133142
})
134143

144+
empty := make(chan error, 1)
145+
go func() {
146+
err := uut.WaitUntilEmpty(ctx)
147+
empty <- err
148+
}()
149+
135150
loopErr := make(chan error, 1)
136151
go func() {
137152
err := uut.SendLoop(ctx, fDest)
@@ -146,6 +161,10 @@ func TestLogSender_LogLimitExceeded(t *testing.T) {
146161
err := testutil.RequireRecvCtx(ctx, t, loopErr)
147162
require.ErrorIs(t, err, LogLimitExceededError)
148163

164+
// Should also unblock WaitUntilEmpty
165+
err = testutil.RequireRecvCtx(ctx, t, empty)
166+
require.NoError(t, err)
167+
149168
// we can still enqueue more logs after SendLoop returns, but they don't
150169
// actually get enqueued
151170
uut.Enqueue(ls1, Log{
@@ -363,6 +382,33 @@ func TestLogSender_SendError(t *testing.T) {
363382
uut.L.Unlock()
364383
}
365384

385+
func TestLogSender_WaitUntilEmpty_ContextExpired(t *testing.T) {
386+
t.Parallel()
387+
testCtx := testutil.Context(t, testutil.WaitShort)
388+
ctx, cancel := context.WithCancel(testCtx)
389+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
390+
uut := NewLogSender(logger)
391+
392+
t0 := dbtime.Now()
393+
394+
ls1 := uuid.UUID{0x11}
395+
uut.Enqueue(ls1, Log{
396+
CreatedAt: t0,
397+
Output: "test log 0, src 1",
398+
Level: codersdk.LogLevelInfo,
399+
})
400+
401+
empty := make(chan error, 1)
402+
go func() {
403+
err := uut.WaitUntilEmpty(ctx)
404+
empty <- err
405+
}()
406+
407+
cancel()
408+
err := testutil.RequireRecvCtx(testCtx, t, empty)
409+
require.ErrorIs(t, err, context.Canceled)
410+
}
411+
366412
type fakeLogDest struct {
367413
reqs chan *proto.BatchCreateLogsRequest
368414
resps chan *proto.BatchCreateLogsResponse

0 commit comments

Comments
 (0)