-
Notifications
You must be signed in to change notification settings - Fork 981
chore: add backed reader, writer and pipe implementation #19147
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. |
9dd9c4a
to
3223bf9
Compare
d2ee08d
to
27da7ef
Compare
16abe05
to
dde9516
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not done with my review, but taking a break and wanted to send you comments so far.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another batch of comments.
I haven't really looked at BackedPipe yet, but I think maybe it would be better to wait for the comments I've made so far to be resolved.
7468299
to
b2188f9
Compare
45558ec
to
064514e
Compare
95dc01a
to
85c505d
Compare
… to block on errors
…being closed/reconnected race conditions
separate channels for writer/reader errors moved to an interface for reconnection coordination logic for closing a pipe
…etter error tracking
28927dc
to
9cafe05
Compare
// - readerSeqNum: how many bytes we've successfully read from remote (send to remote) | ||
// - writerSeqNum: how many bytes we've written so far (used for validation) | ||
readerSeqNum := bp.reader.SequenceNum() | ||
writerSeqNum := bp.writer.SequenceNum() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you read these values here, they can be out of date by the time you use them, since you release the locks and reads & writes can flow.
|
||
// Get sequence number and send new reader | ||
<-seqNum | ||
newR <- conn |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The call to bp.reconnector.Reconnect()
has to happen between reading the sequence number chan and writing the connection, so that the lock is held while we are reconnecting. And, the sequence number you psss to Reconnect
has to be the one you got from the channel. Otherwise we could read more data and then get a replay from the wrong spot.
The whole point of using channels on this function on the reader is to allow this.
} | ||
|
||
// Validate sequence numbers | ||
if remoteReaderSeqNum > writerSeqNum { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The writer's Reconnect()
method already handles this check, and it does so while holding the lock. Doing it here means we are subject to data races which could give false errors.
Fixes: #18101
This PR introduces a new
backedpipe
package that provides reliable bidirectional byte streams over unreliable network connections. The implementation includes:BackedPipe
: Orchestrates a reader and writer to provide transparent reconnection and data replayBackedReader
: Handles reading with automatic reconnection, blocking reads when disconnectedBackedWriter
: Maintains a ring buffer of recent writes for replay during reconnectionRingBuffer
: Efficient circular buffer implementation for storing dataThe package enables resilient connections by tracking sequence numbers and replaying missed data after reconnection. It handles connection failures gracefully, automatically reconnecting and resuming data transfer from the appropriate point.