From dfe07f99935e8dd53d1fd491768f9ad01727c2b9 Mon Sep 17 00:00:00 2001 From: Spike Curtis Date: Thu, 15 Feb 2024 14:11:49 +0400 Subject: [PATCH] feat: add WaitUntilEmpty to LogSender --- codersdk/agentsdk/logs.go | 30 ++++++++++++++++ codersdk/agentsdk/logs_internal_test.go | 48 ++++++++++++++++++++++++- 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/codersdk/agentsdk/logs.go b/codersdk/agentsdk/logs.go index a28aede7bd177..9db47adf35fb2 100644 --- a/codersdk/agentsdk/logs.go +++ b/codersdk/agentsdk/logs.go @@ -437,6 +437,7 @@ func (l *LogSender) SendLoop(ctx context.Context, dest logDest) error { l.exceededLogLimit = true // no point in keeping anything we have queued around, server will not accept them l.queues = make(map[uuid.UUID]*logQueue) + l.Broadcast() // might unblock WaitUntilEmpty return LogLimitExceededError } @@ -451,6 +452,7 @@ func (l *LogSender) SendLoop(ctx context.Context, dest logDest) error { if len(q.logs) == 0 { // no empty queues delete(l.queues, src) + l.Broadcast() // might unblock WaitUntilEmpty continue } q.lastFlush = time.Now() @@ -487,6 +489,34 @@ func (l *LogSender) GetScriptLogger(logSourceID uuid.UUID) ScriptLogger { return ScriptLogger{srcID: logSourceID, sender: l} } +// WaitUntilEmpty waits until the LogSender's queues are empty or the given context expires. +func (l *LogSender) WaitUntilEmpty(ctx context.Context) error { + ctxDone := false + nevermind := make(chan struct{}) + defer close(nevermind) + go func() { + select { + case <-ctx.Done(): + l.L.Lock() + defer l.L.Unlock() + ctxDone = true + l.Broadcast() + return + case <-nevermind: + return + } + }() + l.L.Lock() + defer l.L.Unlock() + for len(l.queues) != 0 && !ctxDone { + l.Wait() + } + if len(l.queues) == 0 { + return nil + } + return ctx.Err() +} + type ScriptLogger struct { sender *LogSender srcID uuid.UUID diff --git a/codersdk/agentsdk/logs_internal_test.go b/codersdk/agentsdk/logs_internal_test.go index 527ebbea81f64..d942689d31465 100644 --- a/codersdk/agentsdk/logs_internal_test.go +++ b/codersdk/agentsdk/logs_internal_test.go @@ -56,6 +56,12 @@ func TestLogSender_Mainline(t *testing.T) { loopErr <- err }() + empty := make(chan error, 1) + go func() { + err := uut.WaitUntilEmpty(ctx) + empty <- err + }() + // since neither source has even been flushed, it should immediately Flush // both, although the order is not controlled var logReqs []*proto.BatchCreateLogsRequest @@ -104,8 +110,11 @@ func TestLogSender_Mainline(t *testing.T) { require.Equal(t, proto.Log_DEBUG, req.Logs[0].GetLevel()) require.Equal(t, t1, req.Logs[0].GetCreatedAt().AsTime()) + err := testutil.RequireRecvCtx(ctx, t, empty) + require.NoError(t, err) + cancel() - err := testutil.RequireRecvCtx(testCtx, t, loopErr) + err = testutil.RequireRecvCtx(testCtx, t, loopErr) require.ErrorIs(t, err, context.Canceled) // we can still enqueue more logs after SendLoop returns @@ -132,6 +141,12 @@ func TestLogSender_LogLimitExceeded(t *testing.T) { Level: codersdk.LogLevelInfo, }) + empty := make(chan error, 1) + go func() { + err := uut.WaitUntilEmpty(ctx) + empty <- err + }() + loopErr := make(chan error, 1) go func() { err := uut.SendLoop(ctx, fDest) @@ -146,6 +161,10 @@ func TestLogSender_LogLimitExceeded(t *testing.T) { err := testutil.RequireRecvCtx(ctx, t, loopErr) require.ErrorIs(t, err, LogLimitExceededError) + // Should also unblock WaitUntilEmpty + err = testutil.RequireRecvCtx(ctx, t, empty) + require.NoError(t, err) + // we can still enqueue more logs after SendLoop returns, but they don't // actually get enqueued uut.Enqueue(ls1, Log{ @@ -363,6 +382,33 @@ func TestLogSender_SendError(t *testing.T) { uut.L.Unlock() } +func TestLogSender_WaitUntilEmpty_ContextExpired(t *testing.T) { + t.Parallel() + testCtx := testutil.Context(t, testutil.WaitShort) + ctx, cancel := context.WithCancel(testCtx) + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + uut := NewLogSender(logger) + + t0 := dbtime.Now() + + ls1 := uuid.UUID{0x11} + uut.Enqueue(ls1, Log{ + CreatedAt: t0, + Output: "test log 0, src 1", + Level: codersdk.LogLevelInfo, + }) + + empty := make(chan error, 1) + go func() { + err := uut.WaitUntilEmpty(ctx) + empty <- err + }() + + cancel() + err := testutil.RequireRecvCtx(testCtx, t, empty) + require.ErrorIs(t, err, context.Canceled) +} + type fakeLogDest struct { reqs chan *proto.BatchCreateLogsRequest resps chan *proto.BatchCreateLogsResponse