From 310eb826487a82e7b0327bb0e4914a47b4950c94 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 02:38:36 +0000 Subject: [PATCH 01/25] chore: Buffer remote candidates like local This was added for local candidates, and is required for remote to prevent a race where they are added before a negotiation is complete. I removed the mutex earlier, because it would cause a different race. I didn't realize the remote candidates wouldn't be buffered, but with this change they are! --- peer/conn.go | 46 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/peer/conn.go b/peer/conn.go index 11e2a05f0ca89..1b7ab75ad07c6 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -73,7 +73,8 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp dcFailedChannel: make(chan struct{}), localCandidateChannel: make(chan webrtc.ICECandidateInit), localSessionDescriptionChannel: make(chan webrtc.SessionDescription), - pendingCandidates: make([]webrtc.ICECandidateInit, 0), + pendingLocalCandidates: make([]webrtc.ICECandidateInit, 0), + pendingRemoteCandidates: make([]webrtc.ICECandidateInit, 0), remoteSessionDescriptionChannel: make(chan webrtc.SessionDescription), } if client { @@ -119,9 +120,13 @@ type Conn struct { localCandidateChannel chan webrtc.ICECandidateInit localSessionDescriptionChannel chan webrtc.SessionDescription remoteSessionDescriptionChannel chan webrtc.SessionDescription + remoteSessionDescriptionMutex sync.Mutex - pendingCandidates []webrtc.ICECandidateInit - pendingCandidatesMutex sync.Mutex + pendingLocalCandidates []webrtc.ICECandidateInit + pendingLocalCandidatesMutex sync.Mutex + + pendingRemoteCandidates []webrtc.ICECandidateInit + pendingRemoteCandidatesMutex sync.Mutex pingChannelID uint16 pingEchoChannelID uint16 @@ -144,10 +149,10 @@ func (c *Conn) init() error { // ICE Candidates on a remote peer are reset when an offer // is received. We must wait until the offer<->answer has // been negotiated to flush candidates. - c.pendingCandidatesMutex.Lock() - defer c.pendingCandidatesMutex.Unlock() + c.pendingLocalCandidatesMutex.Lock() + defer c.pendingLocalCandidatesMutex.Unlock() if c.rtc.RemoteDescription() == nil { - c.pendingCandidates = append(c.pendingCandidates, iceCandidate.ToJSON()) + c.pendingLocalCandidates = append(c.pendingLocalCandidates, iceCandidate.ToJSON()) return } select { @@ -254,6 +259,10 @@ func (c *Conn) pingEchoChannel() (*Channel, error) { func (c *Conn) negotiate() { c.opts.Logger.Debug(context.Background(), "negotiating") + // Locks while the negotiation for a remote session + // description is taking place. + c.remoteSessionDescriptionMutex.Lock() + defer c.remoteSessionDescriptionMutex.Unlock() if c.offerrer { offer, err := c.rtc.CreateOffer(&webrtc.OfferOptions{}) @@ -307,16 +316,27 @@ func (c *Conn) negotiate() { } } - c.pendingCandidatesMutex.Lock() - defer c.pendingCandidatesMutex.Unlock() - for _, pendingCandidate := range c.pendingCandidates { + c.pendingLocalCandidatesMutex.Lock() + defer c.pendingLocalCandidatesMutex.Unlock() + for _, pendingCandidate := range c.pendingLocalCandidates { select { case <-c.closed: return case c.localCandidateChannel <- pendingCandidate: } } - c.pendingCandidates = make([]webrtc.ICECandidateInit, 0) + c.pendingLocalCandidates = make([]webrtc.ICECandidateInit, 0) + + c.pendingRemoteCandidatesMutex.Lock() + defer c.pendingRemoteCandidatesMutex.Unlock() + for _, pendingCandidate := range c.pendingRemoteCandidates { + err = c.rtc.AddICECandidate(pendingCandidate) + if err != nil { + _ = c.CloseWithError(xerrors.Errorf("add remote candidate: %w", err)) + return + } + } + c.pendingRemoteCandidates = make([]webrtc.ICECandidateInit, 0) c.opts.Logger.Debug(context.Background(), "flushed candidates") } @@ -328,6 +348,12 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit { // AddRemoteCandidate adds a remote candidate to the RTC connection. func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error { + c.pendingRemoteCandidatesMutex.Lock() + defer c.pendingRemoteCandidatesMutex.Unlock() + if c.rtc.RemoteDescription() == nil { + c.pendingRemoteCandidates = append(c.pendingRemoteCandidates, i) + return nil + } return c.rtc.AddICECandidate(i) } From b874a1e2cc18d72748c5cff536300862ef463288 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 02:46:45 +0000 Subject: [PATCH 02/25] Use local description instead --- peer/conn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/peer/conn.go b/peer/conn.go index 1b7ab75ad07c6..d8400d42251c3 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -350,7 +350,7 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit { func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error { c.pendingRemoteCandidatesMutex.Lock() defer c.pendingRemoteCandidatesMutex.Unlock() - if c.rtc.RemoteDescription() == nil { + if c.rtc.LocalDescription() == nil { c.pendingRemoteCandidates = append(c.pendingRemoteCandidates, i) return nil } From 411542aa88cbeb29c5a641863138ef6e74184a06 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 02:50:59 +0000 Subject: [PATCH 03/25] Add logging for candidate flush --- peer/conn.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/peer/conn.go b/peer/conn.go index d8400d42251c3..ea1fd3639d696 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -152,9 +152,11 @@ func (c *Conn) init() error { c.pendingLocalCandidatesMutex.Lock() defer c.pendingLocalCandidatesMutex.Unlock() if c.rtc.RemoteDescription() == nil { + c.opts.Logger.Debug(context.Background(), "adding local candidate to flush queue") c.pendingLocalCandidates = append(c.pendingLocalCandidates, iceCandidate.ToJSON()) return } + c.opts.Logger.Debug(context.Background(), "adding local candidate") select { case <-c.closed: break @@ -319,6 +321,7 @@ func (c *Conn) negotiate() { c.pendingLocalCandidatesMutex.Lock() defer c.pendingLocalCandidatesMutex.Unlock() for _, pendingCandidate := range c.pendingLocalCandidates { + c.opts.Logger.Debug(context.Background(), "flushing local candidate") select { case <-c.closed: return @@ -330,6 +333,7 @@ func (c *Conn) negotiate() { c.pendingRemoteCandidatesMutex.Lock() defer c.pendingRemoteCandidatesMutex.Unlock() for _, pendingCandidate := range c.pendingRemoteCandidates { + c.opts.Logger.Debug(context.Background(), "flushing remote candidate") err = c.rtc.AddICECandidate(pendingCandidate) if err != nil { _ = c.CloseWithError(xerrors.Errorf("add remote candidate: %w", err)) @@ -351,9 +355,11 @@ func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error { c.pendingRemoteCandidatesMutex.Lock() defer c.pendingRemoteCandidatesMutex.Unlock() if c.rtc.LocalDescription() == nil { + c.opts.Logger.Debug(context.Background(), "adding remote candidate to flush queue") c.pendingRemoteCandidates = append(c.pendingRemoteCandidates, i) return nil } + c.opts.Logger.Debug(context.Background(), "adding remote candidate") return c.rtc.AddICECandidate(i) } From 90e00dbe3f92ddcc62d886fb3e3da3938a268013 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 03:03:10 +0000 Subject: [PATCH 04/25] Fix race with atomic bool --- peer/conn.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/peer/conn.go b/peer/conn.go index ea1fd3639d696..8aed4b41f2b36 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -120,13 +120,14 @@ type Conn struct { localCandidateChannel chan webrtc.ICECandidateInit localSessionDescriptionChannel chan webrtc.SessionDescription remoteSessionDescriptionChannel chan webrtc.SessionDescription - remoteSessionDescriptionMutex sync.Mutex - pendingLocalCandidates []webrtc.ICECandidateInit - pendingLocalCandidatesMutex sync.Mutex + pendingLocalCandidatesFlushed atomic.Bool + pendingLocalCandidatesMutex sync.Mutex + pendingLocalCandidates []webrtc.ICECandidateInit - pendingRemoteCandidates []webrtc.ICECandidateInit - pendingRemoteCandidatesMutex sync.Mutex + pendingRemoteCandidatesFlushed atomic.Bool + pendingRemoteCandidatesMutex sync.Mutex + pendingRemoteCandidates []webrtc.ICECandidateInit pingChannelID uint16 pingEchoChannelID uint16 @@ -151,7 +152,7 @@ func (c *Conn) init() error { // been negotiated to flush candidates. c.pendingLocalCandidatesMutex.Lock() defer c.pendingLocalCandidatesMutex.Unlock() - if c.rtc.RemoteDescription() == nil { + if !c.pendingLocalCandidatesFlushed.Load() { c.opts.Logger.Debug(context.Background(), "adding local candidate to flush queue") c.pendingLocalCandidates = append(c.pendingLocalCandidates, iceCandidate.ToJSON()) return @@ -261,10 +262,8 @@ func (c *Conn) pingEchoChannel() (*Channel, error) { func (c *Conn) negotiate() { c.opts.Logger.Debug(context.Background(), "negotiating") - // Locks while the negotiation for a remote session - // description is taking place. - c.remoteSessionDescriptionMutex.Lock() - defer c.remoteSessionDescriptionMutex.Unlock() + c.pendingLocalCandidatesFlushed.Store(false) + c.pendingRemoteCandidatesFlushed.Store(false) if c.offerrer { offer, err := c.rtc.CreateOffer(&webrtc.OfferOptions{}) @@ -318,6 +317,7 @@ func (c *Conn) negotiate() { } } + c.pendingLocalCandidatesFlushed.Store(true) c.pendingLocalCandidatesMutex.Lock() defer c.pendingLocalCandidatesMutex.Unlock() for _, pendingCandidate := range c.pendingLocalCandidates { @@ -332,6 +332,7 @@ func (c *Conn) negotiate() { c.pendingRemoteCandidatesMutex.Lock() defer c.pendingRemoteCandidatesMutex.Unlock() + c.pendingRemoteCandidatesFlushed.Store(true) for _, pendingCandidate := range c.pendingRemoteCandidates { c.opts.Logger.Debug(context.Background(), "flushing remote candidate") err = c.rtc.AddICECandidate(pendingCandidate) @@ -354,7 +355,7 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit { func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error { c.pendingRemoteCandidatesMutex.Lock() defer c.pendingRemoteCandidatesMutex.Unlock() - if c.rtc.LocalDescription() == nil { + if !c.pendingRemoteCandidatesFlushed.Load() { c.opts.Logger.Debug(context.Background(), "adding remote candidate to flush queue") c.pendingRemoteCandidates = append(c.pendingRemoteCandidates, i) return nil From 85e6defc534d2d6ab82ac35754f22abcc6dbcb56 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 03:23:36 +0000 Subject: [PATCH 05/25] Simplify locks --- peer/conn.go | 56 ++++++++++------------------------------------------ 1 file changed, 10 insertions(+), 46 deletions(-) diff --git a/peer/conn.go b/peer/conn.go index 8aed4b41f2b36..6cc7885f0ed17 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -72,9 +72,8 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp dcDisconnectChannel: make(chan struct{}), dcFailedChannel: make(chan struct{}), localCandidateChannel: make(chan webrtc.ICECandidateInit), + localCandidateBuffer: make([]webrtc.ICECandidateInit, 0), localSessionDescriptionChannel: make(chan webrtc.SessionDescription), - pendingLocalCandidates: make([]webrtc.ICECandidateInit, 0), - pendingRemoteCandidates: make([]webrtc.ICECandidateInit, 0), remoteSessionDescriptionChannel: make(chan webrtc.SessionDescription), } if client { @@ -121,13 +120,8 @@ type Conn struct { localSessionDescriptionChannel chan webrtc.SessionDescription remoteSessionDescriptionChannel chan webrtc.SessionDescription - pendingLocalCandidatesFlushed atomic.Bool - pendingLocalCandidatesMutex sync.Mutex - pendingLocalCandidates []webrtc.ICECandidateInit - - pendingRemoteCandidatesFlushed atomic.Bool - pendingRemoteCandidatesMutex sync.Mutex - pendingRemoteCandidates []webrtc.ICECandidateInit + localCandidateMutex sync.Mutex + localCandidateBuffer []webrtc.ICECandidateInit pingChannelID uint16 pingEchoChannelID uint16 @@ -147,15 +141,11 @@ func (c *Conn) init() error { if iceCandidate == nil { return } - // ICE Candidates on a remote peer are reset when an offer - // is received. We must wait until the offer<->answer has - // been negotiated to flush candidates. - c.pendingLocalCandidatesMutex.Lock() - defer c.pendingLocalCandidatesMutex.Unlock() - if !c.pendingLocalCandidatesFlushed.Load() { - c.opts.Logger.Debug(context.Background(), "adding local candidate to flush queue") - c.pendingLocalCandidates = append(c.pendingLocalCandidates, iceCandidate.ToJSON()) - return + c.localCandidateMutex.Lock() + defer c.localCandidateMutex.Unlock() + if c.rtc.RemoteDescription() == nil { + c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer") + c.localCandidateBuffer = append(c.localCandidateBuffer, iceCandidate.ToJSON()) } c.opts.Logger.Debug(context.Background(), "adding local candidate") select { @@ -262,8 +252,6 @@ func (c *Conn) pingEchoChannel() (*Channel, error) { func (c *Conn) negotiate() { c.opts.Logger.Debug(context.Background(), "negotiating") - c.pendingLocalCandidatesFlushed.Store(false) - c.pendingRemoteCandidatesFlushed.Store(false) if c.offerrer { offer, err := c.rtc.CreateOffer(&webrtc.OfferOptions{}) @@ -317,31 +305,14 @@ func (c *Conn) negotiate() { } } - c.pendingLocalCandidatesFlushed.Store(true) - c.pendingLocalCandidatesMutex.Lock() - defer c.pendingLocalCandidatesMutex.Unlock() - for _, pendingCandidate := range c.pendingLocalCandidates { + for _, localCandidate := range c.localCandidateBuffer { c.opts.Logger.Debug(context.Background(), "flushing local candidate") select { case <-c.closed: return - case c.localCandidateChannel <- pendingCandidate: + case c.localCandidateChannel <- localCandidate: } } - c.pendingLocalCandidates = make([]webrtc.ICECandidateInit, 0) - - c.pendingRemoteCandidatesMutex.Lock() - defer c.pendingRemoteCandidatesMutex.Unlock() - c.pendingRemoteCandidatesFlushed.Store(true) - for _, pendingCandidate := range c.pendingRemoteCandidates { - c.opts.Logger.Debug(context.Background(), "flushing remote candidate") - err = c.rtc.AddICECandidate(pendingCandidate) - if err != nil { - _ = c.CloseWithError(xerrors.Errorf("add remote candidate: %w", err)) - return - } - } - c.pendingRemoteCandidates = make([]webrtc.ICECandidateInit, 0) c.opts.Logger.Debug(context.Background(), "flushed candidates") } @@ -353,13 +324,6 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit { // AddRemoteCandidate adds a remote candidate to the RTC connection. func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error { - c.pendingRemoteCandidatesMutex.Lock() - defer c.pendingRemoteCandidatesMutex.Unlock() - if !c.pendingRemoteCandidatesFlushed.Load() { - c.opts.Logger.Debug(context.Background(), "adding remote candidate to flush queue") - c.pendingRemoteCandidates = append(c.pendingRemoteCandidates, i) - return nil - } c.opts.Logger.Debug(context.Background(), "adding remote candidate") return c.rtc.AddICECandidate(i) } From 7f26a994e2398bdc45134d137272dd1c7c78ab32 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 03:26:40 +0000 Subject: [PATCH 06/25] Add mutex to flush --- peer/conn.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/peer/conn.go b/peer/conn.go index 6cc7885f0ed17..1fa38b512a334 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -305,6 +305,8 @@ func (c *Conn) negotiate() { } } + c.localCandidateMutex.Lock() + defer c.localCandidateMutex.Unlock() for _, localCandidate := range c.localCandidateBuffer { c.opts.Logger.Debug(context.Background(), "flushing local candidate") select { From a56560bb9e25245474c7ee532b04cc303ee82b81 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 03:28:32 +0000 Subject: [PATCH 07/25] Reset buffer --- peer/conn.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/peer/conn.go b/peer/conn.go index 1fa38b512a334..07303a3d06a18 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -146,6 +146,7 @@ func (c *Conn) init() error { if c.rtc.RemoteDescription() == nil { c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer") c.localCandidateBuffer = append(c.localCandidateBuffer, iceCandidate.ToJSON()) + return } c.opts.Logger.Debug(context.Background(), "adding local candidate") select { @@ -315,6 +316,7 @@ func (c *Conn) negotiate() { case c.localCandidateChannel <- localCandidate: } } + c.localCandidateBuffer = make([]webrtc.ICECandidateInit, 0) c.opts.Logger.Debug(context.Background(), "flushed candidates") } From 58445d5b83972992d8042da6e80d33e72fda7c9a Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 03:37:41 +0000 Subject: [PATCH 08/25] Remove leak dependency to limit confusion --- go.mod | 3 --- go.sum | 7 +++++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 90afdb8faf365..7505a34a7167c 100644 --- a/go.mod +++ b/go.mod @@ -2,9 +2,6 @@ module github.com/coder/coder go 1.17 -// Required until https://github.com/pion/ice/pull/413 is merged. -replace github.com/pion/ice/v2 => github.com/kylecarbs/ice/v2 v2.1.8-0.20220127013758-526c25708344 - // Required until https://github.com/hashicorp/terraform-config-inspect/pull/74 is merged. replace github.com/hashicorp/terraform-config-inspect => github.com/kylecarbs/terraform-config-inspect v0.0.0-20211215004401-bbc517866b88 diff --git a/go.sum b/go.sum index add8a4cc62d3c..564c6aaae7be9 100644 --- a/go.sum +++ b/go.sum @@ -835,8 +835,6 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/ktrysmt/go-bitbucket v0.6.4/go.mod h1:9u0v3hsd2rqCHRIpbir1oP7F58uo5dq19sBYvuMoyQ4= -github.com/kylecarbs/ice/v2 v2.1.8-0.20220127013758-526c25708344 h1:rXpDqMPlbnKASSBFwPrJbT2wEL5jZzIX/i0cvwISxlM= -github.com/kylecarbs/ice/v2 v2.1.8-0.20220127013758-526c25708344/go.mod h1:E5frMpIJ3zzcQiRo+XyT7z1IiAsGc1hDURcVJQUzGWA= github.com/kylecarbs/terraform-config-inspect v0.0.0-20211215004401-bbc517866b88 h1:tvG/qs5c4worwGyGnbbb4i/dYYLjpFwDMqcIT3awAf8= github.com/kylecarbs/terraform-config-inspect v0.0.0-20211215004401-bbc517866b88/go.mod h1:Z0Nnk4+3Cy89smEbrq+sl1bxc9198gIP4I7wcQF6Kqs= github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k= @@ -1014,8 +1012,12 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pion/datachannel v1.5.2 h1:piB93s8LGmbECrpO84DnkIVWasRMk3IimbcXkTQLE6E= github.com/pion/datachannel v1.5.2/go.mod h1:FTGQWaHrdCwIJ1rw6xBIfZVkslikjShim5yr05XFuCQ= +github.com/pion/dtls/v2 v2.0.13/go.mod h1:OaE7eTM+ppaUhJ99OTO4aHl9uY6vPrT1gPY27uNTxRY= github.com/pion/dtls/v2 v2.1.0 h1:g6gtKVNLp6URDkv9OijFJl16kqGHzVzZG+Fa4A38GTY= github.com/pion/dtls/v2 v2.1.0/go.mod h1:qG3gA7ZPZemBqpEFqRKyURYdKEwFZQCGb7gv9T3ON3Y= +github.com/pion/ice/v2 v2.1.18/go.mod h1:9jDr0iIUg8P6+0Jq8QJ/eFSkX3JnsPd293TjCdkfpTs= +github.com/pion/ice/v2 v2.1.19 h1:z7iVx/fHlqvPILUbvcj1xjuz/6eVKgEFOM8h1AuLbF8= +github.com/pion/ice/v2 v2.1.19/go.mod h1:E5frMpIJ3zzcQiRo+XyT7z1IiAsGc1hDURcVJQUzGWA= github.com/pion/interceptor v0.1.6/go.mod h1:Lh3JSl/cbJ2wP8I3ccrjh1K/deRGRn3UlSPuOTiHb6U= github.com/pion/interceptor v0.1.7 h1:HThW0tIIKT9RRoDWGURe8rlZVOx0fJHxBHpA0ej0+bo= github.com/pion/interceptor v0.1.7/go.mod h1:Lh3JSl/cbJ2wP8I3ccrjh1K/deRGRn3UlSPuOTiHb6U= @@ -1311,6 +1313,7 @@ golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220126234351-aa10faf2a1f8 h1:kACShD3qhmr/3rLmg1yXyt+N4HcwutKyPRB93s54TIU= golang.org/x/crypto v0.0.0-20220126234351-aa10faf2a1f8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= From 6a07a9946dfd71a38326871f9656afb7efc679aa Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 03:46:15 +0000 Subject: [PATCH 09/25] Fix ordering --- peer/conn.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/peer/conn.go b/peer/conn.go index 07303a3d06a18..2c3747f47307d 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -143,7 +143,11 @@ func (c *Conn) init() error { } c.localCandidateMutex.Lock() defer c.localCandidateMutex.Unlock() - if c.rtc.RemoteDescription() == nil { + queue := c.rtc.RemoteDescription() == nil + if !c.offerrer { + queue = c.rtc.LocalDescription() == nil + } + if queue { c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer") c.localCandidateBuffer = append(c.localCandidateBuffer, iceCandidate.ToJSON()) return From 3272e1332b29d5a491d9cbeb660f7336bb704048 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 03:57:26 +0000 Subject: [PATCH 10/25] Revert channel close --- peer/channel.go | 9 +++++---- peer/conn.go | 10 +++------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/peer/channel.go b/peer/channel.go index 8c3f5118996c1..8d1d4694d220d 100644 --- a/peer/channel.go +++ b/peer/channel.go @@ -271,16 +271,17 @@ func (c *Channel) closeWithError(err error) error { } else { c.closeError = err } - if c.rwc != nil { - _ = c.rwc.Close() - } - _ = c.dc.Close() close(c.closed) close(c.sendMore) c.conn.dcDisconnectListeners.Sub(1) c.conn.dcFailedListeners.Sub(1) c.conn.dcClosedWaitGroup.Done() + + if c.rwc != nil { + _ = c.rwc.Close() + } + _ = c.dc.Close() return err } diff --git a/peer/conn.go b/peer/conn.go index 2c3747f47307d..c30c744800299 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -141,14 +141,10 @@ func (c *Conn) init() error { if iceCandidate == nil { return } - c.localCandidateMutex.Lock() - defer c.localCandidateMutex.Unlock() - queue := c.rtc.RemoteDescription() == nil - if !c.offerrer { - queue = c.rtc.LocalDescription() == nil - } - if queue { + if c.rtc.RemoteDescription() == nil { c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer") + c.localCandidateMutex.Lock() + defer c.localCandidateMutex.Unlock() c.localCandidateBuffer = append(c.localCandidateBuffer, iceCandidate.ToJSON()) return } From 877ae595996ca8f1c3b21a4e27634e53bc42082e Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 04:05:07 +0000 Subject: [PATCH 11/25] Flush candidates after remote session description is set --- peer/conn.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/peer/conn.go b/peer/conn.go index c30c744800299..0f2af1f07b84e 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -285,6 +285,19 @@ func (c *Conn) negotiate() { return } + c.localCandidateMutex.Lock() + defer c.localCandidateMutex.Unlock() + for _, localCandidate := range c.localCandidateBuffer { + c.opts.Logger.Debug(context.Background(), "flushing local candidate") + select { + case <-c.closed: + return + case c.localCandidateChannel <- localCandidate: + } + } + c.localCandidateBuffer = make([]webrtc.ICECandidateInit, 0) + c.opts.Logger.Debug(context.Background(), "flushed candidates") + if !c.offerrer { answer, err := c.rtc.CreateAnswer(&webrtc.AnswerOptions{}) if err != nil { @@ -305,19 +318,6 @@ func (c *Conn) negotiate() { case c.localSessionDescriptionChannel <- answer: } } - - c.localCandidateMutex.Lock() - defer c.localCandidateMutex.Unlock() - for _, localCandidate := range c.localCandidateBuffer { - c.opts.Logger.Debug(context.Background(), "flushing local candidate") - select { - case <-c.closed: - return - case c.localCandidateChannel <- localCandidate: - } - } - c.localCandidateBuffer = make([]webrtc.ICECandidateInit, 0) - c.opts.Logger.Debug(context.Background(), "flushed candidates") } // LocalCandidate returns a channel that emits when a local candidate From 1e6a9234e8957267c098849ec1bbc90908dc84ab Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 04:19:38 +0000 Subject: [PATCH 12/25] Bump up count to ensure race is fixed --- .github/workflows/coder.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/coder.yaml b/.github/workflows/coder.yaml index 8467dbe81ba4d..ddad179b626fb 100644 --- a/.github/workflows/coder.yaml +++ b/.github/workflows/coder.yaml @@ -152,8 +152,8 @@ jobs: - name: Test with Mock Database run: gotestsum --jsonfile="gotests.json" --packages="./..." -- - -covermode=atomic -coverprofile="gotests.coverage" -timeout=3m - -count=3 -race -parallel=2 + -covermode=atomic -coverprofile="gotests.coverage" -timeout=30m + -count=100 -race -parallel=2 - name: Test with PostgreSQL Database if: runner.os == 'Linux' From 0c02c7865caa704bda5aa68043a4660477217c2d Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 04:36:57 +0000 Subject: [PATCH 13/25] Use custom ICE dependency --- .github/workflows/coder.yaml | 2 +- go.mod | 3 +++ go.sum | 7 ++----- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/coder.yaml b/.github/workflows/coder.yaml index ddad179b626fb..c1299134b48e7 100644 --- a/.github/workflows/coder.yaml +++ b/.github/workflows/coder.yaml @@ -151,7 +151,7 @@ jobs: - name: Test with Mock Database run: - gotestsum --jsonfile="gotests.json" --packages="./..." -- + gotestsum --jsonfile="gotests.json" --packages="./peer" -- -covermode=atomic -coverprofile="gotests.coverage" -timeout=30m -count=100 -race -parallel=2 diff --git a/go.mod b/go.mod index 7505a34a7167c..90afdb8faf365 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,9 @@ module github.com/coder/coder go 1.17 +// Required until https://github.com/pion/ice/pull/413 is merged. +replace github.com/pion/ice/v2 => github.com/kylecarbs/ice/v2 v2.1.8-0.20220127013758-526c25708344 + // Required until https://github.com/hashicorp/terraform-config-inspect/pull/74 is merged. replace github.com/hashicorp/terraform-config-inspect => github.com/kylecarbs/terraform-config-inspect v0.0.0-20211215004401-bbc517866b88 diff --git a/go.sum b/go.sum index 564c6aaae7be9..add8a4cc62d3c 100644 --- a/go.sum +++ b/go.sum @@ -835,6 +835,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/ktrysmt/go-bitbucket v0.6.4/go.mod h1:9u0v3hsd2rqCHRIpbir1oP7F58uo5dq19sBYvuMoyQ4= +github.com/kylecarbs/ice/v2 v2.1.8-0.20220127013758-526c25708344 h1:rXpDqMPlbnKASSBFwPrJbT2wEL5jZzIX/i0cvwISxlM= +github.com/kylecarbs/ice/v2 v2.1.8-0.20220127013758-526c25708344/go.mod h1:E5frMpIJ3zzcQiRo+XyT7z1IiAsGc1hDURcVJQUzGWA= github.com/kylecarbs/terraform-config-inspect v0.0.0-20211215004401-bbc517866b88 h1:tvG/qs5c4worwGyGnbbb4i/dYYLjpFwDMqcIT3awAf8= github.com/kylecarbs/terraform-config-inspect v0.0.0-20211215004401-bbc517866b88/go.mod h1:Z0Nnk4+3Cy89smEbrq+sl1bxc9198gIP4I7wcQF6Kqs= github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k= @@ -1012,12 +1014,8 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pion/datachannel v1.5.2 h1:piB93s8LGmbECrpO84DnkIVWasRMk3IimbcXkTQLE6E= github.com/pion/datachannel v1.5.2/go.mod h1:FTGQWaHrdCwIJ1rw6xBIfZVkslikjShim5yr05XFuCQ= -github.com/pion/dtls/v2 v2.0.13/go.mod h1:OaE7eTM+ppaUhJ99OTO4aHl9uY6vPrT1gPY27uNTxRY= github.com/pion/dtls/v2 v2.1.0 h1:g6gtKVNLp6URDkv9OijFJl16kqGHzVzZG+Fa4A38GTY= github.com/pion/dtls/v2 v2.1.0/go.mod h1:qG3gA7ZPZemBqpEFqRKyURYdKEwFZQCGb7gv9T3ON3Y= -github.com/pion/ice/v2 v2.1.18/go.mod h1:9jDr0iIUg8P6+0Jq8QJ/eFSkX3JnsPd293TjCdkfpTs= -github.com/pion/ice/v2 v2.1.19 h1:z7iVx/fHlqvPILUbvcj1xjuz/6eVKgEFOM8h1AuLbF8= -github.com/pion/ice/v2 v2.1.19/go.mod h1:E5frMpIJ3zzcQiRo+XyT7z1IiAsGc1hDURcVJQUzGWA= github.com/pion/interceptor v0.1.6/go.mod h1:Lh3JSl/cbJ2wP8I3ccrjh1K/deRGRn3UlSPuOTiHb6U= github.com/pion/interceptor v0.1.7 h1:HThW0tIIKT9RRoDWGURe8rlZVOx0fJHxBHpA0ej0+bo= github.com/pion/interceptor v0.1.7/go.mod h1:Lh3JSl/cbJ2wP8I3ccrjh1K/deRGRn3UlSPuOTiHb6U= @@ -1313,7 +1311,6 @@ golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220126234351-aa10faf2a1f8 h1:kACShD3qhmr/3rLmg1yXyt+N4HcwutKyPRB93s54TIU= golang.org/x/crypto v0.0.0-20220126234351-aa10faf2a1f8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= From 0439e23f24bc51a3da2eec709b09294e6ca1ea91 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 04:43:25 +0000 Subject: [PATCH 14/25] Fix data race --- peer/channel.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/peer/channel.go b/peer/channel.go index 8d1d4694d220d..2c95d495c4d65 100644 --- a/peer/channel.go +++ b/peer/channel.go @@ -271,6 +271,10 @@ func (c *Channel) closeWithError(err error) error { } else { c.closeError = err } + if c.rwc != nil { + _ = c.rwc.Close() + } + _ = c.dc.Close() close(c.closed) close(c.sendMore) @@ -278,10 +282,6 @@ func (c *Channel) closeWithError(err error) error { c.conn.dcFailedListeners.Sub(1) c.conn.dcClosedWaitGroup.Done() - if c.rwc != nil { - _ = c.rwc.Close() - } - _ = c.dc.Close() return err } From a8b5a07f3976a2c1a93a3747e22af9da94798e39 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 04:48:01 +0000 Subject: [PATCH 15/25] Lower timeout to make for fast CI --- .github/workflows/coder.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/coder.yaml b/.github/workflows/coder.yaml index c1299134b48e7..8467dbe81ba4d 100644 --- a/.github/workflows/coder.yaml +++ b/.github/workflows/coder.yaml @@ -151,9 +151,9 @@ jobs: - name: Test with Mock Database run: - gotestsum --jsonfile="gotests.json" --packages="./peer" -- - -covermode=atomic -coverprofile="gotests.coverage" -timeout=30m - -count=100 -race -parallel=2 + gotestsum --jsonfile="gotests.json" --packages="./..." -- + -covermode=atomic -coverprofile="gotests.coverage" -timeout=3m + -count=3 -race -parallel=2 - name: Test with PostgreSQL Database if: runner.os == 'Linux' From d74f4544202378989349e648a2fd18bbc52f3328 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 05:19:37 +0000 Subject: [PATCH 16/25] Add back mutex to prevent race --- peer/conn.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/peer/conn.go b/peer/conn.go index 0f2af1f07b84e..0dabcc6287862 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -119,6 +119,7 @@ type Conn struct { localCandidateChannel chan webrtc.ICECandidateInit localSessionDescriptionChannel chan webrtc.SessionDescription remoteSessionDescriptionChannel chan webrtc.SessionDescription + remoteSessionDescriptionMutex sync.Mutex localCandidateMutex sync.Mutex localCandidateBuffer []webrtc.ICECandidateInit @@ -253,6 +254,8 @@ func (c *Conn) pingEchoChannel() (*Channel, error) { func (c *Conn) negotiate() { c.opts.Logger.Debug(context.Background(), "negotiating") + c.remoteSessionDescriptionMutex.Lock() + defer c.remoteSessionDescriptionMutex.Unlock() if c.offerrer { offer, err := c.rtc.CreateOffer(&webrtc.OfferOptions{}) @@ -286,7 +289,6 @@ func (c *Conn) negotiate() { } c.localCandidateMutex.Lock() - defer c.localCandidateMutex.Unlock() for _, localCandidate := range c.localCandidateBuffer { c.opts.Logger.Debug(context.Background(), "flushing local candidate") select { @@ -296,6 +298,7 @@ func (c *Conn) negotiate() { } } c.localCandidateBuffer = make([]webrtc.ICECandidateInit, 0) + c.localCandidateMutex.Unlock() c.opts.Logger.Debug(context.Background(), "flushed candidates") if !c.offerrer { @@ -328,6 +331,8 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit { // AddRemoteCandidate adds a remote candidate to the RTC connection. func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error { + c.remoteSessionDescriptionMutex.Lock() + defer c.remoteSessionDescriptionMutex.Unlock() c.opts.Logger.Debug(context.Background(), "adding remote candidate") return c.rtc.AddICECandidate(i) } From b613b6d05b9df2174530a422e38cd31e60abf014 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 05:24:19 +0000 Subject: [PATCH 17/25] Improve debug logging --- peer/conn.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/peer/conn.go b/peer/conn.go index 0dabcc6287862..2dca08b2d6ed3 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -298,8 +298,8 @@ func (c *Conn) negotiate() { } } c.localCandidateBuffer = make([]webrtc.ICECandidateInit, 0) - c.localCandidateMutex.Unlock() c.opts.Logger.Debug(context.Background(), "flushed candidates") + c.localCandidateMutex.Unlock() if !c.offerrer { answer, err := c.rtc.CreateAnswer(&webrtc.AnswerOptions{}) @@ -331,9 +331,10 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit { // AddRemoteCandidate adds a remote candidate to the RTC connection. func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error { + c.opts.Logger.Debug(context.Background(), "locked on remote candidate") c.remoteSessionDescriptionMutex.Lock() defer c.remoteSessionDescriptionMutex.Unlock() - c.opts.Logger.Debug(context.Background(), "adding remote candidate") + c.opts.Logger.Debug(context.Background(), "added remote candidate") return c.rtc.AddICECandidate(i) } From ba878caecbbd6cee287071f2912b1df56d9fcc2c Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 14:04:07 +0000 Subject: [PATCH 18/25] Lock on local description --- peer/conn.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/peer/conn.go b/peer/conn.go index 2dca08b2d6ed3..b2b1253fdd9e2 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -119,7 +119,6 @@ type Conn struct { localCandidateChannel chan webrtc.ICECandidateInit localSessionDescriptionChannel chan webrtc.SessionDescription remoteSessionDescriptionChannel chan webrtc.SessionDescription - remoteSessionDescriptionMutex sync.Mutex localCandidateMutex sync.Mutex localCandidateBuffer []webrtc.ICECandidateInit @@ -142,8 +141,15 @@ func (c *Conn) init() error { if iceCandidate == nil { return } - if c.rtc.RemoteDescription() == nil { - c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer") + if !c.offerrer && c.rtc.LocalDescription() == nil { + c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer with local description") + c.localCandidateMutex.Lock() + defer c.localCandidateMutex.Unlock() + c.localCandidateBuffer = append(c.localCandidateBuffer, iceCandidate.ToJSON()) + return + } + if c.offerrer && c.rtc.RemoteDescription() == nil { + c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer with remote description") c.localCandidateMutex.Lock() defer c.localCandidateMutex.Unlock() c.localCandidateBuffer = append(c.localCandidateBuffer, iceCandidate.ToJSON()) @@ -254,8 +260,6 @@ func (c *Conn) pingEchoChannel() (*Channel, error) { func (c *Conn) negotiate() { c.opts.Logger.Debug(context.Background(), "negotiating") - c.remoteSessionDescriptionMutex.Lock() - defer c.remoteSessionDescriptionMutex.Unlock() if c.offerrer { offer, err := c.rtc.CreateOffer(&webrtc.OfferOptions{}) @@ -331,10 +335,7 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit { // AddRemoteCandidate adds a remote candidate to the RTC connection. func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error { - c.opts.Logger.Debug(context.Background(), "locked on remote candidate") - c.remoteSessionDescriptionMutex.Lock() - defer c.remoteSessionDescriptionMutex.Unlock() - c.opts.Logger.Debug(context.Background(), "added remote candidate") + c.opts.Logger.Debug(context.Background(), "adding remote candidate") return c.rtc.AddICECandidate(i) } From ef4192166aa01c7d52b5f3648aad63fd2891b040 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 14:08:49 +0000 Subject: [PATCH 19/25] Flush local candidates uniquely --- peer/conn.go | 39 ++++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/peer/conn.go b/peer/conn.go index b2b1253fdd9e2..16e100a2ff641 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -141,14 +141,7 @@ func (c *Conn) init() error { if iceCandidate == nil { return } - if !c.offerrer && c.rtc.LocalDescription() == nil { - c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer with local description") - c.localCandidateMutex.Lock() - defer c.localCandidateMutex.Unlock() - c.localCandidateBuffer = append(c.localCandidateBuffer, iceCandidate.ToJSON()) - return - } - if c.offerrer && c.rtc.RemoteDescription() == nil { + if c.rtc.RemoteDescription() == nil { c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer with remote description") c.localCandidateMutex.Lock() defer c.localCandidateMutex.Unlock() @@ -292,18 +285,9 @@ func (c *Conn) negotiate() { return } - c.localCandidateMutex.Lock() - for _, localCandidate := range c.localCandidateBuffer { - c.opts.Logger.Debug(context.Background(), "flushing local candidate") - select { - case <-c.closed: - return - case c.localCandidateChannel <- localCandidate: - } + if c.offerrer { + c.flushLocalCandidates() } - c.localCandidateBuffer = make([]webrtc.ICECandidateInit, 0) - c.opts.Logger.Debug(context.Background(), "flushed candidates") - c.localCandidateMutex.Unlock() if !c.offerrer { answer, err := c.rtc.CreateAnswer(&webrtc.AnswerOptions{}) @@ -324,7 +308,24 @@ func (c *Conn) negotiate() { return case c.localSessionDescriptionChannel <- answer: } + + c.flushLocalCandidates() + } +} + +func (c *Conn) flushLocalCandidates() { + c.localCandidateMutex.Lock() + for _, localCandidate := range c.localCandidateBuffer { + c.opts.Logger.Debug(context.Background(), "flushing local candidate") + select { + case <-c.closed: + return + case c.localCandidateChannel <- localCandidate: + } } + c.localCandidateBuffer = make([]webrtc.ICECandidateInit, 0) + c.opts.Logger.Debug(context.Background(), "flushed candidates") + c.localCandidateMutex.Unlock() } // LocalCandidate returns a channel that emits when a local candidate From 3515fe7553d0c5938cfdd2c8e68ae7b347e819d1 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 14:16:52 +0000 Subject: [PATCH 20/25] Fix race --- peer/conn.go | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/peer/conn.go b/peer/conn.go index 16e100a2ff641..a795d84675832 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -72,7 +72,7 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp dcDisconnectChannel: make(chan struct{}), dcFailedChannel: make(chan struct{}), localCandidateChannel: make(chan webrtc.ICECandidateInit), - localCandidateBuffer: make([]webrtc.ICECandidateInit, 0), + pendingCandidates: make([]webrtc.ICECandidateInit, 0), localSessionDescriptionChannel: make(chan webrtc.SessionDescription), remoteSessionDescriptionChannel: make(chan webrtc.SessionDescription), } @@ -120,8 +120,8 @@ type Conn struct { localSessionDescriptionChannel chan webrtc.SessionDescription remoteSessionDescriptionChannel chan webrtc.SessionDescription - localCandidateMutex sync.Mutex - localCandidateBuffer []webrtc.ICECandidateInit + pendingCandidates []webrtc.ICECandidateInit + pendingCandidatesMutex sync.Mutex pingChannelID uint16 pingEchoChannelID uint16 @@ -143,9 +143,9 @@ func (c *Conn) init() error { } if c.rtc.RemoteDescription() == nil { c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer with remote description") - c.localCandidateMutex.Lock() - defer c.localCandidateMutex.Unlock() - c.localCandidateBuffer = append(c.localCandidateBuffer, iceCandidate.ToJSON()) + c.pendingCandidatesMutex.Lock() + defer c.pendingCandidatesMutex.Unlock() + c.pendingCandidates = append(c.pendingCandidates, iceCandidate.ToJSON()) return } c.opts.Logger.Debug(context.Background(), "adding local candidate") @@ -286,7 +286,9 @@ func (c *Conn) negotiate() { } if c.offerrer { - c.flushLocalCandidates() + // ICE candidates reset when an offer/answer is set for the first + // time. If candidates flush before this point, a connection could fail. + c.flushPendingCandidates() } if !c.offerrer { @@ -309,13 +311,16 @@ func (c *Conn) negotiate() { case c.localSessionDescriptionChannel <- answer: } - c.flushLocalCandidates() + // Wait until the local description is set to flush candidates. + c.flushPendingCandidates() } } -func (c *Conn) flushLocalCandidates() { - c.localCandidateMutex.Lock() - for _, localCandidate := range c.localCandidateBuffer { +// flushPendingCandidates writes all local candidates to the candidate send channel. +// The localCandidateChannel is expected to be serviced, otherwise this could block. +func (c *Conn) flushPendingCandidates() { + c.pendingCandidatesMutex.Lock() + for _, localCandidate := range c.pendingCandidates { c.opts.Logger.Debug(context.Background(), "flushing local candidate") select { case <-c.closed: @@ -323,9 +328,9 @@ func (c *Conn) flushLocalCandidates() { case c.localCandidateChannel <- localCandidate: } } - c.localCandidateBuffer = make([]webrtc.ICECandidateInit, 0) + c.pendingCandidates = make([]webrtc.ICECandidateInit, 0) c.opts.Logger.Debug(context.Background(), "flushed candidates") - c.localCandidateMutex.Unlock() + c.pendingCandidatesMutex.Unlock() } // LocalCandidate returns a channel that emits when a local candidate From fbe847cabd4ab39b11891e23e758e72c336804c5 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 14:28:08 +0000 Subject: [PATCH 21/25] Move mutex to prevent candidate send race --- peer/conn.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/peer/conn.go b/peer/conn.go index a795d84675832..1a65c028bb5fd 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -141,10 +141,11 @@ func (c *Conn) init() error { if iceCandidate == nil { return } + c.pendingCandidatesMutex.Lock() + defer c.pendingCandidatesMutex.Unlock() + if c.rtc.RemoteDescription() == nil { c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer with remote description") - c.pendingCandidatesMutex.Lock() - defer c.pendingCandidatesMutex.Unlock() c.pendingCandidates = append(c.pendingCandidates, iceCandidate.ToJSON()) return } From 7321303879d54f55000a53160c62dc636e3f74d1 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 14:40:10 +0000 Subject: [PATCH 22/25] Move lock to handshake so no race can occur --- peer/conn.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/peer/conn.go b/peer/conn.go index 1a65c028bb5fd..85a3de7bd47c1 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -280,8 +280,11 @@ func (c *Conn) negotiate() { case remoteDescription = <-c.remoteSessionDescriptionChannel: } + // Must lock new candidates from being sent while the description is being flushed. + c.pendingCandidatesMutex.Lock() err := c.rtc.SetRemoteDescription(remoteDescription) if err != nil { + c.pendingCandidatesMutex.Unlock() _ = c.CloseWithError(xerrors.Errorf("set remote description (closed %v): %w", c.isClosed(), err)) return } @@ -291,8 +294,11 @@ func (c *Conn) negotiate() { // time. If candidates flush before this point, a connection could fail. c.flushPendingCandidates() } + c.pendingCandidatesMutex.Unlock() if !c.offerrer { + c.pendingCandidatesMutex.Lock() + answer, err := c.rtc.CreateAnswer(&webrtc.AnswerOptions{}) if err != nil { _ = c.CloseWithError(xerrors.Errorf("create answer: %w", err)) @@ -314,13 +320,13 @@ func (c *Conn) negotiate() { // Wait until the local description is set to flush candidates. c.flushPendingCandidates() + c.pendingCandidatesMutex.Unlock() } } // flushPendingCandidates writes all local candidates to the candidate send channel. // The localCandidateChannel is expected to be serviced, otherwise this could block. func (c *Conn) flushPendingCandidates() { - c.pendingCandidatesMutex.Lock() for _, localCandidate := range c.pendingCandidates { c.opts.Logger.Debug(context.Background(), "flushing local candidate") select { @@ -331,7 +337,6 @@ func (c *Conn) flushPendingCandidates() { } c.pendingCandidates = make([]webrtc.ICECandidateInit, 0) c.opts.Logger.Debug(context.Background(), "flushed candidates") - c.pendingCandidatesMutex.Unlock() } // LocalCandidate returns a channel that emits when a local candidate From bebc74d1c961c9d0646f57cce48bc3423ed3ba4e Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 14:43:52 +0000 Subject: [PATCH 23/25] Reduce timeout to improve test times --- peer/conn.go | 2 +- peer/conn_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/peer/conn.go b/peer/conn.go index 85a3de7bd47c1..dd2defd52fd43 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -145,7 +145,7 @@ func (c *Conn) init() error { defer c.pendingCandidatesMutex.Unlock() if c.rtc.RemoteDescription() == nil { - c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer with remote description") + c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer") c.pendingCandidates = append(c.pendingCandidates, iceCandidate.ToJSON()) return } diff --git a/peer/conn_test.go b/peer/conn_test.go index 0b954579c9c27..f964a61bfe832 100644 --- a/peer/conn_test.go +++ b/peer/conn_test.go @@ -35,11 +35,11 @@ var ( // In CI resources are frequently contended, so increasing this value // results in less flakes. if os.Getenv("CI") == "true" { - return 4 * time.Second + return 3 * time.Second } return 100 * time.Millisecond }() - failedTimeout = disconnectedTimeout * 4 + failedTimeout = disconnectedTimeout * 3 keepAliveInterval = time.Millisecond * 2 // There's a global race in the vnet library allocation code. From 48078d2df0cdd2afa6b76479ac76a87e7b329164 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 14:49:25 +0000 Subject: [PATCH 24/25] Move unlock to defer --- peer/conn.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/peer/conn.go b/peer/conn.go index dd2defd52fd43..b6dcb4b8c6612 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -297,7 +297,9 @@ func (c *Conn) negotiate() { c.pendingCandidatesMutex.Unlock() if !c.offerrer { + // Lock new candidates from processing until we set the local description. c.pendingCandidatesMutex.Lock() + defer c.pendingCandidatesMutex.Unlock() answer, err := c.rtc.CreateAnswer(&webrtc.AnswerOptions{}) if err != nil { @@ -320,7 +322,6 @@ func (c *Conn) negotiate() { // Wait until the local description is set to flush candidates. c.flushPendingCandidates() - c.pendingCandidatesMutex.Unlock() } } From 5021b959bec76c7076983cb9023d202efc512a34 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 14:56:16 +0000 Subject: [PATCH 25/25] Use flushed bool instead of checking remote --- peer/conn.go | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/peer/conn.go b/peer/conn.go index b6dcb4b8c6612..6eddee070a187 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -120,8 +120,9 @@ type Conn struct { localSessionDescriptionChannel chan webrtc.SessionDescription remoteSessionDescriptionChannel chan webrtc.SessionDescription - pendingCandidates []webrtc.ICECandidateInit - pendingCandidatesMutex sync.Mutex + pendingCandidates []webrtc.ICECandidateInit + pendingCandidatesMutex sync.Mutex + pendingCandidatesFlushed bool pingChannelID uint16 pingEchoChannelID uint16 @@ -144,7 +145,7 @@ func (c *Conn) init() error { c.pendingCandidatesMutex.Lock() defer c.pendingCandidatesMutex.Unlock() - if c.rtc.RemoteDescription() == nil { + if !c.pendingCandidatesFlushed { c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer") c.pendingCandidates = append(c.pendingCandidates, iceCandidate.ToJSON()) return @@ -280,8 +281,6 @@ func (c *Conn) negotiate() { case remoteDescription = <-c.remoteSessionDescriptionChannel: } - // Must lock new candidates from being sent while the description is being flushed. - c.pendingCandidatesMutex.Lock() err := c.rtc.SetRemoteDescription(remoteDescription) if err != nil { c.pendingCandidatesMutex.Unlock() @@ -294,13 +293,8 @@ func (c *Conn) negotiate() { // time. If candidates flush before this point, a connection could fail. c.flushPendingCandidates() } - c.pendingCandidatesMutex.Unlock() if !c.offerrer { - // Lock new candidates from processing until we set the local description. - c.pendingCandidatesMutex.Lock() - defer c.pendingCandidatesMutex.Unlock() - answer, err := c.rtc.CreateAnswer(&webrtc.AnswerOptions{}) if err != nil { _ = c.CloseWithError(xerrors.Errorf("create answer: %w", err)) @@ -328,15 +322,18 @@ func (c *Conn) negotiate() { // flushPendingCandidates writes all local candidates to the candidate send channel. // The localCandidateChannel is expected to be serviced, otherwise this could block. func (c *Conn) flushPendingCandidates() { - for _, localCandidate := range c.pendingCandidates { + c.pendingCandidatesMutex.Lock() + defer c.pendingCandidatesMutex.Unlock() + for _, pendingCandidate := range c.pendingCandidates { c.opts.Logger.Debug(context.Background(), "flushing local candidate") select { case <-c.closed: return - case c.localCandidateChannel <- localCandidate: + case c.localCandidateChannel <- pendingCandidate: } } c.pendingCandidates = make([]webrtc.ICECandidateInit, 0) + c.pendingCandidatesFlushed = true c.opts.Logger.Debug(context.Background(), "flushed candidates") }