Skip to content

Commit 95dc01a

Browse files
committed
fixed flaky tests
1 parent 064514e commit 95dc01a

File tree

3 files changed

+82
-23
lines changed

3 files changed

+82
-23
lines changed

agent/immortalstreams/backedpipe/backed_pipe.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ func (bp *BackedPipe) ForceReconnect() error {
327327
// Deduplicate concurrent ForceReconnect calls so only one reconnection
328328
// attempt runs at a time from this API. Use the pipe's internal context
329329
// to ensure Close() cancels any in-flight attempt.
330-
_, err, _ := bp.sf.Do("backedpipe-reconnect", func() (interface{}, error) {
330+
_, err, _ := bp.sf.Do("force-reconnect", func() (interface{}, error) {
331331
bp.mu.Lock()
332332
defer bp.mu.Unlock()
333333

agent/immortalstreams/backedpipe/backed_pipe_test.go

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -164,21 +164,30 @@ type blockingReconnector struct {
164164
callCount int
165165
blockChan <-chan struct{}
166166
blockedChan chan struct{}
167+
mu sync.Mutex
168+
signalOnce sync.Once // Ensure we only signal once for the first actual reconnect
167169
}
168170

169171
func (b *blockingReconnector) Reconnect(ctx context.Context, writerSeqNum uint64) (io.ReadWriteCloser, uint64, error) {
172+
b.mu.Lock()
170173
b.callCount++
174+
currentCall := b.callCount
175+
b.mu.Unlock()
171176

172-
if b.callCount == 1 {
177+
if currentCall == 1 {
173178
// Initial connect
174179
return b.conn1, 0, nil
175180
}
176181

177-
// Signal that we're about to block
178-
select {
179-
case b.blockedChan <- struct{}{}:
180-
default:
181-
}
182+
// Signal that we're about to block, but only once for the first reconnect attempt
183+
// This ensures we properly test singleflight deduplication
184+
b.signalOnce.Do(func() {
185+
select {
186+
case b.blockedChan <- struct{}{}:
187+
default:
188+
// If channel is full, don't block
189+
}
190+
})
182191

183192
// For subsequent calls, block until channel is closed
184193
select {
@@ -686,22 +695,57 @@ func TestBackedPipe_DuplicateReconnectionPrevention(t *testing.T) {
686695
require.NoError(t, err)
687696
require.Equal(t, 1, *callCount, "should have exactly 1 call after initial connect")
688697

689-
// Start multiple concurrent ForceReconnect calls while the reconnect function is blocked
698+
// We'll use channels to coordinate the test execution:
699+
// 1. Start all goroutines but have them wait
700+
// 2. Release the first one and wait for it to block
701+
// 3. Release the others while the first is still blocked
702+
703+
const numConcurrent = 3
704+
startSignals := make([]chan struct{}, numConcurrent)
705+
startedSignals := make([]chan struct{}, numConcurrent)
706+
for i := range startSignals {
707+
startSignals[i] = make(chan struct{})
708+
startedSignals[i] = make(chan struct{})
709+
}
710+
711+
errors := make([]error, numConcurrent)
690712
var wg sync.WaitGroup
691-
errors := make([]error, 3)
692713

693-
for i := 0; i < 3; i++ {
714+
// Start all goroutines
715+
for i := 0; i < numConcurrent; i++ {
694716
wg.Add(1)
695717
go func(idx int) {
696718
defer wg.Done()
719+
// Wait for the signal to start
720+
<-startSignals[idx]
721+
// Signal that we're about to call ForceReconnect
722+
close(startedSignals[idx])
697723
errors[idx] = bp.ForceReconnect()
698724
}(i)
699725
}
700726

701-
// Wait for the reconnect function to signal that it's blocking
727+
// Start the first ForceReconnect and wait for it to block
728+
close(startSignals[0])
729+
<-startedSignals[0]
730+
731+
// Wait for the first reconnect to actually start and block
702732
testutil.RequireReceive(testCtx, t, blockedChan)
703733

704-
// Verify that exactly one more reconnect call was made (singleflight deduplication)
734+
// Now start all the other ForceReconnect calls
735+
// They should all join the same singleflight operation
736+
for i := 1; i < numConcurrent; i++ {
737+
close(startSignals[i])
738+
}
739+
740+
// Wait for all additional goroutines to have started their calls
741+
for i := 1; i < numConcurrent; i++ {
742+
<-startedSignals[i]
743+
}
744+
745+
// At this point, one reconnect has started and is blocked,
746+
// and all other goroutines have called ForceReconnect and should be
747+
// waiting on the same singleflight operation.
748+
// Due to singleflight, only one reconnect should have been attempted.
705749
require.Equal(t, 2, *callCount, "should have exactly 2 calls: initial connect + 1 reconnect due to singleflight")
706750

707751
// Release the blocking reconnect function
@@ -710,9 +754,9 @@ func TestBackedPipe_DuplicateReconnectionPrevention(t *testing.T) {
710754
// Wait for all ForceReconnect calls to complete
711755
wg.Wait()
712756

713-
// With singleflight, all concurrent calls should succeed (they share the same result)
757+
// All calls should succeed (they share the same result from singleflight)
714758
for i, err := range errors {
715-
require.NoError(t, err, "ForceReconnect %d should succeed", i)
759+
require.NoError(t, err, "ForceReconnect %d should succeed", i, err)
716760
}
717761

718762
// Final verification: call count should still be exactly 2

agent/immortalstreams/backedpipe/backed_writer_test.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -744,15 +744,16 @@ func TestBackedWriter_ConcurrentWriteAndReconnect(t *testing.T) {
744744
// Wait for write to start
745745
testutil.RequireReceive(ctx, t, writeStarted)
746746

747-
// Give the write a moment to actually start and get blocked on the mutex
748-
time.Sleep(testutil.IntervalFast)
747+
// Use a small timeout to ensure the write goroutine has a chance to get blocked
748+
// on the mutex before we check if it's still blocked
749+
writeCheckTimer := time.NewTimer(testutil.IntervalFast)
750+
defer writeCheckTimer.Stop()
749751

750-
// Verify write is still blocked (reconnect is still in progress)
751752
select {
752753
case <-writeComplete:
753754
t.Fatal("Write should be blocked during reconnect")
754-
default:
755-
// Good, write is still blocked
755+
case <-writeCheckTimer.C:
756+
// Write is still blocked after a reasonable wait
756757
}
757758

758759
// Allow replay to complete, which will allow reconnect to finish
@@ -818,13 +819,24 @@ func TestBackedWriter_ConcurrentReconnectAndClose(t *testing.T) {
818819
testutil.RequireReceive(ctx, t, reconnectStarted)
819820

820821
// Start Close() in a separate goroutine since it will block until Reconnect() completes
822+
closeStarted := make(chan struct{}, 1)
821823
closeComplete := make(chan error, 1)
822824
go func() {
825+
closeStarted <- struct{}{} // Signal that Close() is starting
823826
closeComplete <- bw.Close()
824827
}()
825828

826-
// Give Close() a moment to start and block on the mutex
827-
time.Sleep(testutil.IntervalFast)
829+
// Wait for Close() to start, then give it a moment to attempt to acquire the mutex
830+
testutil.RequireReceive(ctx, t, closeStarted)
831+
closeCheckTimer := time.NewTimer(testutil.IntervalFast)
832+
defer closeCheckTimer.Stop()
833+
834+
select {
835+
case <-closeComplete:
836+
t.Fatal("Close should be blocked during reconnect")
837+
case <-closeCheckTimer.C:
838+
// Good, Close is still blocked after a reasonable wait
839+
}
828840

829841
// Allow replay to complete so reconnection can finish
830842
close(replayCanComplete)
@@ -891,8 +903,11 @@ func TestBackedWriter_MultipleWritesDuringReconnect(t *testing.T) {
891903
testutil.RequireReceive(ctx, t, writesStarted)
892904
}
893905

894-
// Give the writes a moment to actually begin execution
895-
time.Sleep(testutil.IntervalFast)
906+
// Use a timer to ensure all write goroutines have had a chance to start executing
907+
// and potentially get blocked on the mutex before we start the reconnection
908+
writesReadyTimer := time.NewTimer(testutil.IntervalFast)
909+
defer writesReadyTimer.Stop()
910+
<-writesReadyTimer.C
896911

897912
// Start reconnection with controlled replay
898913
replayStarted := make(chan struct{})

0 commit comments

Comments
 (0)