diff --git a/peer/conn.go b/peer/conn.go index e4d3fec760434..f8f0e437bc257 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -116,6 +116,7 @@ type Conn struct { localCandidateChannel chan webrtc.ICECandidateInit localSessionDescriptionChannel chan webrtc.SessionDescription remoteSessionDescriptionChannel chan webrtc.SessionDescription + remoteSessionDescriptionMutex sync.Mutex pingChannelID uint16 pingEchoChannelID uint16 @@ -228,6 +229,11 @@ func (c *Conn) negotiate() { c.opts.Logger.Debug(context.Background(), "negotiating") flushCandidates := c.proxyICECandidates() + // Locks while the negotiation for a remote session + // description is taking place. + c.remoteSessionDescriptionMutex.Lock() + defer c.remoteSessionDescriptionMutex.Unlock() + if c.offerrer { offer, err := c.rtc.CreateOffer(&webrtc.OfferOptions{}) if err != nil { @@ -328,6 +334,9 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit { // AddRemoteCandidate adds a remote candidate to the RTC connection. func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error { + // Prevents candidates from being added before an offer<->answer has occurred. + c.remoteSessionDescriptionMutex.Lock() + defer c.remoteSessionDescriptionMutex.Unlock() return c.rtc.AddICECandidate(i) } diff --git a/peerbroker/dial.go b/peerbroker/dial.go index 9505a364ad874..10511eed5c2cc 100644 --- a/peerbroker/dial.go +++ b/peerbroker/dial.go @@ -84,7 +84,7 @@ func Dial(stream proto.DRPCPeerBroker_NegotiateConnectionClient, iceServers []we for { serverToClientMessage, err := stream.Recv() if err != nil { - _ = peerConn.CloseWithError(err) + _ = peerConn.CloseWithError(xerrors.Errorf("recv: %w", err)) return } diff --git a/peerbroker/dial_test.go b/peerbroker/dial_test.go index c88a36a0eb1bf..e874b32409b84 100644 --- a/peerbroker/dial_test.go +++ b/peerbroker/dial_test.go @@ -18,7 +18,10 @@ func TestMain(m *testing.M) { } func TestDial(t *testing.T) { + t.Parallel() + t.Run("Connect", func(t *testing.T) { + t.Parallel() ctx := context.Background() client, server := provisionersdk.TransportPipe() defer client.Close()