From eaacbca74ff99d3b66116b0e2ed4e8648369e948 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 21 Jul 2022 12:31:32 +0300 Subject: [PATCH 1/8] fix: Potential deadlock in peer.Channel dc.OnOpen --- peer/channel.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/peer/channel.go b/peer/channel.go index c0415e50baa1a..a9ee9148854f6 100644 --- a/peer/channel.go +++ b/peer/channel.go @@ -122,15 +122,16 @@ func (c *Channel) init() { }) c.dc.OnOpen(func() { c.closeMutex.Lock() - defer c.closeMutex.Unlock() - c.conn.logger().Debug(context.Background(), "datachannel opening", slog.F("id", c.dc.ID()), slog.F("label", c.dc.Label())) var err error c.rwc, err = c.dc.Detach() if err != nil { + c.closeMutex.Unlock() _ = c.closeWithError(xerrors.Errorf("detach: %w", err)) return } + c.closeMutex.Unlock() + // pion/webrtc will return an io.ErrShortBuffer when a read // is triggerred with a buffer size less than the chunks written. // From 5b5dde93e97eac202925d807fd6731594c5fe906 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 21 Jul 2022 12:32:34 +0300 Subject: [PATCH 2/8] fix: Potential send on closed channel --- peer/channel.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/peer/channel.go b/peer/channel.go index a9ee9148854f6..322b027cee3ae 100644 --- a/peer/channel.go +++ b/peer/channel.go @@ -106,6 +106,10 @@ func (c *Channel) init() { // write operations to block once the threshold is set. c.dc.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold) c.dc.OnBufferedAmountLow(func() { + // Grab the lock to protect the sendMore channel from being + // closed in between the isClosed check and the send. + c.closeMutex.Lock() + defer c.closeMutex.Unlock() if c.isClosed() { return } From 8061d4c031aeb4f6c9f9a3e200f30892b186dccb Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 21 Jul 2022 12:34:23 +0300 Subject: [PATCH 3/8] fix: Improve robustness of waitOpened during close --- peer/channel.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/peer/channel.go b/peer/channel.go index 322b027cee3ae..d352985d882ba 100644 --- a/peer/channel.go +++ b/peer/channel.go @@ -194,9 +194,6 @@ func (c *Channel) init() { // // This will block until the underlying DataChannel has been opened. func (c *Channel) Read(bytes []byte) (int, error) { - if c.isClosed() { - return 0, c.closeError - } err := c.waitOpened() if err != nil { return 0, err @@ -233,9 +230,6 @@ func (c *Channel) Write(bytes []byte) (n int, err error) { c.writeMutex.Lock() defer c.writeMutex.Unlock() - if c.isClosed() { - return 0, c.closeWithError(nil) - } err = c.waitOpened() if err != nil { return 0, err @@ -313,7 +307,13 @@ func (c *Channel) isClosed() bool { func (c *Channel) waitOpened() error { select { case <-c.opened: - return nil + // Re-check to prioritize the closed channel. + select { + case <-c.closed: + return c.closeError + default: + return nil + } case <-c.closed: return c.closeError } From 077b02e56c6ca5977856bfe837a533b85026ec3f Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 21 Jul 2022 12:37:31 +0300 Subject: [PATCH 4/8] chore: Simplify statements --- peer/channel.go | 1 - peer/conn.go | 6 ------ 2 files changed, 7 deletions(-) diff --git a/peer/channel.go b/peer/channel.go index d352985d882ba..46668b87fb152 100644 --- a/peer/channel.go +++ b/peer/channel.go @@ -115,7 +115,6 @@ func (c *Channel) init() { } select { case <-c.closed: - return case c.sendMore <- struct{}{}: default: } diff --git a/peer/conn.go b/peer/conn.go index 8eae101ccdbbe..2e67b500ee5fd 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -3,7 +3,6 @@ package peer import ( "bytes" "context" - "crypto/rand" "io" "sync" @@ -256,7 +255,6 @@ func (c *Conn) init() error { c.logger().Debug(context.Background(), "sending local candidate", slog.F("candidate", iceCandidate.ToJSON().Candidate)) select { case <-c.closed: - break case c.localCandidateChannel <- iceCandidate.ToJSON(): } }() @@ -265,7 +263,6 @@ func (c *Conn) init() error { go func() { select { case <-c.closed: - return case c.dcOpenChannel <- dc: } }() @@ -435,9 +432,6 @@ func (c *Conn) pingEchoChannel() (*Channel, error) { data := make([]byte, pingDataLength) bytesRead, err := c.pingEchoChan.Read(data) if err != nil { - if c.isClosed() { - return - } _ = c.CloseWithError(xerrors.Errorf("read ping echo channel: %w", err)) return } From 0474aa934e1fae2604c37c37f998927dfa356f81 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 21 Jul 2022 15:48:16 +0300 Subject: [PATCH 5/8] fix: Improve teardown and timeout of peer tests --- peer/conn_test.go | 53 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 42 insertions(+), 11 deletions(-) diff --git a/peer/conn_test.go b/peer/conn_test.go index 20f4c84638b0c..e528507a9fb0b 100644 --- a/peer/conn_test.go +++ b/peer/conn_test.go @@ -91,6 +91,8 @@ func TestConn(t *testing.T) { // Create a channel that closes on disconnect. channel, err := server.CreateChannel(context.Background(), "wow", nil) assert.NoError(t, err) + defer channel.Close() + err = wan.Stop() require.NoError(t, err) // Once the connection is marked as disconnected, this @@ -107,10 +109,13 @@ func TestConn(t *testing.T) { t.Parallel() client, server, _ := createPair(t) exchange(t, client, server) - cch, err := client.CreateChannel(context.Background(), "hello", &peer.ChannelOptions{}) + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + cch, err := client.CreateChannel(ctx, "hello", &peer.ChannelOptions{}) require.NoError(t, err) + defer cch.Close() - sch, err := server.Accept(context.Background()) + sch, err := server.Accept(ctx) require.NoError(t, err) defer sch.Close() @@ -123,9 +128,12 @@ func TestConn(t *testing.T) { t.Parallel() client, server, wan := createPair(t) exchange(t, client, server) - cch, err := client.CreateChannel(context.Background(), "hello", &peer.ChannelOptions{}) + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + cch, err := client.CreateChannel(ctx, "hello", &peer.ChannelOptions{}) require.NoError(t, err) - sch, err := server.Accept(context.Background()) + defer cch.Close() + sch, err := server.Accept(ctx) require.NoError(t, err) defer sch.Close() @@ -170,13 +178,29 @@ func TestConn(t *testing.T) { srv, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) defer srv.Close() + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() go func() { - sch, err := server.Accept(context.Background()) - assert.NoError(t, err) + sch, err := server.Accept(ctx) + if err != nil { + assert.NoError(t, err) + return + } + defer sch.Close() + nc2 := sch.NetConn() + defer nc2.Close() + nc1, err := net.Dial("tcp", srv.Addr().String()) - assert.NoError(t, err) + if err != nil { + assert.NoError(t, err) + return + } + defer nc1.Close() + go func() { + defer nc1.Close() + defer nc2.Close() _, _ = io.Copy(nc1, nc2) }() _, _ = io.Copy(nc2, nc1) @@ -204,7 +228,7 @@ func TestConn(t *testing.T) { c := http.Client{ Transport: defaultTransport, } - req, err := http.NewRequestWithContext(context.Background(), "GET", "http://localhost/", nil) + req, err := http.NewRequestWithContext(ctx, "GET", "http://localhost/", nil) require.NoError(t, err) resp, err := c.Do(req) require.NoError(t, err) @@ -272,14 +296,21 @@ func TestConn(t *testing.T) { t.Parallel() client, server, _ := createPair(t) exchange(t, client, server) + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() go func() { - channel, err := client.CreateChannel(context.Background(), "test", nil) - assert.NoError(t, err) + channel, err := client.CreateChannel(ctx, "test", nil) + if err != nil { + assert.NoError(t, err) + return + } + defer channel.Close() _, err = channel.Write([]byte{1, 2}) assert.NoError(t, err) }() - channel, err := server.Accept(context.Background()) + channel, err := server.Accept(ctx) require.NoError(t, err) + defer channel.Close() data := make([]byte, 1) _, err = channel.Read(data) require.NoError(t, err) From 5e33a9a4a72681b27354050f20f24f3edef4e6d8 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 21 Jul 2022 15:49:00 +0300 Subject: [PATCH 6/8] fix: Improve robustness of TestConn/Buffering test --- peer/conn_test.go | 46 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/peer/conn_test.go b/peer/conn_test.go index e528507a9fb0b..d1fbf63d15ab6 100644 --- a/peer/conn_test.go +++ b/peer/conn_test.go @@ -148,26 +148,44 @@ func TestConn(t *testing.T) { t.Parallel() client, server, _ := createPair(t) exchange(t, client, server) - cch, err := client.CreateChannel(context.Background(), "hello", &peer.ChannelOptions{}) - require.NoError(t, err) - sch, err := server.Accept(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + cch, err := client.CreateChannel(ctx, "hello", &peer.ChannelOptions{}) require.NoError(t, err) - defer sch.Close() + defer cch.Close() + + readErr := make(chan error, 1) go func() { + sch, err := server.Accept(ctx) + if err != nil { + readErr <- err + _ = cch.Close() + return + } + defer sch.Close() + bytes := make([]byte, 4096) - for i := 0; i < 1024; i++ { - _, err := cch.Write(bytes) - require.NoError(t, err) + for { + _, err = sch.Read(bytes) + if err != nil { + readErr <- err + return + } } - _ = cch.Close() }() + bytes := make([]byte, 4096) - for { - _, err = sch.Read(bytes) - if err != nil { - require.ErrorIs(t, err, peer.ErrClosed) - break - } + for i := 0; i < 1024; i++ { + _, err = cch.Write(bytes) + require.NoError(t, err, "write i=%d", i) + } + _ = cch.Close() + + select { + case err = <-readErr: + require.ErrorIs(t, err, peer.ErrClosed, "read error") + case <-ctx.Done(): + require.Fail(t, "timeout waiting for read error") } }) From ca349ca9cf74fb15e9110b65d5178505ac1f10a6 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 21 Jul 2022 16:48:33 +0300 Subject: [PATCH 7/8] Update peer/channel.go Co-authored-by: Steven Masley --- peer/channel.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/peer/channel.go b/peer/channel.go index 46668b87fb152..07ff0ff4cddb3 100644 --- a/peer/channel.go +++ b/peer/channel.go @@ -306,13 +306,10 @@ func (c *Channel) isClosed() bool { func (c *Channel) waitOpened() error { select { case <-c.opened: - // Re-check to prioritize the closed channel. - select { - case <-c.closed: + if c.isClosed() { return c.closeError - default: - return nil } + return nil case <-c.closed: return c.closeError } From 445e8b097a10998eb85e9de95af2cea1bae7d0a0 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 21 Jul 2022 16:51:10 +0300 Subject: [PATCH 8/8] chore: Re-introduce re-check comment --- peer/channel.go | 1 + 1 file changed, 1 insertion(+) diff --git a/peer/channel.go b/peer/channel.go index 07ff0ff4cddb3..d7119d1eafb7d 100644 --- a/peer/channel.go +++ b/peer/channel.go @@ -306,6 +306,7 @@ func (c *Channel) isClosed() bool { func (c *Channel) waitOpened() error { select { case <-c.opened: + // Re-check the closed channel to prioritize closure. if c.isClosed() { return c.closeError }