Skip to content

Commit c7ae085

Browse files
committed
fix: Close peer negotiate mutex if we haven't negotiated
Closes #1706 and #1644.
1 parent 31b819e commit c7ae085

File tree

3 files changed

+18
-5
lines changed

3 files changed

+18
-5
lines changed

peer/channel.go

+9
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"io"
77
"net"
88
"sync"
9+
"time"
910

1011
"github.com/pion/datachannel"
1112
"github.com/pion/webrtc/v3"
@@ -244,6 +245,14 @@ func (c *Channel) Write(bytes []byte) (n int, err error) {
244245
if c.dc.BufferedAmount()+uint64(len(bytes)) >= maxBufferedAmount {
245246
<-c.sendMore
246247
}
248+
249+
// There's an obvious race-condition here. This is an edge-case, as
250+
// most-frequently data won't be pooled so synchronously, but is
251+
// definitely possible.
252+
//
253+
// See: https://github.com/pion/sctp/issues/181
254+
time.Sleep(time.Microsecond)
255+
247256
return c.rwc.Write(bytes)
248257
}
249258

peer/conn.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ type Conn struct {
125125
remoteSessionDescriptionChannel chan webrtc.SessionDescription
126126

127127
negotiateMutex sync.Mutex
128-
hasNegotiated bool
128+
hasNegotiated atomic.Bool
129129

130130
loggerValue atomic.Value
131131
settingEngine webrtc.SettingEngine
@@ -290,10 +290,9 @@ func (c *Conn) negotiate() {
290290
c.logger().Debug(context.Background(), "negotiating")
291291
// ICE candidates cannot be added until SessionDescriptions have been
292292
// exchanged between peers.
293-
if c.hasNegotiated {
293+
if c.hasNegotiated.Swap(true) {
294294
c.negotiateMutex.Lock()
295295
}
296-
c.hasNegotiated = true
297296
defer c.negotiateMutex.Unlock()
298297

299298
if c.offerer {
@@ -605,6 +604,13 @@ func (c *Conn) CloseWithError(err error) error {
605604
// All logging, goroutines, and async functionality is cleaned up after this.
606605
c.dcClosedWaitGroup.Wait()
607606

607+
// It's possible the connection can be closed before negotiation has
608+
// began. In this case, we want to unlock the mutex to unblock any
609+
// pending candidates being flushed.
610+
if !c.hasNegotiated.Load() {
611+
c.negotiateMutex.Unlock()
612+
}
613+
608614
// Disable logging!
609615
c.loggerValue.Store(slog.Logger{})
610616
logger.Sync()

peer/conn_test.go

-2
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,7 @@ func TestMain(m *testing.M) {
5959
}
6060

6161
func TestConn(t *testing.T) {
62-
t.Skip("known flake -- https://github.com/coder/coder/issues/1644")
6362
t.Parallel()
64-
6563
t.Run("Ping", func(t *testing.T) {
6664
t.Parallel()
6765
client, server, _ := createPair(t)

0 commit comments

Comments
 (0)