@@ -651,6 +651,204 @@ func TestBackedWriter_WriteBlocksAfterDisconnection(t *testing.T) {
651
651
require .Equal (t , []byte ("world" ), writer2 .buffer .Bytes ()) // Only "world" since we replayed from sequence 5
652
652
}
653
653
654
+ func TestBackedWriter_ConcurrentWriteAndClose (t * testing.T ) {
655
+ t .Parallel ()
656
+
657
+ errorChan := make (chan error , 1 )
658
+ bw := backedpipe .NewBackedWriter (backedpipe .DefaultBufferSize , errorChan )
659
+ writer := newMockWriter ()
660
+ bw .Reconnect (0 , writer )
661
+
662
+ // Start a write operation that will be interrupted by close
663
+ writeComplete := make (chan struct {})
664
+ var writeErr error
665
+ var n int
666
+
667
+ go func () {
668
+ defer close (writeComplete )
669
+ // Write some data that should succeed
670
+ n , writeErr = bw .Write ([]byte ("hello" ))
671
+ }()
672
+
673
+ // Give write a chance to start
674
+ time .Sleep (10 * time .Millisecond )
675
+
676
+ // Close the writer
677
+ closeErr := bw .Close ()
678
+ require .NoError (t , closeErr )
679
+
680
+ // Wait for write to complete
681
+ <- writeComplete
682
+
683
+ // Write should have either succeeded (if it completed before close)
684
+ // or failed with EOF (if close interrupted it)
685
+ if writeErr == nil {
686
+ require .Equal (t , 5 , n )
687
+ } else {
688
+ require .ErrorIs (t , writeErr , io .EOF )
689
+ }
690
+
691
+ // Subsequent writes should fail
692
+ n , err := bw .Write ([]byte ("world" ))
693
+ require .Equal (t , 0 , n )
694
+ require .ErrorIs (t , err , io .EOF )
695
+ }
696
+
697
+ func TestBackedWriter_ConcurrentWriteAndReconnect (t * testing.T ) {
698
+ t .Parallel ()
699
+
700
+ errorChan := make (chan error , 1 )
701
+ bw := backedpipe .NewBackedWriter (backedpipe .DefaultBufferSize , errorChan )
702
+
703
+ // Initial connection
704
+ writer1 := newMockWriter ()
705
+ err := bw .Reconnect (0 , writer1 )
706
+ require .NoError (t , err )
707
+
708
+ // Write some initial data
709
+ _ , err = bw .Write ([]byte ("initial" ))
710
+ require .NoError (t , err )
711
+
712
+ // Start a write operation that will be blocked by reconnect
713
+ writeComplete := make (chan struct {})
714
+ var writeErr error
715
+ var n int
716
+
717
+ go func () {
718
+ defer close (writeComplete )
719
+ // This write should be blocked during reconnect
720
+ n , writeErr = bw .Write ([]byte ("blocked" ))
721
+ }()
722
+
723
+ // Give write a chance to start
724
+ time .Sleep (10 * time .Millisecond )
725
+
726
+ // Start reconnection which will cause the write to wait
727
+ writer2 := & mockWriter {
728
+ writeFunc : func (p []byte ) (int , error ) {
729
+ // Simulate slow replay
730
+ time .Sleep (50 * time .Millisecond )
731
+ return len (p ), nil
732
+ },
733
+ }
734
+
735
+ reconnectErr := bw .Reconnect (0 , writer2 )
736
+ require .NoError (t , reconnectErr )
737
+
738
+ // Wait for write to complete
739
+ <- writeComplete
740
+
741
+ // Write should succeed after reconnection completes
742
+ require .NoError (t , writeErr )
743
+ require .Equal (t , 7 , n ) // "blocked" is 7 bytes
744
+
745
+ // Verify the writer is connected
746
+ require .True (t , bw .Connected ())
747
+ }
748
+
749
+ func TestBackedWriter_ConcurrentReconnectAndClose (t * testing.T ) {
750
+ t .Parallel ()
751
+
752
+ errorChan := make (chan error , 1 )
753
+ bw := backedpipe .NewBackedWriter (backedpipe .DefaultBufferSize , errorChan )
754
+
755
+ // Initial connection and write some data
756
+ writer1 := newMockWriter ()
757
+ err := bw .Reconnect (0 , writer1 )
758
+ require .NoError (t , err )
759
+ _ , err = bw .Write ([]byte ("test data" ))
760
+ require .NoError (t , err )
761
+
762
+ // Start reconnection with slow replay
763
+ reconnectComplete := make (chan struct {})
764
+ var reconnectErr error
765
+
766
+ go func () {
767
+ defer close (reconnectComplete )
768
+ writer2 := & mockWriter {
769
+ writeFunc : func (p []byte ) (int , error ) {
770
+ // Simulate slow replay - this should be interrupted by close
771
+ time .Sleep (100 * time .Millisecond )
772
+ return len (p ), nil
773
+ },
774
+ }
775
+ reconnectErr = bw .Reconnect (0 , writer2 )
776
+ }()
777
+
778
+ // Give reconnect a chance to start
779
+ time .Sleep (10 * time .Millisecond )
780
+
781
+ // Close while reconnection is in progress
782
+ closeErr := bw .Close ()
783
+ require .NoError (t , closeErr )
784
+
785
+ // Wait for reconnect to complete
786
+ <- reconnectComplete
787
+
788
+ // With mutex held during replay, Close() waits for Reconnect() to finish.
789
+ // So Reconnect() should succeed, then Close() runs and closes the writer.
790
+ require .NoError (t , reconnectErr )
791
+
792
+ // Verify writer is closed (Close() ran after Reconnect() completed)
793
+ require .False (t , bw .Connected ())
794
+ }
795
+
796
+ func TestBackedWriter_MultipleWritesDuringReconnect (t * testing.T ) {
797
+ t .Parallel ()
798
+
799
+ errorChan := make (chan error , 1 )
800
+ bw := backedpipe .NewBackedWriter (backedpipe .DefaultBufferSize , errorChan )
801
+
802
+ // Initial connection
803
+ writer1 := newMockWriter ()
804
+ err := bw .Reconnect (0 , writer1 )
805
+ require .NoError (t , err )
806
+
807
+ // Write some initial data
808
+ _ , err = bw .Write ([]byte ("initial" ))
809
+ require .NoError (t , err )
810
+
811
+ // Start multiple write operations
812
+ numWriters := 5
813
+ var wg sync.WaitGroup
814
+ writeResults := make ([]error , numWriters )
815
+
816
+ for i := 0 ; i < numWriters ; i ++ {
817
+ wg .Add (1 )
818
+ go func (id int ) {
819
+ defer wg .Done ()
820
+ data := []byte {byte ('A' + id )}
821
+ _ , writeResults [id ] = bw .Write (data )
822
+ }(i )
823
+ }
824
+
825
+ // Give writes a chance to start
826
+ time .Sleep (10 * time .Millisecond )
827
+
828
+ // Start reconnection with slow replay
829
+ writer2 := & mockWriter {
830
+ writeFunc : func (p []byte ) (int , error ) {
831
+ // Simulate slow replay
832
+ time .Sleep (50 * time .Millisecond )
833
+ return len (p ), nil
834
+ },
835
+ }
836
+
837
+ reconnectErr := bw .Reconnect (0 , writer2 )
838
+ require .NoError (t , reconnectErr )
839
+
840
+ // Wait for all writes to complete
841
+ wg .Wait ()
842
+
843
+ // All writes should succeed
844
+ for i , err := range writeResults {
845
+ require .NoError (t , err , "Write %d should succeed" , i )
846
+ }
847
+
848
+ // Verify the writer is connected
849
+ require .True (t , bw .Connected ())
850
+ }
851
+
654
852
func BenchmarkBackedWriter_Write (b * testing.B ) {
655
853
errorChan := make (chan error , 1 )
656
854
bw := backedpipe .NewBackedWriter (backedpipe .DefaultBufferSize , errorChan ) // 64KB buffer
0 commit comments