@@ -164,21 +164,30 @@ type blockingReconnector struct {
164
164
callCount int
165
165
blockChan <- chan struct {}
166
166
blockedChan chan struct {}
167
+ mu sync.Mutex
168
+ signalOnce sync.Once // Ensure we only signal once for the first actual reconnect
167
169
}
168
170
169
171
func (b * blockingReconnector ) Reconnect (ctx context.Context , writerSeqNum uint64 ) (io.ReadWriteCloser , uint64 , error ) {
172
+ b .mu .Lock ()
170
173
b .callCount ++
174
+ currentCall := b .callCount
175
+ b .mu .Unlock ()
171
176
172
- if b . callCount == 1 {
177
+ if currentCall == 1 {
173
178
// Initial connect
174
179
return b .conn1 , 0 , nil
175
180
}
176
181
177
- // Signal that we're about to block
178
- select {
179
- case b .blockedChan <- struct {}{}:
180
- default :
181
- }
182
+ // Signal that we're about to block, but only once for the first reconnect attempt
183
+ // This ensures we properly test singleflight deduplication
184
+ b .signalOnce .Do (func () {
185
+ select {
186
+ case b .blockedChan <- struct {}{}:
187
+ default :
188
+ // If channel is full, don't block
189
+ }
190
+ })
182
191
183
192
// For subsequent calls, block until channel is closed
184
193
select {
@@ -686,22 +695,57 @@ func TestBackedPipe_DuplicateReconnectionPrevention(t *testing.T) {
686
695
require .NoError (t , err )
687
696
require .Equal (t , 1 , * callCount , "should have exactly 1 call after initial connect" )
688
697
689
- // Start multiple concurrent ForceReconnect calls while the reconnect function is blocked
698
+ // We'll use channels to coordinate the test execution:
699
+ // 1. Start all goroutines but have them wait
700
+ // 2. Release the first one and wait for it to block
701
+ // 3. Release the others while the first is still blocked
702
+
703
+ const numConcurrent = 3
704
+ startSignals := make ([]chan struct {}, numConcurrent )
705
+ startedSignals := make ([]chan struct {}, numConcurrent )
706
+ for i := range startSignals {
707
+ startSignals [i ] = make (chan struct {})
708
+ startedSignals [i ] = make (chan struct {})
709
+ }
710
+
711
+ errors := make ([]error , numConcurrent )
690
712
var wg sync.WaitGroup
691
- errors := make ([]error , 3 )
692
713
693
- for i := 0 ; i < 3 ; i ++ {
714
+ // Start all goroutines
715
+ for i := 0 ; i < numConcurrent ; i ++ {
694
716
wg .Add (1 )
695
717
go func (idx int ) {
696
718
defer wg .Done ()
719
+ // Wait for the signal to start
720
+ <- startSignals [idx ]
721
+ // Signal that we're about to call ForceReconnect
722
+ close (startedSignals [idx ])
697
723
errors [idx ] = bp .ForceReconnect ()
698
724
}(i )
699
725
}
700
726
701
- // Wait for the reconnect function to signal that it's blocking
727
+ // Start the first ForceReconnect and wait for it to block
728
+ close (startSignals [0 ])
729
+ <- startedSignals [0 ]
730
+
731
+ // Wait for the first reconnect to actually start and block
702
732
testutil .RequireReceive (testCtx , t , blockedChan )
703
733
704
- // Verify that exactly one more reconnect call was made (singleflight deduplication)
734
+ // Now start all the other ForceReconnect calls
735
+ // They should all join the same singleflight operation
736
+ for i := 1 ; i < numConcurrent ; i ++ {
737
+ close (startSignals [i ])
738
+ }
739
+
740
+ // Wait for all additional goroutines to have started their calls
741
+ for i := 1 ; i < numConcurrent ; i ++ {
742
+ <- startedSignals [i ]
743
+ }
744
+
745
+ // At this point, one reconnect has started and is blocked,
746
+ // and all other goroutines have called ForceReconnect and should be
747
+ // waiting on the same singleflight operation.
748
+ // Due to singleflight, only one reconnect should have been attempted.
705
749
require .Equal (t , 2 , * callCount , "should have exactly 2 calls: initial connect + 1 reconnect due to singleflight" )
706
750
707
751
// Release the blocking reconnect function
@@ -710,9 +754,9 @@ func TestBackedPipe_DuplicateReconnectionPrevention(t *testing.T) {
710
754
// Wait for all ForceReconnect calls to complete
711
755
wg .Wait ()
712
756
713
- // With singleflight, all concurrent calls should succeed (they share the same result)
757
+ // All calls should succeed (they share the same result from singleflight )
714
758
for i , err := range errors {
715
- require .NoError (t , err , "ForceReconnect %d should succeed" , i )
759
+ require .NoError (t , err , "ForceReconnect %d should succeed" , i , err )
716
760
}
717
761
718
762
// Final verification: call count should still be exactly 2
0 commit comments