Skip to content

chore: Fix race in collecting ICE Candidates #76

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 27, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
chore: Fix race in collecting ICE Candidates
This logic was flawed previously. ICE Candidates could collect
before a negotiation was triggered, which led to a race where
candidates would be lost. Candidates can no longer be lost,
and we removed some code 😎.
  • Loading branch information
kylecarbs committed Jan 26, 2022
commit 0461bb6ed1abaef0dab4e54cecb8148495e73a01
69 changes: 26 additions & 43 deletions peer/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp
dcFailedChannel: make(chan struct{}),
localCandidateChannel: make(chan webrtc.ICECandidateInit),
localSessionDescriptionChannel: make(chan webrtc.SessionDescription),
pendingCandidates: make([]webrtc.ICECandidateInit, 0),
remoteSessionDescriptionChannel: make(chan webrtc.SessionDescription),
}
if client {
Expand Down Expand Up @@ -118,7 +119,9 @@ type Conn struct {
localCandidateChannel chan webrtc.ICECandidateInit
localSessionDescriptionChannel chan webrtc.SessionDescription
remoteSessionDescriptionChannel chan webrtc.SessionDescription
remoteSessionDescriptionMutex sync.Mutex

pendingCandidates []webrtc.ICECandidateInit
pendingCandidatesMutex sync.Mutex

pingChannelID uint16
pingEchoChannelID uint16
Expand All @@ -134,6 +137,22 @@ type Conn struct {

func (c *Conn) init() error {
c.rtc.OnNegotiationNeeded(c.negotiate)
c.rtc.OnICECandidate(func(iceCandidate *webrtc.ICECandidate) {
if iceCandidate == nil {
return
}
c.pendingCandidatesMutex.Lock()
defer c.pendingCandidatesMutex.Unlock()
if c.rtc.RemoteDescription() == nil {
c.pendingCandidates = append(c.pendingCandidates, iceCandidate.ToJSON())
return
}
select {
case <-c.closed:
break
case c.localCandidateChannel <- iceCandidate.ToJSON():
}
})
c.rtc.OnDataChannel(func(dc *webrtc.DataChannel) {
select {
case <-c.closed:
Expand Down Expand Up @@ -232,12 +251,6 @@ func (c *Conn) pingEchoChannel() (*Channel, error) {

func (c *Conn) negotiate() {
c.opts.Logger.Debug(context.Background(), "negotiating")
flushCandidates := c.proxyICECandidates()

// Locks while the negotiation for a remote session
// description is taking place.
c.remoteSessionDescriptionMutex.Lock()
defer c.remoteSessionDescriptionMutex.Unlock()

if c.offerrer {
offer, err := c.rtc.CreateOffer(&webrtc.OfferOptions{})
Expand Down Expand Up @@ -291,44 +304,17 @@ func (c *Conn) negotiate() {
}
}

flushCandidates()
c.opts.Logger.Debug(context.Background(), "flushed candidates")
}

func (c *Conn) proxyICECandidates() func() {
var (
mut sync.Mutex
queue = []webrtc.ICECandidateInit{}
flushed = false
)
c.rtc.OnICECandidate(func(iceCandidate *webrtc.ICECandidate) {
if iceCandidate == nil {
return
}
mut.Lock()
defer mut.Unlock()
if !flushed {
queue = append(queue, iceCandidate.ToJSON())
return
}
c.pendingCandidatesMutex.Lock()
defer c.pendingCandidatesMutex.Unlock()
for _, pendingCandidate := range c.pendingCandidates {
select {
case <-c.closed:
return
case c.localCandidateChannel <- iceCandidate.ToJSON():
}
})
return func() {
mut.Lock()
defer mut.Unlock()
for _, q := range queue {
select {
case <-c.closed:
break
case c.localCandidateChannel <- q:
}
case c.localCandidateChannel <- pendingCandidate:
}
flushed = true
}
c.pendingCandidates = make([]webrtc.ICECandidateInit, 0)
c.opts.Logger.Debug(context.Background(), "flushed candidates")
}

// LocalCandidate returns a channel that emits when a local candidate
Expand All @@ -339,9 +325,6 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit {

// AddRemoteCandidate adds a remote candidate to the RTC connection.
func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error {
// Prevents candidates from being added before an offer<->answer has occurred.
c.remoteSessionDescriptionMutex.Lock()
defer c.remoteSessionDescriptionMutex.Unlock()
return c.rtc.AddICECandidate(i)
}

Expand Down