Skip to content

Commit 946ff90

Browse files
committed
Hold the mutex during reconnection replay and add test cases that verify close/reconnect race conditions
1 parent 7390d9d commit 946ff90

File tree

2 files changed

+200
-4
lines changed

2 files changed

+200
-4
lines changed

agent/immortalstreams/backedpipe/backed_writer.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,12 +172,10 @@ func (bw *BackedWriter) Reconnect(replayFromSeq uint64, newWriter io.Writer) err
172172
// Clear the current writer first in case replay fails
173173
bw.writer = nil
174174

175-
// Replay data if needed. We keep the writer as nil during replay to ensure
176-
// no concurrent writes can happen, then set it only after successful replay.
175+
// Replay data if needed. We keep the mutex held during replay to ensure
176+
// no concurrent operations can interfere with the reconnection process.
177177
if len(replayData) > 0 {
178-
bw.mu.Unlock()
179178
n, err := newWriter.Write(replayData)
180-
bw.mu.Lock()
181179

182180
if err != nil {
183181
// Reconnect failed, writer remains nil

agent/immortalstreams/backedpipe/backed_writer_test.go

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -651,6 +651,204 @@ func TestBackedWriter_WriteBlocksAfterDisconnection(t *testing.T) {
651651
require.Equal(t, []byte("world"), writer2.buffer.Bytes()) // Only "world" since we replayed from sequence 5
652652
}
653653

654+
func TestBackedWriter_ConcurrentWriteAndClose(t *testing.T) {
655+
t.Parallel()
656+
657+
errorChan := make(chan error, 1)
658+
bw := backedpipe.NewBackedWriter(backedpipe.DefaultBufferSize, errorChan)
659+
writer := newMockWriter()
660+
bw.Reconnect(0, writer)
661+
662+
// Start a write operation that will be interrupted by close
663+
writeComplete := make(chan struct{})
664+
var writeErr error
665+
var n int
666+
667+
go func() {
668+
defer close(writeComplete)
669+
// Write some data that should succeed
670+
n, writeErr = bw.Write([]byte("hello"))
671+
}()
672+
673+
// Give write a chance to start
674+
time.Sleep(10 * time.Millisecond)
675+
676+
// Close the writer
677+
closeErr := bw.Close()
678+
require.NoError(t, closeErr)
679+
680+
// Wait for write to complete
681+
<-writeComplete
682+
683+
// Write should have either succeeded (if it completed before close)
684+
// or failed with EOF (if close interrupted it)
685+
if writeErr == nil {
686+
require.Equal(t, 5, n)
687+
} else {
688+
require.ErrorIs(t, writeErr, io.EOF)
689+
}
690+
691+
// Subsequent writes should fail
692+
n, err := bw.Write([]byte("world"))
693+
require.Equal(t, 0, n)
694+
require.ErrorIs(t, err, io.EOF)
695+
}
696+
697+
func TestBackedWriter_ConcurrentWriteAndReconnect(t *testing.T) {
698+
t.Parallel()
699+
700+
errorChan := make(chan error, 1)
701+
bw := backedpipe.NewBackedWriter(backedpipe.DefaultBufferSize, errorChan)
702+
703+
// Initial connection
704+
writer1 := newMockWriter()
705+
err := bw.Reconnect(0, writer1)
706+
require.NoError(t, err)
707+
708+
// Write some initial data
709+
_, err = bw.Write([]byte("initial"))
710+
require.NoError(t, err)
711+
712+
// Start a write operation that will be blocked by reconnect
713+
writeComplete := make(chan struct{})
714+
var writeErr error
715+
var n int
716+
717+
go func() {
718+
defer close(writeComplete)
719+
// This write should be blocked during reconnect
720+
n, writeErr = bw.Write([]byte("blocked"))
721+
}()
722+
723+
// Give write a chance to start
724+
time.Sleep(10 * time.Millisecond)
725+
726+
// Start reconnection which will cause the write to wait
727+
writer2 := &mockWriter{
728+
writeFunc: func(p []byte) (int, error) {
729+
// Simulate slow replay
730+
time.Sleep(50 * time.Millisecond)
731+
return len(p), nil
732+
},
733+
}
734+
735+
reconnectErr := bw.Reconnect(0, writer2)
736+
require.NoError(t, reconnectErr)
737+
738+
// Wait for write to complete
739+
<-writeComplete
740+
741+
// Write should succeed after reconnection completes
742+
require.NoError(t, writeErr)
743+
require.Equal(t, 7, n) // "blocked" is 7 bytes
744+
745+
// Verify the writer is connected
746+
require.True(t, bw.Connected())
747+
}
748+
749+
func TestBackedWriter_ConcurrentReconnectAndClose(t *testing.T) {
750+
t.Parallel()
751+
752+
errorChan := make(chan error, 1)
753+
bw := backedpipe.NewBackedWriter(backedpipe.DefaultBufferSize, errorChan)
754+
755+
// Initial connection and write some data
756+
writer1 := newMockWriter()
757+
err := bw.Reconnect(0, writer1)
758+
require.NoError(t, err)
759+
_, err = bw.Write([]byte("test data"))
760+
require.NoError(t, err)
761+
762+
// Start reconnection with slow replay
763+
reconnectComplete := make(chan struct{})
764+
var reconnectErr error
765+
766+
go func() {
767+
defer close(reconnectComplete)
768+
writer2 := &mockWriter{
769+
writeFunc: func(p []byte) (int, error) {
770+
// Simulate slow replay - this should be interrupted by close
771+
time.Sleep(100 * time.Millisecond)
772+
return len(p), nil
773+
},
774+
}
775+
reconnectErr = bw.Reconnect(0, writer2)
776+
}()
777+
778+
// Give reconnect a chance to start
779+
time.Sleep(10 * time.Millisecond)
780+
781+
// Close while reconnection is in progress
782+
closeErr := bw.Close()
783+
require.NoError(t, closeErr)
784+
785+
// Wait for reconnect to complete
786+
<-reconnectComplete
787+
788+
// With mutex held during replay, Close() waits for Reconnect() to finish.
789+
// So Reconnect() should succeed, then Close() runs and closes the writer.
790+
require.NoError(t, reconnectErr)
791+
792+
// Verify writer is closed (Close() ran after Reconnect() completed)
793+
require.False(t, bw.Connected())
794+
}
795+
796+
func TestBackedWriter_MultipleWritesDuringReconnect(t *testing.T) {
797+
t.Parallel()
798+
799+
errorChan := make(chan error, 1)
800+
bw := backedpipe.NewBackedWriter(backedpipe.DefaultBufferSize, errorChan)
801+
802+
// Initial connection
803+
writer1 := newMockWriter()
804+
err := bw.Reconnect(0, writer1)
805+
require.NoError(t, err)
806+
807+
// Write some initial data
808+
_, err = bw.Write([]byte("initial"))
809+
require.NoError(t, err)
810+
811+
// Start multiple write operations
812+
numWriters := 5
813+
var wg sync.WaitGroup
814+
writeResults := make([]error, numWriters)
815+
816+
for i := 0; i < numWriters; i++ {
817+
wg.Add(1)
818+
go func(id int) {
819+
defer wg.Done()
820+
data := []byte{byte('A' + id)}
821+
_, writeResults[id] = bw.Write(data)
822+
}(i)
823+
}
824+
825+
// Give writes a chance to start
826+
time.Sleep(10 * time.Millisecond)
827+
828+
// Start reconnection with slow replay
829+
writer2 := &mockWriter{
830+
writeFunc: func(p []byte) (int, error) {
831+
// Simulate slow replay
832+
time.Sleep(50 * time.Millisecond)
833+
return len(p), nil
834+
},
835+
}
836+
837+
reconnectErr := bw.Reconnect(0, writer2)
838+
require.NoError(t, reconnectErr)
839+
840+
// Wait for all writes to complete
841+
wg.Wait()
842+
843+
// All writes should succeed
844+
for i, err := range writeResults {
845+
require.NoError(t, err, "Write %d should succeed", i)
846+
}
847+
848+
// Verify the writer is connected
849+
require.True(t, bw.Connected())
850+
}
851+
654852
func BenchmarkBackedWriter_Write(b *testing.B) {
655853
errorChan := make(chan error, 1)
656854
bw := backedpipe.NewBackedWriter(backedpipe.DefaultBufferSize, errorChan) // 64KB buffer

0 commit comments

Comments
 (0)