Skip to content

Commit f39d152

Browse files
committed
feat: add WaitUntilEmpty to LogSender
1 parent 2a2203e commit f39d152

File tree

2 files changed

+65
-1
lines changed

2 files changed

+65
-1
lines changed

codersdk/agentsdk/logs.go

+28
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,34 @@ func (l *LogSender) GetScriptLogger(logSourceID uuid.UUID) ScriptLogger {
483483
return ScriptLogger{srcID: logSourceID, sender: l}
484484
}
485485

486+
// WaitUntilEmpty waits until the LogSender's queues are empty or the given context expires.
487+
func (l *LogSender) WaitUntilEmpty(ctx context.Context) error {
488+
ctxDone := false
489+
nevermind := make(chan struct{})
490+
defer close(nevermind)
491+
go func() {
492+
select {
493+
case <-ctx.Done():
494+
l.L.Lock()
495+
defer l.L.Unlock()
496+
ctxDone = true
497+
l.Broadcast()
498+
return
499+
case <-nevermind:
500+
return
501+
}
502+
}()
503+
l.L.Lock()
504+
defer l.L.Unlock()
505+
for len(l.queues) != 0 && !ctxDone {
506+
l.Wait()
507+
}
508+
if len(l.queues) == 0 {
509+
return nil
510+
}
511+
return ctx.Err()
512+
}
513+
486514
type ScriptLogger struct {
487515
sender *LogSender
488516
srcID uuid.UUID

codersdk/agentsdk/logs_internal_test.go

+37-1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ func TestLogSender_Mainline(t *testing.T) {
5656
loopErr <- err
5757
}()
5858

59+
empty := make(chan error, 1)
60+
go func() {
61+
err := uut.WaitUntilEmpty(ctx)
62+
empty <- err
63+
}()
64+
5965
// since neither source has even been flushed, it should immediately Flush
6066
// both, although the order is not controlled
6167
var logReqs []*proto.BatchCreateLogsRequest
@@ -104,8 +110,11 @@ func TestLogSender_Mainline(t *testing.T) {
104110
require.Equal(t, proto.Log_DEBUG, req.Logs[0].GetLevel())
105111
require.Equal(t, t1, req.Logs[0].GetCreatedAt().AsTime())
106112

113+
err := testutil.RequireRecvCtx(ctx, t, empty)
114+
require.NoError(t, err)
115+
107116
cancel()
108-
err := testutil.RequireRecvCtx(testCtx, t, loopErr)
117+
err = testutil.RequireRecvCtx(testCtx, t, loopErr)
109118
require.NoError(t, err)
110119

111120
// we can still enqueue more logs after SendLoop returns
@@ -363,6 +372,33 @@ func TestLogSender_SendError(t *testing.T) {
363372
uut.L.Unlock()
364373
}
365374

375+
func TestLogSender_WaitUntilEmpty_ContextExpired(t *testing.T) {
376+
t.Parallel()
377+
testCtx := testutil.Context(t, testutil.WaitShort)
378+
ctx, cancel := context.WithCancel(testCtx)
379+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
380+
uut := NewLogSender(logger)
381+
382+
t0 := dbtime.Now()
383+
384+
ls1 := uuid.UUID{0x11}
385+
uut.Enqueue(ls1, Log{
386+
CreatedAt: t0,
387+
Output: "test log 0, src 1",
388+
Level: codersdk.LogLevelInfo,
389+
})
390+
391+
empty := make(chan error, 1)
392+
go func() {
393+
err := uut.WaitUntilEmpty(ctx)
394+
empty <- err
395+
}()
396+
397+
cancel()
398+
err := testutil.RequireRecvCtx(testCtx, t, empty)
399+
require.ErrorIs(t, err, context.Canceled)
400+
}
401+
366402
type fakeLogDest struct {
367403
reqs chan *proto.BatchCreateLogsRequest
368404
resps chan *proto.BatchCreateLogsResponse

0 commit comments

Comments
 (0)