Skip to content

Commit 14e155e

Browse files
committed
Merge remote-tracking branch 'origin/main' into ssncferreira/support_custom_notifications
2 parents 681a29f + 4c98dec commit 14e155e

File tree

8 files changed

+3737
-0
lines changed

8 files changed

+3737
-0
lines changed
Lines changed: 350 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
package backedpipe
2+
3+
import (
4+
"context"
5+
"io"
6+
"sync"
7+
8+
"golang.org/x/sync/errgroup"
9+
"golang.org/x/sync/singleflight"
10+
"golang.org/x/xerrors"
11+
)
12+
13+
var (
14+
ErrPipeClosed = xerrors.New("pipe is closed")
15+
ErrPipeAlreadyConnected = xerrors.New("pipe is already connected")
16+
ErrReconnectionInProgress = xerrors.New("reconnection already in progress")
17+
ErrReconnectFailed = xerrors.New("reconnect failed")
18+
ErrInvalidSequenceNumber = xerrors.New("remote sequence number exceeds local sequence")
19+
ErrReconnectWriterFailed = xerrors.New("reconnect writer failed")
20+
)
21+
22+
// connectionState enumerates the lifecycle states a BackedPipe moves through.
type connectionState int

// Lifecycle states. Values are assigned by iota in declaration order, so
// connected is the zero value of connectionState.
const (
	// connected: a live connection is attached and the pipe is usable.
	connected connectionState = iota
	// disconnected: no live connection is attached, but the pipe is not closed.
	disconnected
	// reconnecting: a reconnection attempt is currently running.
	reconnecting
	// closed: the pipe has been shut down permanently.
	closed
)
35+
36+
// ErrorEvent describes a failure reported by the reader or the writer,
// tagged with the connection generation it was observed on.
type ErrorEvent struct {
	// Err is the reported error.
	Err error
	// Component names the reporting side: "reader" or "writer".
	Component string
	// Generation is the connection generation in effect when the error occurred.
	Generation uint64
}
42+
43+
// DefaultBufferSize is the buffer capacity, in bytes, that NewBackedPipe
// passes to the writer: 64 MiB.
const DefaultBufferSize = 64 << 20
47+
48+
// Reconnector establishes a fresh connection whenever the BackedPipe needs
// one. An implementation is expected to:
//  1. Dial the remote side.
//  2. Exchange sequence numbers with it.
//  3. Hand back the new connection together with the remote's reader
//     sequence number.
//
// readerSeqNum is the local reader's current position, i.e. the total number
// of bytes successfully read from the remote so far. It has to be sent to the
// remote so that the remote can replay its data starting at that offset.
//
// remoteReaderSeqNum is the mirror image: how many bytes of our outbound data
// the remote has successfully read. The local writer uses it to decide from
// which byte to replay.
type Reconnector interface {
	Reconnect(ctx context.Context, readerSeqNum uint64) (conn io.ReadWriteCloser, remoteReaderSeqNum uint64, err error)
}
64+
65+
// BackedPipe provides a reliable bidirectional byte stream over unreliable network connections.
66+
// It orchestrates a BackedReader and BackedWriter to provide transparent reconnection
67+
// and data replay capabilities.
68+
type BackedPipe struct {
69+
ctx context.Context
70+
cancel context.CancelFunc
71+
mu sync.RWMutex
72+
reader *BackedReader
73+
writer *BackedWriter
74+
reconnector Reconnector
75+
conn io.ReadWriteCloser
76+
77+
// State machine
78+
state connectionState
79+
connGen uint64 // Increments on each successful reconnection
80+
81+
// Unified error handling with generation filtering
82+
errChan chan ErrorEvent
83+
84+
// singleflight group to dedupe concurrent ForceReconnect calls
85+
sf singleflight.Group
86+
87+
// Track first error per generation to avoid duplicate reconnections
88+
lastErrorGen uint64
89+
}
90+
91+
// NewBackedPipe creates a new BackedPipe with default options and the specified reconnector.
92+
// The pipe starts disconnected and must be connected using Connect().
93+
func NewBackedPipe(ctx context.Context, reconnector Reconnector) *BackedPipe {
94+
pipeCtx, cancel := context.WithCancel(ctx)
95+
96+
errChan := make(chan ErrorEvent, 1)
97+
98+
bp := &BackedPipe{
99+
ctx: pipeCtx,
100+
cancel: cancel,
101+
reconnector: reconnector,
102+
state: disconnected,
103+
connGen: 0, // Start with generation 0
104+
errChan: errChan,
105+
}
106+
107+
// Create reader and writer with typed error channel for generation-aware error reporting
108+
bp.reader = NewBackedReader(errChan)
109+
bp.writer = NewBackedWriter(DefaultBufferSize, errChan)
110+
111+
// Start error handler goroutine
112+
go bp.handleErrors()
113+
114+
return bp
115+
}
116+
117+
// Connect establishes the initial connection using the reconnect function.
118+
func (bp *BackedPipe) Connect() error {
119+
bp.mu.Lock()
120+
defer bp.mu.Unlock()
121+
122+
if bp.state == closed {
123+
return ErrPipeClosed
124+
}
125+
126+
if bp.state == connected {
127+
return ErrPipeAlreadyConnected
128+
}
129+
130+
// Use internal context for the actual reconnect operation to ensure
131+
// Close() reliably cancels any in-flight attempt.
132+
return bp.reconnectLocked()
133+
}
134+
135+
// Read implements io.Reader by delegating to the BackedReader.
136+
func (bp *BackedPipe) Read(p []byte) (int, error) {
137+
return bp.reader.Read(p)
138+
}
139+
140+
// Write implements io.Writer by delegating to the BackedWriter.
141+
func (bp *BackedPipe) Write(p []byte) (int, error) {
142+
bp.mu.RLock()
143+
writer := bp.writer
144+
state := bp.state
145+
bp.mu.RUnlock()
146+
147+
if state == closed {
148+
return 0, io.EOF
149+
}
150+
151+
return writer.Write(p)
152+
}
153+
154+
// Close closes the pipe and all underlying connections.
155+
func (bp *BackedPipe) Close() error {
156+
bp.mu.Lock()
157+
defer bp.mu.Unlock()
158+
159+
if bp.state == closed {
160+
return nil
161+
}
162+
163+
bp.state = closed
164+
bp.cancel() // Cancel main context
165+
166+
// Close all components in parallel to avoid deadlocks
167+
//
168+
// IMPORTANT: The connection must be closed first to unblock any
169+
// readers or writers that might be holding the mutex on Read/Write
170+
var g errgroup.Group
171+
172+
if bp.conn != nil {
173+
conn := bp.conn
174+
g.Go(func() error {
175+
return conn.Close()
176+
})
177+
bp.conn = nil
178+
}
179+
180+
if bp.reader != nil {
181+
reader := bp.reader
182+
g.Go(func() error {
183+
return reader.Close()
184+
})
185+
}
186+
187+
if bp.writer != nil {
188+
writer := bp.writer
189+
g.Go(func() error {
190+
return writer.Close()
191+
})
192+
}
193+
194+
// Wait for all close operations to complete and return any error
195+
return g.Wait()
196+
}
197+
198+
// Connected returns whether the pipe is currently connected.
199+
func (bp *BackedPipe) Connected() bool {
200+
bp.mu.RLock()
201+
defer bp.mu.RUnlock()
202+
return bp.state == connected && bp.reader.Connected() && bp.writer.Connected()
203+
}
204+
205+
// reconnectLocked handles the reconnection logic. Must be called with write lock held.
206+
func (bp *BackedPipe) reconnectLocked() error {
207+
if bp.state == reconnecting {
208+
return ErrReconnectionInProgress
209+
}
210+
211+
bp.state = reconnecting
212+
defer func() {
213+
// Only reset to disconnected if we're still in reconnecting state
214+
// (successful reconnection will set state to connected)
215+
if bp.state == reconnecting {
216+
bp.state = disconnected
217+
}
218+
}()
219+
220+
// Close existing connection if any
221+
if bp.conn != nil {
222+
_ = bp.conn.Close()
223+
bp.conn = nil
224+
}
225+
226+
// Increment the generation and update both reader and writer.
227+
// We do it now to track even the connections that fail during
228+
// Reconnect.
229+
bp.connGen++
230+
bp.reader.SetGeneration(bp.connGen)
231+
bp.writer.SetGeneration(bp.connGen)
232+
233+
// Reconnect reader and writer
234+
seqNum := make(chan uint64, 1)
235+
newR := make(chan io.Reader, 1)
236+
237+
go bp.reader.Reconnect(seqNum, newR)
238+
239+
// Get the precise reader sequence number from the reader while it holds its lock
240+
readerSeqNum, ok := <-seqNum
241+
if !ok {
242+
// Reader was closed during reconnection
243+
return ErrReconnectFailed
244+
}
245+
246+
// Perform reconnect using the exact sequence number we just received
247+
conn, remoteReaderSeqNum, err := bp.reconnector.Reconnect(bp.ctx, readerSeqNum)
248+
if err != nil {
249+
// Unblock reader reconnect
250+
newR <- nil
251+
return ErrReconnectFailed
252+
}
253+
254+
// Provide the new connection to the reader (reader still holds its lock)
255+
newR <- conn
256+
257+
// Replay our outbound data from the remote's reader sequence number
258+
writerReconnectErr := bp.writer.Reconnect(remoteReaderSeqNum, conn)
259+
if writerReconnectErr != nil {
260+
return ErrReconnectWriterFailed
261+
}
262+
263+
// Success - update state
264+
bp.conn = conn
265+
bp.state = connected
266+
267+
return nil
268+
}
269+
270+
// handleErrors listens for connection errors from reader/writer and triggers reconnection.
271+
// It filters errors from old connections and ensures only the first error per generation
272+
// triggers reconnection.
273+
func (bp *BackedPipe) handleErrors() {
274+
for {
275+
select {
276+
case <-bp.ctx.Done():
277+
return
278+
case errorEvt := <-bp.errChan:
279+
bp.handleConnectionError(errorEvt)
280+
}
281+
}
282+
}
283+
284+
// handleConnectionError handles errors from either reader or writer components.
285+
// It filters errors from old connections and ensures only one reconnection per generation.
286+
func (bp *BackedPipe) handleConnectionError(errorEvt ErrorEvent) {
287+
bp.mu.Lock()
288+
defer bp.mu.Unlock()
289+
290+
// Skip if already closed
291+
if bp.state == closed {
292+
return
293+
}
294+
295+
// Filter errors from old connections (lower generation)
296+
if errorEvt.Generation < bp.connGen {
297+
return
298+
}
299+
300+
// Skip if not connected (already disconnected or reconnecting)
301+
if bp.state != connected {
302+
return
303+
}
304+
305+
// Skip if we've already seen an error for this generation
306+
if bp.lastErrorGen >= errorEvt.Generation {
307+
return
308+
}
309+
310+
// This is the first error for this generation
311+
bp.lastErrorGen = errorEvt.Generation
312+
313+
// Mark as disconnected
314+
bp.state = disconnected
315+
316+
// Try to reconnect using internal context
317+
reconnectErr := bp.reconnectLocked()
318+
319+
if reconnectErr != nil {
320+
// Reconnection failed - log or handle as needed
321+
// For now, we'll just continue and wait for manual reconnection
322+
_ = errorEvt.Err // Use the original error from the component
323+
_ = errorEvt.Component // Component info available for potential logging by higher layers
324+
}
325+
}
326+
327+
// ForceReconnect forces a reconnection attempt immediately.
328+
// This can be used to force a reconnection if a new connection is established.
329+
// It prevents duplicate reconnections when called concurrently.
330+
func (bp *BackedPipe) ForceReconnect() error {
331+
// Deduplicate concurrent ForceReconnect calls so only one reconnection
332+
// attempt runs at a time from this API. Use the pipe's internal context
333+
// to ensure Close() cancels any in-flight attempt.
334+
_, err, _ := bp.sf.Do("force-reconnect", func() (interface{}, error) {
335+
bp.mu.Lock()
336+
defer bp.mu.Unlock()
337+
338+
if bp.state == closed {
339+
return nil, io.EOF
340+
}
341+
342+
// Don't force reconnect if already reconnecting
343+
if bp.state == reconnecting {
344+
return nil, ErrReconnectionInProgress
345+
}
346+
347+
return nil, bp.reconnectLocked()
348+
})
349+
return err
350+
}

0 commit comments

Comments
 (0)