-
Notifications
You must be signed in to change notification settings - Fork 962
chore: add backed reader, writer and pipe implementation #19147
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
07a1b9f
f77d7bf
086fbf8
d82da56
a303e68
fd2fd86
68c08b1
568b1a4
07c6963
d0ab610
b2188f9
f76783a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,290 @@ | ||
package backedpipe | ||
|
||
import ( | ||
"context" | ||
"io" | ||
"sync" | ||
|
||
"golang.org/x/sync/singleflight" | ||
"golang.org/x/xerrors" | ||
) | ||
|
||
var ( | ||
ErrPipeClosed = xerrors.New("pipe is closed") | ||
ErrPipeAlreadyConnected = xerrors.New("pipe is already connected") | ||
ErrReconnectionInProgress = xerrors.New("reconnection already in progress") | ||
ErrReconnectFailed = xerrors.New("reconnect failed") | ||
ErrInvalidSequenceNumber = xerrors.New("remote sequence number exceeds local sequence") | ||
ErrReconnectWriterFailed = xerrors.New("reconnect writer failed") | ||
) | ||
|
||
// DefaultBufferSize is the buffer capacity, in bytes, handed to the writer
// created by NewBackedPipe: 64 MiB.
const DefaultBufferSize = 64 << 20
|
||
// ReconnectFunc is invoked by BackedPipe whenever it needs a fresh connection.
//
// An implementation is expected to:
//   - dial the remote side,
//   - exchange sequence numbers with it, and
//   - hand back the new connection together with the remote's sequence number.
//
// writerSeqNum is the local writer's current sequence number; it should be
// forwarded to the remote side so that it knows where to resume reading.
//
// readerSeqNum is the remote side's current sequence number, i.e. the point
// from which the local side has to resume.
type ReconnectFunc func(
	ctx context.Context,
	writerSeqNum uint64,
) (conn io.ReadWriteCloser, readerSeqNum uint64, err error)
|
||
// BackedPipe provides a reliable bidirectional byte stream over unreliable network connections. | ||
// It orchestrates a BackedReader and BackedWriter to provide transparent reconnection | ||
// and data replay capabilities. | ||
type BackedPipe struct { | ||
ctx context.Context | ||
cancel context.CancelFunc | ||
mu sync.RWMutex | ||
reader *BackedReader | ||
writer *BackedWriter | ||
reconnectFn ReconnectFunc | ||
conn io.ReadWriteCloser | ||
connected bool | ||
closed bool | ||
|
||
// Reconnection state | ||
reconnecting bool | ||
|
||
// Error channel for receiving connection errors from reader/writer | ||
errorChan chan error | ||
|
||
// singleflight group to dedupe concurrent ForceReconnect calls | ||
sf singleflight.Group | ||
} | ||
|
||
// NewBackedPipe creates a new BackedPipe with default options and the specified reconnect function. | ||
// The pipe starts disconnected and must be connected using Connect(). | ||
func NewBackedPipe(ctx context.Context, reconnectFn ReconnectFunc) *BackedPipe { | ||
pipeCtx, cancel := context.WithCancel(ctx) | ||
|
||
errorChan := make(chan error, 2) // Buffer for reader and writer errors | ||
bp := &BackedPipe{ | ||
ctx: pipeCtx, | ||
cancel: cancel, | ||
reader: NewBackedReader(), | ||
writer: NewBackedWriter(DefaultBufferSize, errorChan), | ||
reconnectFn: reconnectFn, | ||
errorChan: errorChan, | ||
} | ||
|
||
// Set up error callback for reader only (writer uses error channel directly) | ||
bp.reader.SetErrorCallback(func(err error) { | ||
select { | ||
case bp.errorChan <- err: | ||
case <-bp.ctx.Done(): | ||
} | ||
}) | ||
|
||
// Start error handler goroutine | ||
go bp.handleErrors() | ||
|
||
return bp | ||
} | ||
|
||
// Connect establishes the initial connection using the reconnect function. | ||
func (bp *BackedPipe) Connect() error { | ||
bp.mu.Lock() | ||
defer bp.mu.Unlock() | ||
|
||
if bp.closed { | ||
return ErrPipeClosed | ||
} | ||
|
||
if bp.connected { | ||
return ErrPipeAlreadyConnected | ||
} | ||
|
||
// Use internal context for the actual reconnect operation to ensure | ||
// Close() reliably cancels any in-flight attempt. | ||
return bp.reconnectLocked() | ||
} | ||
|
||
// Read implements io.Reader by delegating to the BackedReader. | ||
func (bp *BackedPipe) Read(p []byte) (int, error) { | ||
bp.mu.RLock() | ||
reader := bp.reader | ||
closed := bp.closed | ||
bp.mu.RUnlock() | ||
|
||
if closed { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You don't really need this check because the underlying reader will get closed and return EOF. That means you only reference the |
||
return 0, io.EOF | ||
} | ||
|
||
return reader.Read(p) | ||
} | ||
|
||
// Write implements io.Writer by delegating to the BackedWriter. | ||
func (bp *BackedPipe) Write(p []byte) (int, error) { | ||
bp.mu.RLock() | ||
writer := bp.writer | ||
closed := bp.closed | ||
bp.mu.RUnlock() | ||
|
||
if closed { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same deal as read --- the |
||
return 0, io.EOF | ||
} | ||
|
||
return writer.Write(p) | ||
} | ||
|
||
// Close closes the pipe and all underlying connections. | ||
func (bp *BackedPipe) Close() error { | ||
bp.mu.Lock() | ||
defer bp.mu.Unlock() | ||
|
||
if bp.closed { | ||
return nil | ||
} | ||
|
||
bp.closed = true | ||
bp.cancel() // Cancel main context | ||
|
||
// Close underlying components | ||
var readerErr, writerErr, connErr error | ||
|
||
if bp.reader != nil { | ||
readerErr = bp.reader.Close() | ||
} | ||
|
||
if bp.writer != nil { | ||
writerErr = bp.writer.Close() | ||
} | ||
|
||
if bp.conn != nil { | ||
connErr = bp.conn.Close() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the conn isn't closed then closing the writer or reader could deadlock on io. In particular, if the connection is idle, then the reader will be just blocked reading the conn and you will deadlock until some data gets sent. You could reorder it so the conn is closed first, or use an golang.org/x/sync/errgroup to close them all in parallel and also do your error consolidation. |
||
bp.conn = nil | ||
} | ||
|
||
bp.connected = false | ||
|
||
// Return first error encountered | ||
if readerErr != nil { | ||
return readerErr | ||
} | ||
if writerErr != nil { | ||
return writerErr | ||
} | ||
return connErr | ||
} | ||
|
||
// Connected returns whether the pipe is currently connected. | ||
func (bp *BackedPipe) Connected() bool { | ||
bp.mu.RLock() | ||
defer bp.mu.RUnlock() | ||
return bp.connected | ||
} | ||
|
||
// reconnectLocked handles the reconnection logic. Must be called with write lock held. | ||
func (bp *BackedPipe) reconnectLocked() error { | ||
if bp.reconnecting { | ||
return ErrReconnectionInProgress | ||
} | ||
|
||
bp.reconnecting = true | ||
defer func() { | ||
bp.reconnecting = false | ||
}() | ||
|
||
// Close existing connection if any | ||
if bp.conn != nil { | ||
_ = bp.conn.Close() | ||
bp.conn = nil | ||
} | ||
|
||
bp.connected = false | ||
|
||
// Get current writer sequence number to send to remote | ||
writerSeqNum := bp.writer.SequenceNum() | ||
|
||
conn, readerSeqNum, err := bp.reconnectFn(bp.ctx, writerSeqNum) | ||
if err != nil { | ||
return ErrReconnectFailed | ||
} | ||
|
||
// Validate sequence numbers | ||
if readerSeqNum > writerSeqNum { | ||
_ = conn.Close() | ||
return ErrInvalidSequenceNumber | ||
} | ||
|
||
// Reconnect reader and writer | ||
seqNum := make(chan uint64, 1) | ||
newR := make(chan io.Reader, 1) | ||
|
||
go bp.reader.Reconnect(seqNum, newR) | ||
|
||
// Get sequence number and send new reader | ||
<-seqNum | ||
newR <- conn | ||
|
||
err = bp.writer.Reconnect(readerSeqNum, conn) | ||
if err != nil { | ||
_ = conn.Close() | ||
return ErrReconnectWriterFailed | ||
} | ||
|
||
// Success - update state | ||
bp.conn = conn | ||
bp.connected = true | ||
|
||
return nil | ||
} | ||
|
||
// handleErrors listens for connection errors from reader/writer and triggers reconnection. | ||
func (bp *BackedPipe) handleErrors() { | ||
for { | ||
select { | ||
case <-bp.ctx.Done(): | ||
return | ||
case err := <-bp.errorChan: | ||
// Connection error occurred | ||
bp.mu.Lock() | ||
|
||
// Skip if already closed or not connected | ||
if bp.closed || !bp.connected { | ||
bp.mu.Unlock() | ||
continue | ||
} | ||
|
||
// Mark as disconnected | ||
bp.connected = false | ||
|
||
// Try to reconnect using internal context | ||
reconnectErr := bp.reconnectLocked() | ||
bp.mu.Unlock() | ||
|
||
if reconnectErr != nil { | ||
// Reconnection failed - log or handle as needed | ||
// For now, we'll just continue and wait for manual reconnection | ||
_ = err // Use the original error | ||
} | ||
} | ||
} | ||
} | ||
|
||
// ForceReconnect forces a reconnection attempt immediately. | ||
// This can be used to force a reconnection if a new connection is established. | ||
func (bp *BackedPipe) ForceReconnect() error { | ||
// Deduplicate concurrent ForceReconnect calls so only one reconnection | ||
// attempt runs at a time from this API. Use the pipe's internal context | ||
// to ensure Close() cancels any in-flight attempt. | ||
_, err, _ := bp.sf.Do("backedpipe-reconnect", func() (interface{}, error) { | ||
bp.mu.Lock() | ||
defer bp.mu.Unlock() | ||
|
||
if bp.closed { | ||
return nil, io.EOF | ||
} | ||
|
||
return nil, bp.reconnectLocked() | ||
}) | ||
return err | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order to handle both initial connections and reconnections, the thing that handles the connections is going to need to be stateful. On the client side it needs to post a new immortal stream the first time, and then store the name/ID of the stream for re-dialing. Instead of making this a function, make it an interface type with the reconnection function on it.
It's much easier to understand this kind of thing because you can ask your IDE to show you all implementations of an interface type.
It also avoids the awkward closure that you define up the stack to handle the statefulness. It can be a regular function on the struct type rather than an anonymous closure over it. Also makes stack traces way more readable.