From 117bae07f2a90b95aecb8ad914f26d46951aa1b2 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 13 Jan 2022 20:48:05 +0000 Subject: [PATCH 1/2] fix: Lock when obtaining a peer connection answer<->offer This fixes a race in the peerbroker package where ICE candidates could be added before the connection was negotiated. This would result in the connection failing. --- peer/conn.go | 9 +++++++++ peerbroker/dial.go | 2 +- peerbroker/dial_test.go | 3 +++ peerbroker/listen.go | 2 +- 4 files changed, 14 insertions(+), 2 deletions(-) diff --git a/peer/conn.go b/peer/conn.go index e4d3fec760434..f8f0e437bc257 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -116,6 +116,7 @@ type Conn struct { localCandidateChannel chan webrtc.ICECandidateInit localSessionDescriptionChannel chan webrtc.SessionDescription remoteSessionDescriptionChannel chan webrtc.SessionDescription + remoteSessionDescriptionMutex sync.Mutex pingChannelID uint16 pingEchoChannelID uint16 @@ -228,6 +229,11 @@ func (c *Conn) negotiate() { c.opts.Logger.Debug(context.Background(), "negotiating") flushCandidates := c.proxyICECandidates() + // Locks while the negotiation for a remote session + // description is taking place. + c.remoteSessionDescriptionMutex.Lock() + defer c.remoteSessionDescriptionMutex.Unlock() + if c.offerrer { offer, err := c.rtc.CreateOffer(&webrtc.OfferOptions{}) if err != nil { @@ -328,6 +334,9 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit { // AddRemoteCandidate adds a remote candidate to the RTC connection. func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error { + // Prevents candidates from being added before an offer<->answer has occurred. + c.remoteSessionDescriptionMutex.Lock() + defer c.remoteSessionDescriptionMutex.Unlock() return c.rtc.AddICECandidate(i) } diff --git a/peerbroker/dial.go b/peerbroker/dial.go index 9505a364ad874..10511eed5c2cc 100644 --- a/peerbroker/dial.go +++ b/peerbroker/dial.go @@ -84,7 +84,7 @@ func Dial(stream proto.DRPCPeerBroker_NegotiateConnectionClient, iceServers []we for { serverToClientMessage, err := stream.Recv() if err != nil { - _ = peerConn.CloseWithError(err) + _ = peerConn.CloseWithError(xerrors.Errorf("recv: %w", err)) return } diff --git a/peerbroker/dial_test.go b/peerbroker/dial_test.go index c88a36a0eb1bf..e874b32409b84 100644 --- a/peerbroker/dial_test.go +++ b/peerbroker/dial_test.go @@ -18,7 +18,10 @@ func TestMain(m *testing.M) { } func TestDial(t *testing.T) { + t.Parallel() + t.Run("Connect", func(t *testing.T) { + t.Parallel() ctx := context.Background() client, server := provisionersdk.TransportPipe() defer client.Close() diff --git a/peerbroker/listen.go b/peerbroker/listen.go index c63d283d0feae..9da4ba059c878 100644 --- a/peerbroker/listen.go +++ b/peerbroker/listen.go @@ -184,7 +184,7 @@ func (b *peerBrokerService) NegotiateConnection(stream proto.DRPCPeerBroker_Nego Candidate: clientToServerMessage.GetIceCandidate(), }) if err != nil { - return peerConn.CloseWithError(xerrors.Errorf("add remote candidate: %w", err)) + return peerConn.CloseWithError(xerrors.Errorf("add remote candidate: %+v: %w", clientToServerMessage.GetIceCandidate(), err)) } default: return peerConn.CloseWithError(xerrors.Errorf("unhandled message: %s", reflect.TypeOf(clientToServerMessage).String())) From a99e8e6b002f8be6adf25a3fe291d349b35031f5 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 13 Jan 2022 20:49:55 +0000 Subject: [PATCH 2/2] Remove unnecessary log --- peerbroker/listen.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/peerbroker/listen.go b/peerbroker/listen.go index 9da4ba059c878..c63d283d0feae 100644 --- a/peerbroker/listen.go +++ b/peerbroker/listen.go @@ -184,7 +184,7 @@ func (b *peerBrokerService) NegotiateConnection(stream proto.DRPCPeerBroker_Nego Candidate: clientToServerMessage.GetIceCandidate(), }) if err != nil { - return peerConn.CloseWithError(xerrors.Errorf("add remote candidate: %+v: %w", clientToServerMessage.GetIceCandidate(), err)) + return peerConn.CloseWithError(xerrors.Errorf("add remote candidate: %w", err)) } default: return peerConn.CloseWithError(xerrors.Errorf("unhandled message: %s", reflect.TypeOf(clientToServerMessage).String()))