Skip to content

Commit 4c98dec

Browse files
authored
chore: add backed reader, writer and pipe implementation (#19147)
Relates to: #18101 This PR introduces a new `backedpipe` package that provides reliable bidirectional byte streams over unreliable network connections. The implementation includes: - `BackedPipe`: Orchestrates a reader and writer to provide transparent reconnection and data replay - `BackedReader`: Handles reading with automatic reconnection, blocking reads when disconnected - `BackedWriter`: Maintains a ring buffer of recent writes for replay during reconnection - `RingBuffer`: Efficient circular buffer implementation for storing data The package enables resilient connections by tracking sequence numbers and replaying missed data after reconnection. It handles connection failures gracefully, automatically reconnecting and resuming data transfer from the appropriate point.
1 parent e53bc24 commit 4c98dec

File tree

8 files changed

+3737
-0
lines changed

8 files changed

+3737
-0
lines changed
Lines changed: 350 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
package backedpipe
2+
3+
import (
4+
"context"
5+
"io"
6+
"sync"
7+
8+
"golang.org/x/sync/errgroup"
9+
"golang.org/x/sync/singleflight"
10+
"golang.org/x/xerrors"
11+
)
12+
13+
var (
14+
ErrPipeClosed = xerrors.New("pipe is closed")
15+
ErrPipeAlreadyConnected = xerrors.New("pipe is already connected")
16+
ErrReconnectionInProgress = xerrors.New("reconnection already in progress")
17+
ErrReconnectFailed = xerrors.New("reconnect failed")
18+
ErrInvalidSequenceNumber = xerrors.New("remote sequence number exceeds local sequence")
19+
ErrReconnectWriterFailed = xerrors.New("reconnect writer failed")
20+
)
21+
22+
// connectionState represents the current state of the BackedPipe connection.
23+
type connectionState int
24+
25+
const (
26+
// connected indicates the pipe is connected and operational.
27+
connected connectionState = iota
28+
// disconnected indicates the pipe is not connected but not closed.
29+
disconnected
30+
// reconnecting indicates a reconnection attempt is in progress.
31+
reconnecting
32+
// closed indicates the pipe is permanently closed.
33+
closed
34+
)
35+
36+
// ErrorEvent represents an error from a reader or writer with connection generation info.
37+
type ErrorEvent struct {
38+
Err error
39+
Component string // "reader" or "writer"
40+
Generation uint64 // connection generation when error occurred
41+
}
42+
43+
const (
44+
// Default buffer capacity used by the writer - 64MB
45+
DefaultBufferSize = 64 * 1024 * 1024
46+
)
47+
48+
// Reconnector is an interface for establishing connections when the BackedPipe needs to reconnect.
49+
// Implementations should:
50+
// 1. Establish a new connection to the remote side
51+
// 2. Exchange sequence numbers with the remote side
52+
// 3. Return the new connection and the remote's reader sequence number
53+
//
54+
// The readerSeqNum parameter is the local reader's current sequence number
55+
// (total bytes successfully read from the remote). This must be sent to the
56+
// remote so it can replay its data to us starting from this number.
57+
//
58+
// The returned remoteReaderSeqNum should be the remote side's reader sequence
59+
// number (how many bytes of our outbound data it has successfully read). This
60+
// informs our writer where to resume (i.e., which bytes to replay to the remote).
61+
type Reconnector interface {
62+
Reconnect(ctx context.Context, readerSeqNum uint64) (conn io.ReadWriteCloser, remoteReaderSeqNum uint64, err error)
63+
}
64+
65+
// BackedPipe provides a reliable bidirectional byte stream over unreliable network connections.
66+
// It orchestrates a BackedReader and BackedWriter to provide transparent reconnection
67+
// and data replay capabilities.
68+
type BackedPipe struct {
69+
ctx context.Context
70+
cancel context.CancelFunc
71+
mu sync.RWMutex
72+
reader *BackedReader
73+
writer *BackedWriter
74+
reconnector Reconnector
75+
conn io.ReadWriteCloser
76+
77+
// State machine
78+
state connectionState
79+
connGen uint64 // Increments on each successful reconnection
80+
81+
// Unified error handling with generation filtering
82+
errChan chan ErrorEvent
83+
84+
// singleflight group to dedupe concurrent ForceReconnect calls
85+
sf singleflight.Group
86+
87+
// Track first error per generation to avoid duplicate reconnections
88+
lastErrorGen uint64
89+
}
90+
91+
// NewBackedPipe creates a new BackedPipe with default options and the specified reconnector.
92+
// The pipe starts disconnected and must be connected using Connect().
93+
func NewBackedPipe(ctx context.Context, reconnector Reconnector) *BackedPipe {
94+
pipeCtx, cancel := context.WithCancel(ctx)
95+
96+
errChan := make(chan ErrorEvent, 1)
97+
98+
bp := &BackedPipe{
99+
ctx: pipeCtx,
100+
cancel: cancel,
101+
reconnector: reconnector,
102+
state: disconnected,
103+
connGen: 0, // Start with generation 0
104+
errChan: errChan,
105+
}
106+
107+
// Create reader and writer with typed error channel for generation-aware error reporting
108+
bp.reader = NewBackedReader(errChan)
109+
bp.writer = NewBackedWriter(DefaultBufferSize, errChan)
110+
111+
// Start error handler goroutine
112+
go bp.handleErrors()
113+
114+
return bp
115+
}
116+
117+
// Connect establishes the initial connection using the reconnect function.
118+
func (bp *BackedPipe) Connect() error {
119+
bp.mu.Lock()
120+
defer bp.mu.Unlock()
121+
122+
if bp.state == closed {
123+
return ErrPipeClosed
124+
}
125+
126+
if bp.state == connected {
127+
return ErrPipeAlreadyConnected
128+
}
129+
130+
// Use internal context for the actual reconnect operation to ensure
131+
// Close() reliably cancels any in-flight attempt.
132+
return bp.reconnectLocked()
133+
}
134+
135+
// Read implements io.Reader by delegating to the BackedReader.
136+
func (bp *BackedPipe) Read(p []byte) (int, error) {
137+
return bp.reader.Read(p)
138+
}
139+
140+
// Write implements io.Writer by delegating to the BackedWriter.
141+
func (bp *BackedPipe) Write(p []byte) (int, error) {
142+
bp.mu.RLock()
143+
writer := bp.writer
144+
state := bp.state
145+
bp.mu.RUnlock()
146+
147+
if state == closed {
148+
return 0, io.EOF
149+
}
150+
151+
return writer.Write(p)
152+
}
153+
154+
// Close closes the pipe and all underlying connections.
155+
func (bp *BackedPipe) Close() error {
156+
bp.mu.Lock()
157+
defer bp.mu.Unlock()
158+
159+
if bp.state == closed {
160+
return nil
161+
}
162+
163+
bp.state = closed
164+
bp.cancel() // Cancel main context
165+
166+
// Close all components in parallel to avoid deadlocks
167+
//
168+
// IMPORTANT: The connection must be closed first to unblock any
169+
// readers or writers that might be holding the mutex on Read/Write
170+
var g errgroup.Group
171+
172+
if bp.conn != nil {
173+
conn := bp.conn
174+
g.Go(func() error {
175+
return conn.Close()
176+
})
177+
bp.conn = nil
178+
}
179+
180+
if bp.reader != nil {
181+
reader := bp.reader
182+
g.Go(func() error {
183+
return reader.Close()
184+
})
185+
}
186+
187+
if bp.writer != nil {
188+
writer := bp.writer
189+
g.Go(func() error {
190+
return writer.Close()
191+
})
192+
}
193+
194+
// Wait for all close operations to complete and return any error
195+
return g.Wait()
196+
}
197+
198+
// Connected returns whether the pipe is currently connected.
199+
func (bp *BackedPipe) Connected() bool {
200+
bp.mu.RLock()
201+
defer bp.mu.RUnlock()
202+
return bp.state == connected && bp.reader.Connected() && bp.writer.Connected()
203+
}
204+
205+
// reconnectLocked handles the reconnection logic. Must be called with write lock held.
206+
func (bp *BackedPipe) reconnectLocked() error {
207+
if bp.state == reconnecting {
208+
return ErrReconnectionInProgress
209+
}
210+
211+
bp.state = reconnecting
212+
defer func() {
213+
// Only reset to disconnected if we're still in reconnecting state
214+
// (successful reconnection will set state to connected)
215+
if bp.state == reconnecting {
216+
bp.state = disconnected
217+
}
218+
}()
219+
220+
// Close existing connection if any
221+
if bp.conn != nil {
222+
_ = bp.conn.Close()
223+
bp.conn = nil
224+
}
225+
226+
// Increment the generation and update both reader and writer.
227+
// We do it now to track even the connections that fail during
228+
// Reconnect.
229+
bp.connGen++
230+
bp.reader.SetGeneration(bp.connGen)
231+
bp.writer.SetGeneration(bp.connGen)
232+
233+
// Reconnect reader and writer
234+
seqNum := make(chan uint64, 1)
235+
newR := make(chan io.Reader, 1)
236+
237+
go bp.reader.Reconnect(seqNum, newR)
238+
239+
// Get the precise reader sequence number from the reader while it holds its lock
240+
readerSeqNum, ok := <-seqNum
241+
if !ok {
242+
// Reader was closed during reconnection
243+
return ErrReconnectFailed
244+
}
245+
246+
// Perform reconnect using the exact sequence number we just received
247+
conn, remoteReaderSeqNum, err := bp.reconnector.Reconnect(bp.ctx, readerSeqNum)
248+
if err != nil {
249+
// Unblock reader reconnect
250+
newR <- nil
251+
return ErrReconnectFailed
252+
}
253+
254+
// Provide the new connection to the reader (reader still holds its lock)
255+
newR <- conn
256+
257+
// Replay our outbound data from the remote's reader sequence number
258+
writerReconnectErr := bp.writer.Reconnect(remoteReaderSeqNum, conn)
259+
if writerReconnectErr != nil {
260+
return ErrReconnectWriterFailed
261+
}
262+
263+
// Success - update state
264+
bp.conn = conn
265+
bp.state = connected
266+
267+
return nil
268+
}
269+
270+
// handleErrors listens for connection errors from reader/writer and triggers reconnection.
271+
// It filters errors from old connections and ensures only the first error per generation
272+
// triggers reconnection.
273+
func (bp *BackedPipe) handleErrors() {
274+
for {
275+
select {
276+
case <-bp.ctx.Done():
277+
return
278+
case errorEvt := <-bp.errChan:
279+
bp.handleConnectionError(errorEvt)
280+
}
281+
}
282+
}
283+
284+
// handleConnectionError handles errors from either reader or writer components.
285+
// It filters errors from old connections and ensures only one reconnection per generation.
286+
func (bp *BackedPipe) handleConnectionError(errorEvt ErrorEvent) {
287+
bp.mu.Lock()
288+
defer bp.mu.Unlock()
289+
290+
// Skip if already closed
291+
if bp.state == closed {
292+
return
293+
}
294+
295+
// Filter errors from old connections (lower generation)
296+
if errorEvt.Generation < bp.connGen {
297+
return
298+
}
299+
300+
// Skip if not connected (already disconnected or reconnecting)
301+
if bp.state != connected {
302+
return
303+
}
304+
305+
// Skip if we've already seen an error for this generation
306+
if bp.lastErrorGen >= errorEvt.Generation {
307+
return
308+
}
309+
310+
// This is the first error for this generation
311+
bp.lastErrorGen = errorEvt.Generation
312+
313+
// Mark as disconnected
314+
bp.state = disconnected
315+
316+
// Try to reconnect using internal context
317+
reconnectErr := bp.reconnectLocked()
318+
319+
if reconnectErr != nil {
320+
// Reconnection failed - log or handle as needed
321+
// For now, we'll just continue and wait for manual reconnection
322+
_ = errorEvt.Err // Use the original error from the component
323+
_ = errorEvt.Component // Component info available for potential logging by higher layers
324+
}
325+
}
326+
327+
// ForceReconnect forces a reconnection attempt immediately.
328+
// This can be used to force a reconnection if a new connection is established.
329+
// It prevents duplicate reconnections when called concurrently.
330+
func (bp *BackedPipe) ForceReconnect() error {
331+
// Deduplicate concurrent ForceReconnect calls so only one reconnection
332+
// attempt runs at a time from this API. Use the pipe's internal context
333+
// to ensure Close() cancels any in-flight attempt.
334+
_, err, _ := bp.sf.Do("force-reconnect", func() (interface{}, error) {
335+
bp.mu.Lock()
336+
defer bp.mu.Unlock()
337+
338+
if bp.state == closed {
339+
return nil, io.EOF
340+
}
341+
342+
// Don't force reconnect if already reconnecting
343+
if bp.state == reconnecting {
344+
return nil, ErrReconnectionInProgress
345+
}
346+
347+
return nil, bp.reconnectLocked()
348+
})
349+
return err
350+
}

0 commit comments

Comments
 (0)