Skip to content

Commit 30dae97

Browse files
authored
chore: Buffer remote candidates like local (#77)
* chore: Buffer remote candidates like local

  This was added for local candidates, and is required for remote to prevent a race where they are added before a negotiation is complete. I removed the mutex earlier, because it would cause a different race. I didn't realize the remote candidates wouldn't be buffered, but with this change they are!

* Use local description instead
* Add logging for candidate flush
* Fix race with atomic bool
* Simplify locks
* Add mutex to flush
* Reset buffer
* Remove leak dependency to limit confusion
* Fix ordering
* Revert channel close
* Flush candidates after remote session description is set
* Bump up count to ensure race is fixed
* Use custom ICE dependency
* Fix data race
* Lower timeout to make for fast CI
* Add back mutex to prevent race
* Improve debug logging
* Lock on local description
* Flush local candidates uniquely
* Fix race
* Move mutex to prevent candidate send race
* Move lock to handshake so no race can occur
* Reduce timeout to improve test times
* Move unlock to defer
* Use flushed bool instead of checking remote
1 parent 9329a50 commit 30dae97

File tree

3 files changed

+28
-9
lines changed

3 files changed

+28
-9
lines changed

peer/channel.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ func (c *Channel) closeWithError(err error) error {
281281
c.conn.dcDisconnectListeners.Sub(1)
282282
c.conn.dcFailedListeners.Sub(1)
283283
c.conn.dcClosedWaitGroup.Done()
284+
284285
return err
285286
}
286287

peer/conn.go

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp
7272
dcDisconnectChannel: make(chan struct{}),
7373
dcFailedChannel: make(chan struct{}),
7474
localCandidateChannel: make(chan webrtc.ICECandidateInit),
75-
localSessionDescriptionChannel: make(chan webrtc.SessionDescription),
7675
pendingCandidates: make([]webrtc.ICECandidateInit, 0),
76+
localSessionDescriptionChannel: make(chan webrtc.SessionDescription),
7777
remoteSessionDescriptionChannel: make(chan webrtc.SessionDescription),
7878
}
7979
if client {
@@ -120,8 +120,9 @@ type Conn struct {
120120
localSessionDescriptionChannel chan webrtc.SessionDescription
121121
remoteSessionDescriptionChannel chan webrtc.SessionDescription
122122

123-
pendingCandidates []webrtc.ICECandidateInit
124-
pendingCandidatesMutex sync.Mutex
123+
pendingCandidates []webrtc.ICECandidateInit
124+
pendingCandidatesMutex sync.Mutex
125+
pendingCandidatesFlushed bool
125126

126127
pingChannelID uint16
127128
pingEchoChannelID uint16
@@ -141,15 +142,15 @@ func (c *Conn) init() error {
141142
if iceCandidate == nil {
142143
return
143144
}
144-
// ICE Candidates on a remote peer are reset when an offer
145-
// is received. We must wait until the offer<->answer has
146-
// been negotiated to flush candidates.
147145
c.pendingCandidatesMutex.Lock()
148146
defer c.pendingCandidatesMutex.Unlock()
149-
if c.rtc.RemoteDescription() == nil {
147+
148+
if !c.pendingCandidatesFlushed {
149+
c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer")
150150
c.pendingCandidates = append(c.pendingCandidates, iceCandidate.ToJSON())
151151
return
152152
}
153+
c.opts.Logger.Debug(context.Background(), "adding local candidate")
153154
select {
154155
case <-c.closed:
155156
break
@@ -282,10 +283,17 @@ func (c *Conn) negotiate() {
282283

283284
err := c.rtc.SetRemoteDescription(remoteDescription)
284285
if err != nil {
286+
c.pendingCandidatesMutex.Unlock()
285287
_ = c.CloseWithError(xerrors.Errorf("set remote description (closed %v): %w", c.isClosed(), err))
286288
return
287289
}
288290

291+
if c.offerrer {
292+
// ICE candidates reset when an offer/answer is set for the first
293+
// time. If candidates flush before this point, a connection could fail.
294+
c.flushPendingCandidates()
295+
}
296+
289297
if !c.offerrer {
290298
answer, err := c.rtc.CreateAnswer(&webrtc.AnswerOptions{})
291299
if err != nil {
@@ -305,18 +313,27 @@ func (c *Conn) negotiate() {
305313
return
306314
case c.localSessionDescriptionChannel <- answer:
307315
}
316+
317+
// Wait until the local description is set to flush candidates.
318+
c.flushPendingCandidates()
308319
}
320+
}
309321

322+
// flushPendingCandidates writes all local candidates to the candidate send channel.
323+
// The localCandidateChannel is expected to be serviced, otherwise this could block.
324+
func (c *Conn) flushPendingCandidates() {
310325
c.pendingCandidatesMutex.Lock()
311326
defer c.pendingCandidatesMutex.Unlock()
312327
for _, pendingCandidate := range c.pendingCandidates {
328+
c.opts.Logger.Debug(context.Background(), "flushing local candidate")
313329
select {
314330
case <-c.closed:
315331
return
316332
case c.localCandidateChannel <- pendingCandidate:
317333
}
318334
}
319335
c.pendingCandidates = make([]webrtc.ICECandidateInit, 0)
336+
c.pendingCandidatesFlushed = true
320337
c.opts.Logger.Debug(context.Background(), "flushed candidates")
321338
}
322339

@@ -328,6 +345,7 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit {
328345

329346
// AddRemoteCandidate adds a remote candidate to the RTC connection.
330347
func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error {
348+
c.opts.Logger.Debug(context.Background(), "adding remote candidate")
331349
return c.rtc.AddICECandidate(i)
332350
}
333351

peer/conn_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@ var (
3535
// In CI resources are frequently contended, so increasing this value
3636
// results in less flakes.
3737
if os.Getenv("CI") == "true" {
38-
return 4 * time.Second
38+
return 3 * time.Second
3939
}
4040
return 100 * time.Millisecond
4141
}()
42-
failedTimeout = disconnectedTimeout * 4
42+
failedTimeout = disconnectedTimeout * 3
4343
keepAliveInterval = time.Millisecond * 2
4444

4545
// There's a global race in the vnet library allocation code.

0 commit comments

Comments (0)