Skip to content

Commit 7390d9d

Browse files
committed
fixed eviction tests in writer and moved from assert to require in reader tests
1 parent b9875a9 commit 7390d9d

File tree

2 files changed

+68
-53
lines changed

2 files changed

+68
-53
lines changed

agent/immortalstreams/backedpipe/backed_reader_test.go

Lines changed: 51 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"testing"
88
"time"
99

10-
"github.com/stretchr/testify/assert"
1110
"github.com/stretchr/testify/require"
1211
"golang.org/x/xerrors"
1312

@@ -59,9 +58,9 @@ func TestBackedReader_NewBackedReader(t *testing.T) {
5958
t.Parallel()
6059

6160
br := backedpipe.NewBackedReader()
62-
assert.NotNil(t, br)
63-
assert.Equal(t, uint64(0), br.SequenceNum())
64-
assert.False(t, br.Connected())
61+
require.NotNil(t, br)
62+
require.Equal(t, uint64(0), br.SequenceNum())
63+
require.False(t, br.Connected())
6564
}
6665

6766
func TestBackedReader_BasicReadOperation(t *testing.T) {
@@ -79,7 +78,7 @@ func TestBackedReader_BasicReadOperation(t *testing.T) {
7978

8079
// Get sequence number from reader
8180
seq := testutil.RequireReceive(ctx, t, seqNum)
82-
assert.Equal(t, uint64(0), seq)
81+
require.Equal(t, uint64(0), seq)
8382

8483
// Send new reader
8584
testutil.RequireSend(ctx, t, newR, io.Reader(reader))
@@ -88,16 +87,16 @@ func TestBackedReader_BasicReadOperation(t *testing.T) {
8887
buf := make([]byte, 5)
8988
n, err := br.Read(buf)
9089
require.NoError(t, err)
91-
assert.Equal(t, 5, n)
92-
assert.Equal(t, "hello", string(buf))
93-
assert.Equal(t, uint64(5), br.SequenceNum())
90+
require.Equal(t, 5, n)
91+
require.Equal(t, "hello", string(buf))
92+
require.Equal(t, uint64(5), br.SequenceNum())
9493

9594
// Read more data
9695
n, err = br.Read(buf)
9796
require.NoError(t, err)
98-
assert.Equal(t, 5, n)
99-
assert.Equal(t, " worl", string(buf))
100-
assert.Equal(t, uint64(10), br.SequenceNum())
97+
require.Equal(t, 5, n)
98+
require.Equal(t, " worl", string(buf))
99+
require.Equal(t, uint64(10), br.SequenceNum())
101100
}
102101

103102
func TestBackedReader_ReadBlocksWhenDisconnected(t *testing.T) {
@@ -143,8 +142,8 @@ func TestBackedReader_ReadBlocksWhenDisconnected(t *testing.T) {
143142

144143
// Wait for read to complete
145144
testutil.TryReceive(ctx, t, readDone)
146-
assert.NoError(t, readErr)
147-
assert.Equal(t, "test", string(readBuf))
145+
require.NoError(t, readErr)
146+
require.Equal(t, "test", string(readBuf))
148147
}
149148

150149
func TestBackedReader_ReconnectionAfterFailure(t *testing.T) {
@@ -168,8 +167,8 @@ func TestBackedReader_ReconnectionAfterFailure(t *testing.T) {
168167
buf := make([]byte, 5)
169168
n, err := br.Read(buf)
170169
require.NoError(t, err)
171-
assert.Equal(t, "first", string(buf[:n]))
172-
assert.Equal(t, uint64(5), br.SequenceNum())
170+
require.Equal(t, "first", string(buf[:n]))
171+
require.Equal(t, uint64(5), br.SequenceNum())
173172

174173
// Set up error callback to verify error notification
175174
errorReceived := make(chan error, 1)
@@ -189,8 +188,8 @@ func TestBackedReader_ReconnectionAfterFailure(t *testing.T) {
189188

190189
// Wait for the error to be reported via callback
191190
receivedErr := testutil.RequireReceive(ctx, t, errorReceived)
192-
assert.Error(t, receivedErr)
193-
assert.Contains(t, receivedErr.Error(), "connection lost")
191+
require.Error(t, receivedErr)
192+
require.Contains(t, receivedErr.Error(), "connection lost")
194193

195194
// Verify read is still blocked
196195
select {
@@ -201,7 +200,7 @@ func TestBackedReader_ReconnectionAfterFailure(t *testing.T) {
201200
}
202201

203202
// Verify disconnection
204-
assert.False(t, br.Connected())
203+
require.False(t, br.Connected())
205204

206205
// Reconnect with new reader
207206
reader2 := newMockReader("second")
@@ -212,12 +211,12 @@ func TestBackedReader_ReconnectionAfterFailure(t *testing.T) {
212211

213212
// Get sequence number and send new reader
214213
seq := testutil.RequireReceive(ctx, t, seqNum2)
215-
assert.Equal(t, uint64(5), seq) // Should return current sequence number
214+
require.Equal(t, uint64(5), seq) // Should return current sequence number
216215
testutil.RequireSend(ctx, t, newR2, io.Reader(reader2))
217216

218217
// Wait for read to unblock and succeed with new data
219218
readErr := testutil.RequireReceive(ctx, t, readDone)
220-
assert.NoError(t, readErr) // Should succeed with new reader
219+
require.NoError(t, readErr) // Should succeed with new reader
221220
}
222221

223222
func TestBackedReader_Close(t *testing.T) {
@@ -241,20 +240,20 @@ func TestBackedReader_Close(t *testing.T) {
241240
buf := make([]byte, 10)
242241
n, err := br.Read(buf)
243242
require.NoError(t, err)
244-
assert.Equal(t, 4, n) // "test" is 4 bytes
243+
require.Equal(t, 4, n) // "test" is 4 bytes
245244

246245
// Close the reader before EOF triggers reconnection
247246
err = br.Close()
248247
require.NoError(t, err)
249248

250249
// After close, reads should return EOF
251250
n, err = br.Read(buf)
252-
assert.Equal(t, 0, n)
253-
assert.Equal(t, io.EOF, err)
251+
require.Equal(t, 0, n)
252+
require.Equal(t, io.EOF, err)
254253

255254
// Subsequent reads should return EOF
256255
_, err = br.Read(buf)
257-
assert.Equal(t, io.EOF, err)
256+
require.Equal(t, io.EOF, err)
258257
}
259258

260259
func TestBackedReader_CloseIdempotent(t *testing.T) {
@@ -263,11 +262,11 @@ func TestBackedReader_CloseIdempotent(t *testing.T) {
263262
br := backedpipe.NewBackedReader()
264263

265264
err := br.Close()
266-
assert.NoError(t, err)
265+
require.NoError(t, err)
267266

268267
// Second close should be no-op
269268
err = br.Close()
270-
assert.NoError(t, err)
269+
require.NoError(t, err)
271270
}
272271

273272
func TestBackedReader_ReconnectAfterClose(t *testing.T) {
@@ -286,7 +285,7 @@ func TestBackedReader_ReconnectAfterClose(t *testing.T) {
286285

287286
// Should get 0 sequence number for closed reader
288287
seq := testutil.TryReceive(ctx, t, seqNum)
289-
assert.Equal(t, uint64(0), seq)
288+
require.Equal(t, uint64(0), seq)
290289
}
291290

292291
// Helper function to reconnect a reader using channels
@@ -315,18 +314,18 @@ func TestBackedReader_SequenceNumberTracking(t *testing.T) {
315314

316315
n, err := br.Read(buf)
317316
require.NoError(t, err)
318-
assert.Equal(t, 3, n)
319-
assert.Equal(t, uint64(3), br.SequenceNum())
317+
require.Equal(t, 3, n)
318+
require.Equal(t, uint64(3), br.SequenceNum())
320319

321320
n, err = br.Read(buf)
322321
require.NoError(t, err)
323-
assert.Equal(t, 3, n)
324-
assert.Equal(t, uint64(6), br.SequenceNum())
322+
require.Equal(t, 3, n)
323+
require.Equal(t, uint64(6), br.SequenceNum())
325324

326325
n, err = br.Read(buf)
327326
require.NoError(t, err)
328-
assert.Equal(t, 3, n)
329-
assert.Equal(t, uint64(9), br.SequenceNum())
327+
require.Equal(t, 3, n)
328+
require.Equal(t, uint64(9), br.SequenceNum())
330329
}
331330

332331
func TestBackedReader_EOFHandling(t *testing.T) {
@@ -348,8 +347,8 @@ func TestBackedReader_EOFHandling(t *testing.T) {
348347
buf := make([]byte, 10)
349348
n, err := br.Read(buf)
350349
require.NoError(t, err)
351-
assert.Equal(t, 4, n)
352-
assert.Equal(t, "test", string(buf[:n]))
350+
require.Equal(t, 4, n)
351+
require.Equal(t, "test", string(buf[:n]))
353352

354353
// Next read should encounter EOF, which triggers disconnection
355354
// The read should block waiting for reconnection
@@ -364,10 +363,10 @@ func TestBackedReader_EOFHandling(t *testing.T) {
364363

365364
// Wait for EOF to be reported via error callback
366365
receivedErr := testutil.RequireReceive(ctx, t, errorReceived)
367-
assert.Equal(t, io.EOF, receivedErr)
366+
require.Equal(t, io.EOF, receivedErr)
368367

369368
// Reader should be disconnected after EOF
370-
assert.False(t, br.Connected())
369+
require.False(t, br.Connected())
371370

372371
// Read should still be blocked
373372
select {
@@ -384,8 +383,8 @@ func TestBackedReader_EOFHandling(t *testing.T) {
384383
// Wait for the blocked read to complete with new data
385384
testutil.TryReceive(ctx, t, readDone)
386385
require.NoError(t, readErr)
387-
assert.Equal(t, 4, readN)
388-
assert.Equal(t, "more", string(buf[:readN]))
386+
require.Equal(t, 4, readN)
387+
require.Equal(t, "more", string(buf[:readN]))
389388
}
390389

391390
func BenchmarkBackedReader_Read(b *testing.B) {
@@ -438,11 +437,11 @@ func TestBackedReader_PartialReads(t *testing.T) {
438437
for i := 0; i < 5; i++ {
439438
n, err := br.Read(buf)
440439
require.NoError(t, err)
441-
assert.Equal(t, 1, n)
442-
assert.Equal(t, byte('A'), buf[0])
440+
require.Equal(t, 1, n)
441+
require.Equal(t, byte('A'), buf[0])
443442
}
444443

445-
assert.Equal(t, uint64(5), br.SequenceNum())
444+
require.Equal(t, uint64(5), br.SequenceNum())
446445
}
447446

448447
func TestBackedReader_CloseWhileBlockedOnUnderlyingReader(t *testing.T) {
@@ -525,14 +524,14 @@ func TestBackedReader_CloseWhileBlockedOnUnderlyingReader(t *testing.T) {
525524

526525
// The read should return EOF because Close() was called while it was blocked,
527526
// even though the underlying reader returned an error
528-
assert.Equal(t, 0, readN)
529-
assert.Equal(t, io.EOF, readErr)
527+
require.Equal(t, 0, readN)
528+
require.Equal(t, io.EOF, readErr)
530529

531530
// Subsequent reads should return EOF since the reader is now closed
532531
buf := make([]byte, 10)
533532
n, err := br.Read(buf)
534-
assert.Equal(t, 0, n)
535-
assert.Equal(t, io.EOF, err)
533+
require.Equal(t, 0, n)
534+
require.Equal(t, io.EOF, err)
536535
}
537536

538537
func TestBackedReader_CloseWhileBlockedWaitingForReconnect(t *testing.T) {
@@ -556,7 +555,7 @@ func TestBackedReader_CloseWhileBlockedWaitingForReconnect(t *testing.T) {
556555
buf := make([]byte, 10)
557556
n, err := br.Read(buf)
558557
require.NoError(t, err)
559-
assert.Equal(t, "initial", string(buf[:n]))
558+
require.Equal(t, "initial", string(buf[:n]))
560559

561560
// Set up error callback to track connection failure
562561
errorReceived := make(chan error, 1)
@@ -579,8 +578,8 @@ func TestBackedReader_CloseWhileBlockedWaitingForReconnect(t *testing.T) {
579578

580579
// Wait for the error to be reported (indicating disconnection)
581580
receivedErr := testutil.RequireReceive(ctx, t, errorReceived)
582-
assert.Error(t, receivedErr)
583-
assert.Contains(t, receivedErr.Error(), "connection lost")
581+
require.Error(t, receivedErr)
582+
require.Contains(t, receivedErr.Error(), "connection lost")
584583

585584
// Verify read is blocked waiting for reconnection
586585
select {
@@ -591,14 +590,14 @@ func TestBackedReader_CloseWhileBlockedWaitingForReconnect(t *testing.T) {
591590
}
592591

593592
// Verify reader is disconnected
594-
assert.False(t, br.Connected())
593+
require.False(t, br.Connected())
595594

596595
// Close the BackedReader while read is blocked waiting for reconnection
597596
err = br.Close()
598597
require.NoError(t, err)
599598

600599
// The read should unblock and return EOF
601600
testutil.TryReceive(ctx, t, readDone)
602-
assert.Equal(t, 0, readN)
603-
assert.Equal(t, io.EOF, readErr)
601+
require.Equal(t, 0, readN)
602+
require.Equal(t, io.EOF, readErr)
604603
}

agent/immortalstreams/backedpipe/backed_writer_test.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,23 @@ func TestBackedWriter_BufferEviction(t *testing.T) {
359359
require.NoError(t, err)
360360
require.Equal(t, 2, n)
361361

362-
// Buffer should contain "cdefg" (latest data)
362+
// Verify that the buffer contains only the latest data after eviction
363+
// Total sequence number should be 7 (5 + 2)
364+
require.Equal(t, uint64(7), bw.SequenceNum())
365+
366+
// Try to reconnect from the beginning - this should fail because
367+
// the early data was evicted from the buffer
368+
writer2 := newMockWriter()
369+
err = bw.Reconnect(0, writer2)
370+
require.Error(t, err)
371+
require.ErrorIs(t, err, backedpipe.ErrReplayDataUnavailable)
372+
373+
// However, reconnecting from a sequence that's still in the buffer should work
374+
// The buffer should contain the last 5 bytes: "cdefg"
375+
writer3 := newMockWriter()
376+
err = bw.Reconnect(2, writer3) // From sequence 2, should replay "cdefg"
377+
require.NoError(t, err)
378+
require.Equal(t, []byte("cdefg"), writer3.buffer.Bytes())
363379
}
364380

365381
func TestBackedWriter_Close(t *testing.T) {

0 commit comments

Comments
 (0)