From a7a1ec5fb6879240771aa9b0557dae45317deef3 Mon Sep 17 00:00:00 2001
From: kylecarbs
Date: Thu, 26 May 2022 03:36:43 +0000
Subject: [PATCH] fix: Close peer negotiate mutex if we haven't negotiated

Closes #1706 and #1644.
---
 peer/channel.go   |  9 +++++++++
 peer/conn.go      | 34 ++++++++++++++++++++++------------
 peer/conn_test.go |  2 --
 3 files changed, 31 insertions(+), 14 deletions(-)

diff --git a/peer/channel.go b/peer/channel.go
index 7db76d984f815..4af4fbcb465f2 100644
--- a/peer/channel.go
+++ b/peer/channel.go
@@ -6,6 +6,7 @@ import (
 	"io"
 	"net"
 	"sync"
+	"time"
 
 	"github.com/pion/datachannel"
 	"github.com/pion/webrtc/v3"
@@ -244,6 +245,14 @@ func (c *Channel) Write(bytes []byte) (n int, err error) {
 	if c.dc.BufferedAmount()+uint64(len(bytes)) >= maxBufferedAmount {
 		<-c.sendMore
 	}
+
+	// There's an obvious race-condition here. This is an edge-case, as
+	// most-frequently data won't be pooled so synchronously, but is
+	// definitely possible.
+	//
+	// See: https://github.com/pion/sctp/issues/181
+	time.Sleep(time.Microsecond)
+
 	return c.rwc.Write(bytes)
 }
 
diff --git a/peer/conn.go b/peer/conn.go
index c81b29d0bbd38..0540a496e0dfa 100644
--- a/peer/conn.go
+++ b/peer/conn.go
@@ -73,6 +73,7 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp
 		dcFailedChannel:                 make(chan struct{}),
 		localCandidateChannel:           make(chan webrtc.ICECandidateInit),
 		localSessionDescriptionChannel:  make(chan webrtc.SessionDescription, 1),
+		negotiated:                      make(chan struct{}),
 		remoteSessionDescriptionChannel: make(chan webrtc.SessionDescription, 1),
 		settingEngine:                   opts.SettingEngine,
 	}
@@ -124,8 +125,7 @@ type Conn struct {
 	localSessionDescriptionChannel  chan webrtc.SessionDescription
 	remoteSessionDescriptionChannel chan webrtc.SessionDescription
 
-	negotiateMutex sync.Mutex
-	hasNegotiated  bool
+	negotiated chan struct{}
 
 	loggerValue   atomic.Value
 	settingEngine webrtc.SettingEngine
@@ -152,9 +152,6 @@ func (c *Conn) logger() slog.Logger {
 }
 
 func (c *Conn) init() error {
-	// The negotiation needed callback can take a little bit to execute!
-	c.negotiateMutex.Lock()
-
 	c.rtc.OnNegotiationNeeded(c.negotiate)
 	c.rtc.OnICEConnectionStateChange(func(iceConnectionState webrtc.ICEConnectionState) {
 		c.closedICEMutex.Lock()
@@ -290,11 +287,13 @@ func (c *Conn) negotiate() {
 	c.logger().Debug(context.Background(), "negotiating")
 	// ICE candidates cannot be added until SessionDescriptions have been
 	// exchanged between peers.
-	if c.hasNegotiated {
-		c.negotiateMutex.Lock()
-	}
-	c.hasNegotiated = true
-	defer c.negotiateMutex.Unlock()
+	defer func() {
+		select {
+		case <-c.negotiated:
+		default:
+			close(c.negotiated)
+		}
+	}()
 
 	if c.offerer {
 		offer, err := c.rtc.CreateOffer(&webrtc.OfferOptions{})
@@ -368,8 +367,10 @@ func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) {
 	// This must occur in a goroutine to allow the SessionDescriptions
 	// to be exchanged first.
 	go func() {
-		c.negotiateMutex.Lock()
-		defer c.negotiateMutex.Unlock()
+		select {
+		case <-c.closed:
+		case <-c.negotiated:
+		}
 		if c.isClosed() {
 			return
 		}
@@ -605,6 +606,15 @@ func (c *Conn) CloseWithError(err error) error {
 	// All logging, goroutines, and async functionality is cleaned up after this.
 	c.dcClosedWaitGroup.Wait()
 
+	// It's possible the connection can be closed before negotiation has
+	// begun. In this case, we want to close the negotiated channel to
+	// unblock any pending candidates being flushed.
+	select {
+	case <-c.negotiated:
+	default:
+		close(c.negotiated)
+	}
+
 	// Disable logging!
 	c.loggerValue.Store(slog.Logger{})
 	logger.Sync()

diff --git a/peer/conn_test.go b/peer/conn_test.go
index 161c48bbca92a..20f4c84638b0c 100644
--- a/peer/conn_test.go
+++ b/peer/conn_test.go
@@ -59,9 +59,7 @@ func TestMain(m *testing.M) {
 }
 
 func TestConn(t *testing.T) {
-	t.Skip("known flake -- https://github.com/coder/coder/issues/1644")
 	t.Parallel()
-
 	t.Run("Ping", func(t *testing.T) {
 		t.Parallel()
 		client, server, _ := createPair(t)
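
Note on the idiom: the patch replaces the negotiateMutex/hasNegotiated pair
with a negotiated channel that is closed, never sent on. A channel close
broadcasts to every current and future receiver, so both negotiate() and
CloseWithError() can signal "candidates may flush now", even when the
connection is torn down before negotiation ever starts, which is what left
candidate-flushing goroutines blocked (#1706, #1644). Below is a minimal,
self-contained sketch of that idiom; the conn type and the
markNegotiated/awaitNegotiation names are hypothetical stand-ins, not part
of the coder codebase.

package main

import (
	"fmt"
	"sync"
)

// conn reduces peer.Conn to the two signal channels the patch relies on.
type conn struct {
	negotiated chan struct{} // closed once negotiation completes
	closed     chan struct{} // closed once the connection is torn down
}

// markNegotiated closes negotiated at most once. The select/default guard
// keeps a repeated sequential call from panicking on a double close -- the
// same shape as the deferred close in negotiate() and the close in
// CloseWithError(). Callers racing from separate goroutines would want
// sync.Once instead.
func (c *conn) markNegotiated() {
	select {
	case <-c.negotiated:
		// Already closed; closing again would panic.
	default:
		close(c.negotiated)
	}
}

// awaitNegotiation blocks until negotiation finishes or the connection
// closes -- the wait AddRemoteCandidate performs before flushing ICE
// candidates. Because CloseWithError now also closes negotiated, a
// connection that dies before negotiating no longer strands this wait.
func (c *conn) awaitNegotiation() bool {
	select {
	case <-c.closed:
		return false
	case <-c.negotiated:
		return true
	}
}

func main() {
	c := &conn{
		negotiated: make(chan struct{}),
		closed:     make(chan struct{}),
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println("negotiated:", c.awaitNegotiation())
	}()

	c.markNegotiated()
	c.markNegotiated() // idempotent: the second call is a no-op
	wg.Wait()
}

The same select-with-default shape appears twice in the patch because
either side (negotiation finishing, or the connection closing first) may be
the one to release waiters; whichever runs second must find the channel
already closed and do nothing.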