-
Notifications
You must be signed in to change notification settings - Fork 963
chore: add backed reader, writer and pipe implementation #19147
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
ibetitsmike
wants to merge
13
commits into
main
Choose a base branch
from
mike/immortal-streams-backed-base
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+3,307
−0
Open
Changes from all commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
07a1b9f
"chore: add backed reader, writer and pipe"
ibetitsmike f77d7bf
improvement to backed pipe concurrency and tests
ibetitsmike 086fbf8
chore: moved backedpipe to be a part of the agent code
ibetitsmike d82da56
PR feedback implemented
ibetitsmike a303e68
fmt
ibetitsmike fd2fd86
revereted WaitSuperShort - not used anymore
ibetitsmike 68c08b1
changed err to EOF instead of ErrClosedPipe and changed backed_writer…
ibetitsmike 568b1a4
added sentinel errors
ibetitsmike 07c6963
fixed eviction tests in writer and moved from assert to require in re…
ibetitsmike d0ab610
added holding the mutex during reconnection and test cases to verify …
ibetitsmike b2188f9
writer's tets cleanup
ibetitsmike f76783a
improvements to backed pipe
ibetitsmike 45558ec
coordinating tests with channels
ibetitsmike File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,293 @@ | ||
package backedpipe | ||
|
||
import ( | ||
"context" | ||
"io" | ||
"sync" | ||
|
||
"golang.org/x/sync/errgroup" | ||
"golang.org/x/sync/singleflight" | ||
"golang.org/x/xerrors" | ||
) | ||
|
||
var ( | ||
ErrPipeClosed = xerrors.New("pipe is closed") | ||
ErrPipeAlreadyConnected = xerrors.New("pipe is already connected") | ||
ErrReconnectionInProgress = xerrors.New("reconnection already in progress") | ||
ErrReconnectFailed = xerrors.New("reconnect failed") | ||
ErrInvalidSequenceNumber = xerrors.New("remote sequence number exceeds local sequence") | ||
ErrReconnectWriterFailed = xerrors.New("reconnect writer failed") | ||
) | ||
|
||
const ( | ||
// Default buffer capacity used by the writer - 64MB | ||
DefaultBufferSize = 64 * 1024 * 1024 | ||
) | ||
|
||
// Reconnector is an interface for establishing connections when the BackedPipe needs to reconnect. | ||
// Implementations should: | ||
// 1. Establish a new connection to the remote side | ||
// 2. Exchange sequence numbers with the remote side | ||
// 3. Return the new connection and the remote's current sequence number | ||
// | ||
// The writerSeqNum parameter is the local writer's current sequence number, | ||
// which should be sent to the remote side so it knows where to resume reading from. | ||
// | ||
// The returned readerSeqNum should be the remote side's current sequence number, | ||
// which indicates where the local reader should resume from. | ||
type Reconnector interface { | ||
Reconnect(ctx context.Context, writerSeqNum uint64) (conn io.ReadWriteCloser, readerSeqNum uint64, err error) | ||
} | ||
|
||
// BackedPipe provides a reliable bidirectional byte stream over unreliable network connections. | ||
// It orchestrates a BackedReader and BackedWriter to provide transparent reconnection | ||
// and data replay capabilities. | ||
type BackedPipe struct { | ||
ctx context.Context | ||
cancel context.CancelFunc | ||
mu sync.RWMutex | ||
reader *BackedReader | ||
writer *BackedWriter | ||
reconnector Reconnector | ||
conn io.ReadWriteCloser | ||
connected bool | ||
closed bool | ||
|
||
// Reconnection state | ||
reconnecting bool | ||
|
||
// Error channels for receiving connection errors from reader/writer separately | ||
readerErrorChan chan error | ||
writerErrorChan chan error | ||
|
||
// singleflight group to dedupe concurrent ForceReconnect calls | ||
sf singleflight.Group | ||
} | ||
|
||
// NewBackedPipe creates a new BackedPipe with default options and the specified reconnector. | ||
// The pipe starts disconnected and must be connected using Connect(). | ||
func NewBackedPipe(ctx context.Context, reconnector Reconnector) *BackedPipe { | ||
pipeCtx, cancel := context.WithCancel(ctx) | ||
|
||
readerErrorChan := make(chan error, 1) // Buffer for reader errors | ||
writerErrorChan := make(chan error, 1) // Buffer for writer errors | ||
bp := &BackedPipe{ | ||
ctx: pipeCtx, | ||
cancel: cancel, | ||
reader: NewBackedReader(readerErrorChan), | ||
writer: NewBackedWriter(DefaultBufferSize, writerErrorChan), | ||
reconnector: reconnector, | ||
readerErrorChan: readerErrorChan, | ||
writerErrorChan: writerErrorChan, | ||
} | ||
|
||
// Start error handler goroutine | ||
go bp.handleErrors() | ||
|
||
return bp | ||
} | ||
|
||
// Connect establishes the initial connection using the reconnect function. | ||
func (bp *BackedPipe) Connect() error { | ||
bp.mu.Lock() | ||
defer bp.mu.Unlock() | ||
|
||
if bp.closed { | ||
return ErrPipeClosed | ||
} | ||
|
||
if bp.connected { | ||
return ErrPipeAlreadyConnected | ||
} | ||
|
||
// Use internal context for the actual reconnect operation to ensure | ||
// Close() reliably cancels any in-flight attempt. | ||
return bp.reconnectLocked() | ||
} | ||
|
||
// Read implements io.Reader by delegating to the BackedReader. | ||
func (bp *BackedPipe) Read(p []byte) (int, error) { | ||
return bp.reader.Read(p) | ||
} | ||
|
||
// Write implements io.Writer by delegating to the BackedWriter. | ||
func (bp *BackedPipe) Write(p []byte) (int, error) { | ||
bp.mu.RLock() | ||
writer := bp.writer | ||
closed := bp.closed | ||
bp.mu.RUnlock() | ||
|
||
if closed { | ||
return 0, io.EOF | ||
} | ||
|
||
return writer.Write(p) | ||
} | ||
|
||
// Close closes the pipe and all underlying connections. | ||
func (bp *BackedPipe) Close() error { | ||
bp.mu.Lock() | ||
defer bp.mu.Unlock() | ||
|
||
if bp.closed { | ||
return nil | ||
} | ||
|
||
bp.closed = true | ||
bp.cancel() // Cancel main context | ||
|
||
// Close all components in parallel to avoid deadlocks | ||
// | ||
// IMPORTANT: The connection must be closed first to unblock any | ||
// readers or writers that might be holding the mutex on Read/Write | ||
var g errgroup.Group | ||
|
||
if bp.conn != nil { | ||
conn := bp.conn | ||
g.Go(func() error { | ||
return conn.Close() | ||
}) | ||
bp.conn = nil | ||
} | ||
|
||
if bp.reader != nil { | ||
reader := bp.reader | ||
g.Go(func() error { | ||
return reader.Close() | ||
}) | ||
} | ||
|
||
if bp.writer != nil { | ||
writer := bp.writer | ||
g.Go(func() error { | ||
return writer.Close() | ||
}) | ||
} | ||
|
||
bp.connected = false | ||
|
||
// Wait for all close operations to complete and return any error | ||
return g.Wait() | ||
} | ||
|
||
// Connected returns whether the pipe is currently connected. | ||
func (bp *BackedPipe) Connected() bool { | ||
bp.mu.RLock() | ||
defer bp.mu.RUnlock() | ||
return bp.connected | ||
} | ||
|
||
// reconnectLocked handles the reconnection logic. Must be called with write lock held. | ||
func (bp *BackedPipe) reconnectLocked() error { | ||
if bp.reconnecting { | ||
return ErrReconnectionInProgress | ||
} | ||
|
||
bp.reconnecting = true | ||
defer func() { | ||
bp.reconnecting = false | ||
}() | ||
|
||
// Close existing connection if any | ||
if bp.conn != nil { | ||
_ = bp.conn.Close() | ||
bp.conn = nil | ||
} | ||
|
||
bp.connected = false | ||
|
||
// Get current writer sequence number to send to remote | ||
writerSeqNum := bp.writer.SequenceNum() | ||
|
||
conn, readerSeqNum, err := bp.reconnector.Reconnect(bp.ctx, writerSeqNum) | ||
if err != nil { | ||
return ErrReconnectFailed | ||
} | ||
|
||
// Validate sequence numbers | ||
if readerSeqNum > writerSeqNum { | ||
_ = conn.Close() | ||
return ErrInvalidSequenceNumber | ||
} | ||
|
||
// Reconnect reader and writer | ||
seqNum := make(chan uint64, 1) | ||
newR := make(chan io.Reader, 1) | ||
|
||
go bp.reader.Reconnect(seqNum, newR) | ||
|
||
// Get sequence number and send new reader | ||
<-seqNum | ||
newR <- conn | ||
|
||
err = bp.writer.Reconnect(readerSeqNum, conn) | ||
if err != nil { | ||
_ = conn.Close() | ||
return ErrReconnectWriterFailed | ||
} | ||
|
||
// Success - update state | ||
bp.conn = conn | ||
bp.connected = true | ||
|
||
return nil | ||
} | ||
|
||
// handleErrors listens for connection errors from reader/writer and triggers reconnection. | ||
func (bp *BackedPipe) handleErrors() { | ||
for { | ||
select { | ||
case <-bp.ctx.Done(): | ||
return | ||
case err := <-bp.readerErrorChan: | ||
// Reader connection error occurred | ||
bp.handleConnectionError(err, "reader") | ||
case err := <-bp.writerErrorChan: | ||
// Writer connection error occurred | ||
bp.handleConnectionError(err, "writer") | ||
} | ||
} | ||
} | ||
|
||
// handleConnectionError handles errors from either reader or writer components. | ||
func (bp *BackedPipe) handleConnectionError(err error, component string) { | ||
bp.mu.Lock() | ||
defer bp.mu.Unlock() | ||
|
||
// Skip if already closed or not connected | ||
if bp.closed || !bp.connected { | ||
return | ||
} | ||
|
||
// Mark as disconnected | ||
bp.connected = false | ||
|
||
// Try to reconnect using internal context | ||
reconnectErr := bp.reconnectLocked() | ||
|
||
if reconnectErr != nil { | ||
// Reconnection failed - log or handle as needed | ||
// For now, we'll just continue and wait for manual reconnection | ||
_ = err // Use the original error from the component | ||
_ = component // Component info available for potential logging by higher layers | ||
} | ||
} | ||
|
||
// ForceReconnect forces a reconnection attempt immediately. | ||
// This can be used to force a reconnection if a new connection is established. | ||
func (bp *BackedPipe) ForceReconnect() error { | ||
// Deduplicate concurrent ForceReconnect calls so only one reconnection | ||
// attempt runs at a time from this API. Use the pipe's internal context | ||
// to ensure Close() cancels any in-flight attempt. | ||
_, err, _ := bp.sf.Do("backedpipe-reconnect", func() (interface{}, error) { | ||
bp.mu.Lock() | ||
defer bp.mu.Unlock() | ||
|
||
if bp.closed { | ||
return nil, io.EOF | ||
} | ||
|
||
return nil, bp.reconnectLocked() | ||
}) | ||
return err | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.