Skip to content

Commit b9875a9

Browse files
committed
added sentinel errors
1 parent 6e13d4a commit b9875a9

File tree

5 files changed

+38
-23
lines changed

5 files changed

+38
-23
lines changed

agent/immortalstreams/backedpipe/backed_pipe.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,15 @@ import (
99
"golang.org/x/xerrors"
1010
)
1111

12+
var (
13+
ErrPipeClosed = xerrors.New("pipe is closed")
14+
ErrPipeAlreadyConnected = xerrors.New("pipe is already connected")
15+
ErrReconnectionInProgress = xerrors.New("reconnection already in progress")
16+
ErrReconnectFailed = xerrors.New("reconnect failed")
17+
ErrInvalidSequenceNumber = xerrors.New("remote sequence number exceeds local sequence")
18+
ErrReconnectWriterFailed = xerrors.New("reconnect writer failed")
19+
)
20+
1221
const (
1322
// Default buffer capacity used by the writer - 64MB
1423
DefaultBufferSize = 64 * 1024 * 1024
@@ -90,11 +99,11 @@ func (bp *BackedPipe) Connect(_ context.Context) error { // external ctx ignored
9099
defer bp.mu.Unlock()
91100

92101
if bp.closed {
93-
return xerrors.New("pipe is closed")
102+
return ErrPipeClosed
94103
}
95104

96105
if bp.connected {
97-
return xerrors.New("pipe is already connected")
106+
return ErrPipeAlreadyConnected
98107
}
99108

100109
// Use internal context for the actual reconnect operation to ensure
@@ -190,7 +199,7 @@ func (bp *BackedPipe) signalConnectionChange() {
190199
// reconnectLocked handles the reconnection logic. Must be called with write lock held.
191200
func (bp *BackedPipe) reconnectLocked() error {
192201
if bp.reconnecting {
193-
return xerrors.New("reconnection already in progress")
202+
return ErrReconnectionInProgress
194203
}
195204

196205
bp.reconnecting = true
@@ -216,14 +225,13 @@ func (bp *BackedPipe) reconnectLocked() error {
216225
bp.mu.Lock()
217226

218227
if err != nil {
219-
return xerrors.Errorf("reconnect failed: %w", err)
228+
return ErrReconnectFailed
220229
}
221230

222231
// Validate sequence numbers
223232
if readerSeqNum > writerSeqNum {
224233
_ = conn.Close()
225-
return xerrors.Errorf("remote sequence number %d exceeds local sequence %d, cannot replay",
226-
readerSeqNum, writerSeqNum)
234+
return ErrInvalidSequenceNumber
227235
}
228236

229237
// Reconnect reader and writer
@@ -239,7 +247,7 @@ func (bp *BackedPipe) reconnectLocked() error {
239247
err = bp.writer.Reconnect(readerSeqNum, conn)
240248
if err != nil {
241249
_ = conn.Close()
242-
return xerrors.Errorf("reconnect writer: %w", err)
250+
return ErrReconnectWriterFailed
243251
}
244252

245253
// Success - update state

agent/immortalstreams/backedpipe/backed_pipe_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func TestBackedPipe_ConnectAlreadyConnected(t *testing.T) {
189189
// Second connect should fail
190190
err = bp.Connect(ctx)
191191
require.Error(t, err)
192-
require.Contains(t, err.Error(), "already connected")
192+
require.ErrorIs(t, err, backedpipe.ErrPipeAlreadyConnected)
193193
}
194194

195195
func TestBackedPipe_ConnectAfterClose(t *testing.T) {
@@ -206,7 +206,7 @@ func TestBackedPipe_ConnectAfterClose(t *testing.T) {
206206

207207
err = bp.Connect(ctx)
208208
require.Error(t, err)
209-
require.Contains(t, err.Error(), "closed")
209+
require.ErrorIs(t, err, backedpipe.ErrPipeClosed)
210210
}
211211

212212
func TestBackedPipe_BasicReadWrite(t *testing.T) {
@@ -501,7 +501,7 @@ func TestBackedPipe_ReconnectFunctionFailure(t *testing.T) {
501501

502502
err := bp.Connect(ctx)
503503
require.Error(t, err)
504-
require.Contains(t, err.Error(), "reconnect failed")
504+
require.ErrorIs(t, err, backedpipe.ErrReconnectFailed)
505505
require.False(t, bp.Connected())
506506
}
507507

agent/immortalstreams/backedpipe/backed_writer.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,15 @@ import (
77
"golang.org/x/xerrors"
88
)
99

10+
var (
11+
ErrWriterClosed = xerrors.New("cannot reconnect closed writer")
12+
ErrNilWriter = xerrors.New("new writer cannot be nil")
13+
ErrFutureSequence = xerrors.New("cannot replay from future sequence")
14+
ErrReplayDataUnavailable = xerrors.New("failed to read replay data")
15+
ErrReplayFailed = xerrors.New("replay failed")
16+
ErrPartialReplay = xerrors.New("partial replay")
17+
)
18+
1019
// BackedWriter wraps an unreliable io.Writer and makes it resilient to disconnections.
1120
// It maintains a ring buffer of recent writes for replay during reconnection.
1221
type BackedWriter struct {
@@ -129,16 +138,16 @@ func (bw *BackedWriter) Reconnect(replayFromSeq uint64, newWriter io.Writer) err
129138
defer bw.mu.Unlock()
130139

131140
if bw.closed {
132-
return xerrors.New("cannot reconnect closed writer")
141+
return ErrWriterClosed
133142
}
134143

135144
if newWriter == nil {
136-
return xerrors.New("new writer cannot be nil")
145+
return ErrNilWriter
137146
}
138147

139148
// Check if we can replay from the requested sequence number
140149
if replayFromSeq > bw.sequenceNum {
141-
return xerrors.Errorf("cannot replay from future sequence %d: current sequence is %d", replayFromSeq, bw.sequenceNum)
150+
return ErrFutureSequence
142151
}
143152

144153
// Calculate how many bytes we need to replay
@@ -156,7 +165,7 @@ func (bw *BackedWriter) Reconnect(replayFromSeq uint64, newWriter io.Writer) err
156165
//nolint:gosec // Safe conversion: replayBytes is calculated from uint64 subtraction
157166
replayData, err = bw.buffer.ReadLast(int(replayBytes))
158167
if err != nil {
159-
return xerrors.Errorf("failed to read replay data: %w", err)
168+
return ErrReplayDataUnavailable
160169
}
161170
}
162171

@@ -172,12 +181,12 @@ func (bw *BackedWriter) Reconnect(replayFromSeq uint64, newWriter io.Writer) err
172181

173182
if err != nil {
174183
// Reconnect failed, writer remains nil
175-
return xerrors.Errorf("replay failed: %w", err)
184+
return ErrReplayFailed
176185
}
177186

178187
if n != len(replayData) {
179188
// Reconnect failed, writer remains nil
180-
return xerrors.Errorf("partial replay: wrote %d of %d bytes", n, len(replayData))
189+
return ErrPartialReplay
181190
}
182191
}
183192

agent/immortalstreams/backedpipe/backed_writer_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ func TestBackedWriter_ReplayFromFutureSequence(t *testing.T) {
313313
writer2 := newMockWriter()
314314
err = bw.Reconnect(10, writer2) // Future sequence
315315
require.Error(t, err)
316-
require.Contains(t, err.Error(), "future sequence")
316+
require.ErrorIs(t, err, backedpipe.ErrFutureSequence)
317317
}
318318

319319
func TestBackedWriter_ReplayDataLoss(t *testing.T) {
@@ -336,7 +336,7 @@ func TestBackedWriter_ReplayDataLoss(t *testing.T) {
336336
err = bw.Reconnect(0, writer2) // Try to replay from evicted data
337337
// With the new error handling, this should fail because we can't read all the data
338338
require.Error(t, err)
339-
require.Contains(t, err.Error(), "failed to read replay data")
339+
require.ErrorIs(t, err, backedpipe.ErrReplayDataUnavailable)
340340
}
341341

342342
func TestBackedWriter_BufferEviction(t *testing.T) {
@@ -381,7 +381,7 @@ func TestBackedWriter_Close(t *testing.T) {
381381
// Reconnect after close should fail
382382
err = bw.Reconnect(0, newMockWriter())
383383
require.Error(t, err)
384-
require.Contains(t, err.Error(), "closed")
384+
require.ErrorIs(t, err, backedpipe.ErrWriterClosed)
385385
}
386386

387387
func TestBackedWriter_CloseIdempotent(t *testing.T) {
@@ -454,7 +454,7 @@ func TestBackedWriter_ReconnectDuringReplay(t *testing.T) {
454454

455455
err = bw.Reconnect(0, writer2)
456456
require.Error(t, err)
457-
require.Contains(t, err.Error(), "replay failed")
457+
require.ErrorIs(t, err, backedpipe.ErrReplayFailed)
458458
require.False(t, bw.Connected())
459459
}
460460

agent/immortalstreams/backedpipe/ring_buffer.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package backedpipe
22

3-
import (
4-
"golang.org/x/xerrors"
5-
)
3+
import "golang.org/x/xerrors"
64

75
// ringBuffer implements an efficient circular buffer with a fixed-size allocation.
86
// This implementation is not thread-safe and relies on external synchronization.

0 commit comments

Comments
 (0)