From 95329374174491e993bba9003437b1469c0dd6e6 Mon Sep 17 00:00:00 2001 From: Michael Suchacz <203725896+ibetitsmike@users.noreply.github.com> Date: Fri, 12 Sep 2025 13:33:47 +0000 Subject: [PATCH] fix(backedpipe): deflake BackedWriter tests (sync on errChan, buffer replayStarted) --- .../backedpipe/backed_writer_test.go | 40 +++++++++---------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/agent/immortalstreams/backedpipe/backed_writer_test.go b/agent/immortalstreams/backedpipe/backed_writer_test.go index a1a77b36bc7e5..b61425e8278a8 100644 --- a/agent/immortalstreams/backedpipe/backed_writer_test.go +++ b/agent/immortalstreams/backedpipe/backed_writer_test.go @@ -177,18 +177,12 @@ func TestBackedWriter_BlockOnWriteFailure(t *testing.T) { // Expected - write is blocked } - // Should be disconnected + // Wait for error event which implies writer was marked disconnected + receivedErrorEvent := testutil.RequireReceive(ctx, t, errChan) + require.Contains(t, receivedErrorEvent.Err.Error(), "write failed") + require.Equal(t, "writer", receivedErrorEvent.Component) require.False(t, bw.Connected()) - // Error should be sent to error channel - select { - case receivedErrorEvent := <-errChan: - require.Contains(t, receivedErrorEvent.Err.Error(), "write failed") - require.Equal(t, "writer", receivedErrorEvent.Component) - default: - t.Fatal("Expected error to be sent to error channel") - } - // Reconnect with working writer and verify write completes writer2 := newMockWriter() err = bw.Reconnect(0, writer2) // Replay from beginning @@ -205,6 +199,7 @@ func TestBackedWriter_BlockOnWriteFailure(t *testing.T) { func TestBackedWriter_ReplayOnReconnect(t *testing.T) { t.Parallel() + ctx := testutil.Context(t, testutil.WaitShort) errChan := make(chan backedpipe.ErrorEvent, 1) bw := backedpipe.NewBackedWriter(backedpipe.DefaultBufferSize, errChan) @@ -243,6 +238,10 @@ func TestBackedWriter_ReplayOnReconnect(t *testing.T) { // Expected - write is blocked } + // Wait for error event which implies writer was marked disconnected + receivedErrorEvent := testutil.RequireReceive(ctx, t, errChan) + require.Contains(t, receivedErrorEvent.Err.Error(), "connection lost") + require.Equal(t, "writer", receivedErrorEvent.Component) require.False(t, bw.Connected()) // Reconnect with new writer and request replay from beginning @@ -479,18 +478,12 @@ func TestBackedWriter_BlockOnPartialWrite(t *testing.T) { // Expected - write is blocked } - // Should be disconnected + // Wait for error event which implies writer was marked disconnected + receivedErrorEvent := testutil.RequireReceive(ctx, t, errChan) + require.Contains(t, receivedErrorEvent.Err.Error(), "short write") + require.Equal(t, "writer", receivedErrorEvent.Component) require.False(t, bw.Connected()) - // Error should be sent to error channel - select { - case receivedErrorEvent := <-errChan: - require.Contains(t, receivedErrorEvent.Err.Error(), "short write") - require.Equal(t, "writer", receivedErrorEvent.Component) - default: - t.Fatal("Expected error to be sent to error channel") - } - // Reconnect with working writer and verify write completes writer2 := newMockWriter() err := bw.Reconnect(0, writer2) // Replay from beginning @@ -605,7 +598,10 @@ func TestBackedWriter_WriteBlocksAfterDisconnection(t *testing.T) { // Expected - write is blocked } - // Should be disconnected + // Wait for error event which implies writer was marked disconnected + receivedErrorEvent := testutil.RequireReceive(ctx, t, errChan) + require.Contains(t, receivedErrorEvent.Err.Error(), "connection lost") + require.Equal(t, "writer", receivedErrorEvent.Component) require.False(t, bw.Connected()) // Reconnect and verify write completes @@ -910,7 +906,7 @@ func TestBackedWriter_MultipleWritesDuringReconnect(t *testing.T) { <-writesReadyTimer.C // Start reconnection with controlled replay - replayStarted := make(chan struct{}) + replayStarted := make(chan struct{}, 1) replayCanComplete := make(chan struct{}) writer2 := &mockWriter{ writeFunc: func(p []byte) (int, error) {