diff --git a/peer/channel.go b/peer/channel.go index 8c3f5118996c1..2c95d495c4d65 100644 --- a/peer/channel.go +++ b/peer/channel.go @@ -281,6 +281,7 @@ func (c *Channel) closeWithError(err error) error { c.conn.dcDisconnectListeners.Sub(1) c.conn.dcFailedListeners.Sub(1) c.conn.dcClosedWaitGroup.Done() + return err } diff --git a/peer/conn.go b/peer/conn.go index 11e2a05f0ca89..6eddee070a187 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -72,8 +72,8 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp dcDisconnectChannel: make(chan struct{}), dcFailedChannel: make(chan struct{}), localCandidateChannel: make(chan webrtc.ICECandidateInit), - localSessionDescriptionChannel: make(chan webrtc.SessionDescription), pendingCandidates: make([]webrtc.ICECandidateInit, 0), + localSessionDescriptionChannel: make(chan webrtc.SessionDescription), remoteSessionDescriptionChannel: make(chan webrtc.SessionDescription), } if client { @@ -120,8 +120,9 @@ type Conn struct { localSessionDescriptionChannel chan webrtc.SessionDescription remoteSessionDescriptionChannel chan webrtc.SessionDescription - pendingCandidates []webrtc.ICECandidateInit - pendingCandidatesMutex sync.Mutex + pendingCandidates []webrtc.ICECandidateInit + pendingCandidatesMutex sync.Mutex + pendingCandidatesFlushed bool pingChannelID uint16 pingEchoChannelID uint16 @@ -141,15 +142,15 @@ func (c *Conn) init() error { if iceCandidate == nil { return } - // ICE Candidates on a remote peer are reset when an offer - // is received. We must wait until the offer<->answer has - // been negotiated to flush candidates. c.pendingCandidatesMutex.Lock() defer c.pendingCandidatesMutex.Unlock() - if c.rtc.RemoteDescription() == nil { + + if !c.pendingCandidatesFlushed { + c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer") c.pendingCandidates = append(c.pendingCandidates, iceCandidate.ToJSON()) return } + c.opts.Logger.Debug(context.Background(), "adding local candidate") select { case <-c.closed: break @@ -282,10 +283,17 @@ func (c *Conn) negotiate() { err := c.rtc.SetRemoteDescription(remoteDescription) if err != nil { + c.pendingCandidatesMutex.Unlock() _ = c.CloseWithError(xerrors.Errorf("set remote description (closed %v): %w", c.isClosed(), err)) return } + if c.offerrer { + // ICE candidates reset when an offer/answer is set for the first + // time. If candidates flush before this point, a connection could fail. + c.flushPendingCandidates() + } + if !c.offerrer { answer, err := c.rtc.CreateAnswer(&webrtc.AnswerOptions{}) if err != nil { @@ -305,11 +313,19 @@ func (c *Conn) negotiate() { return case c.localSessionDescriptionChannel <- answer: } + + // Wait until the local description is set to flush candidates. + c.flushPendingCandidates() } +} +// flushPendingCandidates writes all local candidates to the candidate send channel. +// The localCandidateChannel is expected to be serviced, otherwise this could block. +func (c *Conn) flushPendingCandidates() { c.pendingCandidatesMutex.Lock() defer c.pendingCandidatesMutex.Unlock() for _, pendingCandidate := range c.pendingCandidates { + c.opts.Logger.Debug(context.Background(), "flushing local candidate") select { case <-c.closed: return @@ -317,6 +333,7 @@ func (c *Conn) negotiate() { } } c.pendingCandidates = make([]webrtc.ICECandidateInit, 0) + c.pendingCandidatesFlushed = true c.opts.Logger.Debug(context.Background(), "flushed candidates") } @@ -328,6 +345,7 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit { // AddRemoteCandidate adds a remote candidate to the RTC connection. func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error { + c.opts.Logger.Debug(context.Background(), "adding remote candidate") return c.rtc.AddICECandidate(i) } diff --git a/peer/conn_test.go b/peer/conn_test.go index 0b954579c9c27..f964a61bfe832 100644 --- a/peer/conn_test.go +++ b/peer/conn_test.go @@ -35,11 +35,11 @@ var ( // In CI resources are frequently contended, so increasing this value // results in less flakes. if os.Getenv("CI") == "true" { - return 4 * time.Second + return 3 * time.Second } return 100 * time.Millisecond }() - failedTimeout = disconnectedTimeout * 4 + failedTimeout = disconnectedTimeout * 3 keepAliveInterval = time.Millisecond * 2 // There's a global race in the vnet library allocation code.