Skip to content

chore: Buffer remote candidates like local #77

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Jan 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
310eb82
chore: Buffer remote candidates like local
kylecarbs Jan 27, 2022
b874a1e
Use local description instead
kylecarbs Jan 27, 2022
411542a
Add logging for candidate flush
kylecarbs Jan 27, 2022
90e00db
Fix race with atomic bool
kylecarbs Jan 27, 2022
85e6def
Simplify locks
kylecarbs Jan 27, 2022
7f26a99
Add mutex to flush
kylecarbs Jan 27, 2022
a56560b
Reset buffer
kylecarbs Jan 27, 2022
58445d5
Remove leak dependency to limit confusion
kylecarbs Jan 27, 2022
6a07a99
Fix ordering
kylecarbs Jan 27, 2022
3272e13
Revert channel close
kylecarbs Jan 27, 2022
877ae59
Flush candidates after remote session description is set
kylecarbs Jan 27, 2022
1e6a923
Bump up count to ensure race is fixed
kylecarbs Jan 27, 2022
0c02c78
Use custom ICE dependency
kylecarbs Jan 27, 2022
0439e23
Fix data race
kylecarbs Jan 27, 2022
a8b5a07
Lower timeout to make for fast CI
kylecarbs Jan 27, 2022
d74f454
Add back mutex to prevent race
kylecarbs Jan 27, 2022
b613b6d
Improve debug logging
kylecarbs Jan 27, 2022
ba878ca
Lock on local description
kylecarbs Jan 27, 2022
ef41921
Flush local candidates uniquely
kylecarbs Jan 27, 2022
3515fe7
Fix race
kylecarbs Jan 27, 2022
fbe847c
Move mutex to prevent candidate send race
kylecarbs Jan 27, 2022
7321303
Move lock to handshake so no race can occur
kylecarbs Jan 27, 2022
bebc74d
Reduce timeout to improve test times
kylecarbs Jan 27, 2022
48078d2
Move unlock to defer
kylecarbs Jan 27, 2022
5021b95
Use flushed bool instead of checking remote
kylecarbs Jan 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions peer/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ func (c *Channel) closeWithError(err error) error {
c.conn.dcDisconnectListeners.Sub(1)
c.conn.dcFailedListeners.Sub(1)
c.conn.dcClosedWaitGroup.Done()

return err
}

Expand Down
32 changes: 25 additions & 7 deletions peer/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp
dcDisconnectChannel: make(chan struct{}),
dcFailedChannel: make(chan struct{}),
localCandidateChannel: make(chan webrtc.ICECandidateInit),
localSessionDescriptionChannel: make(chan webrtc.SessionDescription),
pendingCandidates: make([]webrtc.ICECandidateInit, 0),
localSessionDescriptionChannel: make(chan webrtc.SessionDescription),
remoteSessionDescriptionChannel: make(chan webrtc.SessionDescription),
}
if client {
Expand Down Expand Up @@ -120,8 +120,9 @@ type Conn struct {
localSessionDescriptionChannel chan webrtc.SessionDescription
remoteSessionDescriptionChannel chan webrtc.SessionDescription

pendingCandidates []webrtc.ICECandidateInit
pendingCandidatesMutex sync.Mutex
pendingCandidates []webrtc.ICECandidateInit
pendingCandidatesMutex sync.Mutex
pendingCandidatesFlushed bool

pingChannelID uint16
pingEchoChannelID uint16
Expand All @@ -141,15 +142,15 @@ func (c *Conn) init() error {
if iceCandidate == nil {
return
}
// ICE Candidates on a remote peer are reset when an offer
// is received. We must wait until the offer<->answer has
// been negotiated to flush candidates.
c.pendingCandidatesMutex.Lock()
defer c.pendingCandidatesMutex.Unlock()
if c.rtc.RemoteDescription() == nil {

if !c.pendingCandidatesFlushed {
c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer")
c.pendingCandidates = append(c.pendingCandidates, iceCandidate.ToJSON())
return
}
c.opts.Logger.Debug(context.Background(), "adding local candidate")
select {
case <-c.closed:
break
Expand Down Expand Up @@ -282,10 +283,17 @@ func (c *Conn) negotiate() {

err := c.rtc.SetRemoteDescription(remoteDescription)
if err != nil {
c.pendingCandidatesMutex.Unlock()
_ = c.CloseWithError(xerrors.Errorf("set remote description (closed %v): %w", c.isClosed(), err))
return
}

if c.offerrer {
// ICE candidates reset when an offer/answer is set for the first
// time. If candidates flush before this point, a connection could fail.
c.flushPendingCandidates()
}

if !c.offerrer {
answer, err := c.rtc.CreateAnswer(&webrtc.AnswerOptions{})
if err != nil {
Expand All @@ -305,18 +313,27 @@ func (c *Conn) negotiate() {
return
case c.localSessionDescriptionChannel <- answer:
}

// Wait until the local description is set to flush candidates.
c.flushPendingCandidates()
}
}

// flushPendingCandidates drains every buffered local ICE candidate into
// localCandidateChannel and then marks the buffer as flushed.
//
// pendingCandidatesMutex is held for the whole call, including while a send
// is blocked, so a receiver must be servicing localCandidateChannel. If the
// connection closes mid-flush the function returns immediately and leaves
// both the buffer and the flushed flag untouched.
func (c *Conn) flushPendingCandidates() {
	c.pendingCandidatesMutex.Lock()
	defer c.pendingCandidatesMutex.Unlock()

	ctx := context.Background()
	buffered := c.pendingCandidates
	for idx := 0; idx < len(buffered); idx++ {
		c.opts.Logger.Debug(ctx, "flushing local candidate")
		select {
		case c.localCandidateChannel <- buffered[idx]:
			// Delivered; move on to the next buffered candidate.
		case <-c.closed:
			return
		}
	}

	// Everything was delivered: start over with an empty buffer and record
	// that the flush has happened.
	c.pendingCandidatesFlushed = true
	c.pendingCandidates = make([]webrtc.ICECandidateInit, 0)
	c.opts.Logger.Debug(ctx, "flushed candidates")
}

Expand All @@ -328,6 +345,7 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit {

// AddRemoteCandidate logs the request at debug level and hands the remote
// candidate to the underlying RTC connection, returning whatever error
// AddICECandidate reports.
func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error {
	ctx := context.Background()
	c.opts.Logger.Debug(ctx, "adding remote candidate")

	err := c.rtc.AddICECandidate(i)
	return err
}

Expand Down
4 changes: 2 additions & 2 deletions peer/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ var (
// In CI resources are frequently contended, so increasing this value
// results in fewer flakes.
if os.Getenv("CI") == "true" {
return 4 * time.Second
return 3 * time.Second
}
return 100 * time.Millisecond
}()
failedTimeout = disconnectedTimeout * 4
failedTimeout = disconnectedTimeout * 3
keepAliveInterval = time.Millisecond * 2

// There's a global race in the vnet library allocation code.
Expand Down