-
Notifications
You must be signed in to change notification settings - Fork 891
fix: Use buffered reader in peer to fix ShortBuffer #303
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,6 +57,7 @@ | |
"tfexec", | ||
"tfstate", | ||
"unconvert", | ||
"webrtc", | ||
"xerrors", | ||
"yamux" | ||
] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
package peer | ||
|
||
import ( | ||
"bufio" | ||
"context" | ||
"io" | ||
"net" | ||
|
@@ -78,7 +79,8 @@ type Channel struct { | |
dc *webrtc.DataChannel | ||
// This field can be nil. It becomes set after the DataChannel | ||
// has been opened and is detached. | ||
rwc datachannel.ReadWriteCloser | ||
rwc datachannel.ReadWriteCloser | ||
reader io.Reader | ||
|
||
closed chan struct{} | ||
closeMutex sync.Mutex | ||
|
@@ -130,6 +132,21 @@ func (c *Channel) init() { | |
_ = c.closeWithError(xerrors.Errorf("detach: %w", err)) | ||
return | ||
} | ||
// pion/webrtc will return an io.ErrShortBuffer when a read | ||
// is triggerred with a buffer size less than the chunks written. | ||
// | ||
// This makes sense when considering UDP connections, because | ||
// bufferring of data that has no transmit guarantees is likely | ||
// to cause unexpected behavior. | ||
// | ||
// When ordered, this adds a bufio.Reader. This ensures additional | ||
// data on TCP-like connections can be read in parts, while still | ||
// being bufferred. | ||
if c.opts.Unordered { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ooo, wonder if we have the same issue with wsnet 🤔 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was really confused why
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for context, I was getting weird short buffer errors (that I had no idea how to reproduce easily or diagnose) when I tried to remove the envagent proxying, so I wonder whether this would've fixed it There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, this almost certainly would have! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Curious if we use unordered connections via WebRTC in the product today? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nah, we don't! |
||
c.reader = c.rwc | ||
} else { | ||
c.reader = bufio.NewReader(c.rwc) | ||
} | ||
close(c.opened) | ||
}) | ||
|
||
|
@@ -181,7 +198,7 @@ func (c *Channel) Read(bytes []byte) (int, error) { | |
} | ||
} | ||
|
||
bytesRead, err := c.rwc.Read(bytes) | ||
bytesRead, err := c.reader.Read(bytes) | ||
if err != nil { | ||
if c.isClosed() { | ||
return 0, c.closeError | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for documenting this, very helpful. Wouldn't have been clear to me why we need this otherwise 👍