-
Notifications
You must be signed in to change notification settings - Fork 936
chore: Update pion/ice fork to resolve goroutine leak #78
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
18eac83
ebb94b8
de209c7
6f7e3af
a918c93
44cad06
d62797e
982fde3
e154d6e
a2a4da4
aa1e664
7b3e89d
f3717ca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -62,17 +62,19 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp | |
return nil, xerrors.Errorf("create peer connection: %w", err) | ||
} | ||
conn := &Conn{ | ||
pingChannelID: 1, | ||
pingEchoChannelID: 2, | ||
opts: opts, | ||
rtc: rtc, | ||
offerrer: client, | ||
closed: make(chan struct{}), | ||
dcOpenChannel: make(chan *webrtc.DataChannel), | ||
dcDisconnectChannel: make(chan struct{}), | ||
dcFailedChannel: make(chan struct{}), | ||
localCandidateChannel: make(chan webrtc.ICECandidateInit), | ||
pendingCandidates: make([]webrtc.ICECandidateInit, 0), | ||
pingChannelID: 1, | ||
pingEchoChannelID: 2, | ||
opts: opts, | ||
rtc: rtc, | ||
offerrer: client, | ||
closed: make(chan struct{}), | ||
dcOpenChannel: make(chan *webrtc.DataChannel), | ||
dcDisconnectChannel: make(chan struct{}), | ||
dcFailedChannel: make(chan struct{}), | ||
// This channel needs to be bufferred otherwise slow consumers | ||
// of this will cause a connection failure. | ||
localCandidateChannel: make(chan webrtc.ICECandidateInit, 16), | ||
pendingRemoteCandidates: make([]webrtc.ICECandidateInit, 0), | ||
localSessionDescriptionChannel: make(chan webrtc.SessionDescription), | ||
remoteSessionDescriptionChannel: make(chan webrtc.SessionDescription), | ||
} | ||
|
@@ -120,7 +122,7 @@ type Conn struct { | |
localSessionDescriptionChannel chan webrtc.SessionDescription | ||
remoteSessionDescriptionChannel chan webrtc.SessionDescription | ||
|
||
pendingCandidates []webrtc.ICECandidateInit | ||
pendingRemoteCandidates []webrtc.ICECandidateInit | ||
pendingCandidatesMutex sync.Mutex | ||
pendingCandidatesFlushed bool | ||
|
||
|
@@ -142,14 +144,6 @@ func (c *Conn) init() error { | |
if iceCandidate == nil { | ||
return | ||
} | ||
c.pendingCandidatesMutex.Lock() | ||
defer c.pendingCandidatesMutex.Unlock() | ||
|
||
if !c.pendingCandidatesFlushed { | ||
c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer") | ||
c.pendingCandidates = append(c.pendingCandidates, iceCandidate.ToJSON()) | ||
return | ||
} | ||
c.opts.Logger.Debug(context.Background(), "adding local candidate") | ||
select { | ||
case <-c.closed: | ||
|
@@ -262,6 +256,7 @@ func (c *Conn) negotiate() { | |
_ = c.CloseWithError(xerrors.Errorf("create offer: %w", err)) | ||
return | ||
} | ||
c.opts.Logger.Debug(context.Background(), "setting local description") | ||
err = c.rtc.SetLocalDescription(offer) | ||
if err != nil { | ||
_ = c.CloseWithError(xerrors.Errorf("set local description: %w", err)) | ||
|
@@ -281,25 +276,20 @@ func (c *Conn) negotiate() { | |
case remoteDescription = <-c.remoteSessionDescriptionChannel: | ||
} | ||
|
||
c.opts.Logger.Debug(context.Background(), "setting remote description") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you should put fields on these so they have more details There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Didn't wanna leak the session description in logs, since it technically contains the exchange keys. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this a risk? We already log this stuff on coder already. Only the two peers would be able to see the keys. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doesn't really help IMO. Since we catch all errors, this would error if the session description was invalid. |
||
err := c.rtc.SetRemoteDescription(remoteDescription) | ||
if err != nil { | ||
c.pendingCandidatesMutex.Unlock() | ||
_ = c.CloseWithError(xerrors.Errorf("set remote description (closed %v): %w", c.isClosed(), err)) | ||
return | ||
} | ||
|
||
if c.offerrer { | ||
// ICE candidates reset when an offer/answer is set for the first | ||
// time. If candidates flush before this point, a connection could fail. | ||
c.flushPendingCandidates() | ||
} | ||
|
||
if !c.offerrer { | ||
answer, err := c.rtc.CreateAnswer(&webrtc.AnswerOptions{}) | ||
if err != nil { | ||
_ = c.CloseWithError(xerrors.Errorf("create answer: %w", err)) | ||
return | ||
} | ||
c.opts.Logger.Debug(context.Background(), "setting local description") | ||
err = c.rtc.SetLocalDescription(answer) | ||
if err != nil { | ||
_ = c.CloseWithError(xerrors.Errorf("set local description: %w", err)) | ||
|
@@ -313,28 +303,23 @@ func (c *Conn) negotiate() { | |
return | ||
case c.localSessionDescriptionChannel <- answer: | ||
} | ||
|
||
// Wait until the local description is set to flush candidates. | ||
c.flushPendingCandidates() | ||
} | ||
} | ||
|
||
// flushPendingCandidates writes all local candidates to the candidate send channel. | ||
// The localCandidateChannel is expected to be serviced, otherwise this could block. | ||
func (c *Conn) flushPendingCandidates() { | ||
// The ICE transport resets when the remote description is updated. | ||
// Adding ICE candidates before this point causes a failed connection, | ||
// because the candidate would be lost. | ||
c.pendingCandidatesMutex.Lock() | ||
defer c.pendingCandidatesMutex.Unlock() | ||
for _, pendingCandidate := range c.pendingCandidates { | ||
c.opts.Logger.Debug(context.Background(), "flushing local candidate") | ||
select { | ||
case <-c.closed: | ||
for _, pendingCandidate := range c.pendingRemoteCandidates { | ||
c.opts.Logger.Debug(context.Background(), "flushing remote candidate") | ||
err := c.rtc.AddICECandidate(pendingCandidate) | ||
if err != nil { | ||
_ = c.CloseWithError(xerrors.Errorf("flush pending candidates: %w", err)) | ||
return | ||
case c.localCandidateChannel <- pendingCandidate: | ||
} | ||
} | ||
c.pendingCandidates = make([]webrtc.ICECandidateInit, 0) | ||
c.pendingCandidatesFlushed = true | ||
c.opts.Logger.Debug(context.Background(), "flushed candidates") | ||
c.opts.Logger.Debug(context.Background(), "flushed remote candidates") | ||
} | ||
|
||
// LocalCandidate returns a channel that emits when a local candidate | ||
|
@@ -345,6 +330,13 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit { | |
|
||
// AddRemoteCandidate adds a remote candidate to the RTC connection. | ||
func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error { | ||
c.pendingCandidatesMutex.Lock() | ||
defer c.pendingCandidatesMutex.Unlock() | ||
if !c.pendingCandidatesFlushed { | ||
c.opts.Logger.Debug(context.Background(), "adding remote candidate to buffer") | ||
c.pendingRemoteCandidates = append(c.pendingRemoteCandidates, i) | ||
return nil | ||
} | ||
c.opts.Logger.Debug(context.Background(), "adding remote candidate") | ||
return c.rtc.AddICECandidate(i) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice, good to get rid of that fork 👍