From 60cb67413e3087a0f7bd0aff7973584e70af40f9 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Wed, 16 Feb 2022 04:27:23 +0000 Subject: [PATCH] fix: Use buffered reader in peer to fix ShortBuffer This prevents a io.ErrShortBuffer from occurring when the byte slice being read is smaller than the chunks sent from the opposite pipe. This makes sense for unordered connections, where transmission is not guaranteed, but does not make sense for TCP-like connections. We use a bufio.Reader when ordered to ensure data isn't lost. --- .vscode/settings.json | 1 + go.mod | 5 ++--- go.sum | 2 -- peer/channel.go | 21 +++++++++++++++++++-- peer/conn_test.go | 21 +++++++++++++++++++++ 5 files changed, 43 insertions(+), 7 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 34ed9fbae2c42..d9b2b88f1798c 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -57,6 +57,7 @@ "tfexec", "tfstate", "unconvert", + "webrtc", "xerrors", "yamux" ] diff --git a/go.mod b/go.mod index 290c7d3758b85..d082567bfa1f8 100644 --- a/go.mod +++ b/go.mod @@ -16,9 +16,9 @@ replace github.com/chzyer/readline => github.com/kylecarbs/readline v0.0.0-20220 require ( cdr.dev/slog v1.4.1 - github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2 github.com/briandowns/spinner v1.18.1 github.com/coder/retry v1.3.0 + github.com/creack/pty v1.1.17 github.com/fatih/color v1.13.0 github.com/go-chi/chi/v5 v5.0.7 github.com/go-chi/render v1.0.1 @@ -50,6 +50,7 @@ require ( go.uber.org/goleak v1.1.12 golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 + golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 google.golang.org/protobuf v1.27.1 nhooyr.io/websocket v1.8.7 @@ -67,7 +68,6 @@ require ( github.com/cenkalti/backoff/v4 v4.1.2 // indirect github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e // indirect github.com/containerd/continuity v0.2.2 // indirect - github.com/creack/pty v1.1.17 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dhui/dktest v0.3.9 // indirect github.com/dlclark/regexp2 v1.4.0 // indirect @@ -124,7 +124,6 @@ require ( github.com/zeebo/errs v1.2.2 // indirect go.opencensus.io v0.23.0 // indirect golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect - golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect google.golang.org/appengine v1.6.7 // indirect diff --git a/go.sum b/go.sum index 8e4c8c18401ae..02d93b18ba563 100644 --- a/go.sum +++ b/go.sum @@ -103,8 +103,6 @@ github.com/Microsoft/hcsshim v0.8.23/go.mod h1:4zegtUJth7lAvFyc6cH2gGQ5B3OFQim01 github.com/Microsoft/hcsshim/test v0.0.0-20201218223536-d3e5debf77da/go.mod h1:5hlzMzRKMLyo42nCZ9oml8AdTlq/0cvIaBv6tK1RehU= github.com/Microsoft/hcsshim/test v0.0.0-20210227013316-43a75bb4edd3/go.mod h1:mw7qgWloBUl75W/gVH3cQszUg1+gUITj7D6NY7ywVnY= github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= -github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2 h1:+vx7roKuyA63nhn5WAunQHLTznkw5W8b1Xc0dNjp83s= -github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2/go.mod h1:HBCaDeC1lPdgDeDbhX8XFpy1jqjK0IBG8W5K+xYqA0w= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= diff --git a/peer/channel.go b/peer/channel.go index b01154bcfaa25..d1f4930fe31f7 100644 --- a/peer/channel.go +++ b/peer/channel.go @@ -1,6 +1,7 @@ package peer import ( + "bufio" "context" "io" "net" @@ -78,7 +79,8 @@ type Channel struct { dc *webrtc.DataChannel // This field can be nil. It becomes set after the DataChannel // has been opened and is detached. - rwc datachannel.ReadWriteCloser + rwc datachannel.ReadWriteCloser + reader io.Reader closed chan struct{} closeMutex sync.Mutex @@ -130,6 +132,21 @@ func (c *Channel) init() { _ = c.closeWithError(xerrors.Errorf("detach: %w", err)) return } + // pion/webrtc will return an io.ErrShortBuffer when a read + // is triggerred with a buffer size less than the chunks written. + // + // This makes sense when considering UDP connections, because + // bufferring of data that has no transmit guarantees is likely + // to cause unexpected behavior. + // + // When ordered, this adds a bufio.Reader. This ensures additional + // data on TCP-like connections can be read in parts, while still + // being bufferred. + if c.opts.Unordered { + c.reader = c.rwc + } else { + c.reader = bufio.NewReader(c.rwc) + } close(c.opened) }) @@ -181,7 +198,7 @@ func (c *Channel) Read(bytes []byte) (int, error) { } } - bytesRead, err := c.rwc.Read(bytes) + bytesRead, err := c.reader.Read(bytes) if err != nil { if c.isClosed() { return 0, c.closeError diff --git a/peer/conn_test.go b/peer/conn_test.go index 519e5f3b743db..644390ba2ea68 100644 --- a/peer/conn_test.go +++ b/peer/conn_test.go @@ -267,6 +267,27 @@ func TestConn(t *testing.T) { _, err := client.Ping() require.NoError(t, err) }) + + t.Run("ShortBuffer", func(t *testing.T) { + t.Parallel() + client, server, _ := createPair(t) + exchange(client, server) + go func() { + channel, err := client.Dial(context.Background(), "test", nil) + require.NoError(t, err) + _, err = channel.Write([]byte{1, 2}) + require.NoError(t, err) + }() + channel, err := server.Accept(context.Background()) + require.NoError(t, err) + data := make([]byte, 1) + _, err = channel.Read(data) + require.NoError(t, err) + require.Equal(t, uint8(0x1), data[0]) + _, err = channel.Read(data) + require.NoError(t, err) + require.Equal(t, uint8(0x2), data[0]) + }) } func createPair(t *testing.T) (client *peer.Conn, server *peer.Conn, wan *vnet.Router) {