Skip to content

chore: Update pion/ice fork to resolve goroutine leak #78

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jan 28, 2022
Merged
3 changes: 0 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ module github.com/coder/coder

go 1.17

// Required until https://github.com/pion/ice/pull/413 is merged.
replace github.com/pion/ice/v2 => github.com/kylecarbs/ice/v2 v2.1.8-0.20220127013758-526c25708344
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice, good to get rid of that fork 👍


// Required until https://github.com/hashicorp/terraform-config-inspect/pull/74 is merged.
replace github.com/hashicorp/terraform-config-inspect => github.com/kylecarbs/terraform-config-inspect v0.0.0-20211215004401-bbc517866b88

Expand Down
7 changes: 5 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -835,8 +835,6 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/ktrysmt/go-bitbucket v0.6.4/go.mod h1:9u0v3hsd2rqCHRIpbir1oP7F58uo5dq19sBYvuMoyQ4=
github.com/kylecarbs/ice/v2 v2.1.8-0.20220127013758-526c25708344 h1:rXpDqMPlbnKASSBFwPrJbT2wEL5jZzIX/i0cvwISxlM=
github.com/kylecarbs/ice/v2 v2.1.8-0.20220127013758-526c25708344/go.mod h1:E5frMpIJ3zzcQiRo+XyT7z1IiAsGc1hDURcVJQUzGWA=
github.com/kylecarbs/terraform-config-inspect v0.0.0-20211215004401-bbc517866b88 h1:tvG/qs5c4worwGyGnbbb4i/dYYLjpFwDMqcIT3awAf8=
github.com/kylecarbs/terraform-config-inspect v0.0.0-20211215004401-bbc517866b88/go.mod h1:Z0Nnk4+3Cy89smEbrq+sl1bxc9198gIP4I7wcQF6Kqs=
github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
Expand Down Expand Up @@ -1014,8 +1012,12 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi
github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pion/datachannel v1.5.2 h1:piB93s8LGmbECrpO84DnkIVWasRMk3IimbcXkTQLE6E=
github.com/pion/datachannel v1.5.2/go.mod h1:FTGQWaHrdCwIJ1rw6xBIfZVkslikjShim5yr05XFuCQ=
github.com/pion/dtls/v2 v2.0.13/go.mod h1:OaE7eTM+ppaUhJ99OTO4aHl9uY6vPrT1gPY27uNTxRY=
github.com/pion/dtls/v2 v2.1.0 h1:g6gtKVNLp6URDkv9OijFJl16kqGHzVzZG+Fa4A38GTY=
github.com/pion/dtls/v2 v2.1.0/go.mod h1:qG3gA7ZPZemBqpEFqRKyURYdKEwFZQCGb7gv9T3ON3Y=
github.com/pion/ice/v2 v2.1.18/go.mod h1:9jDr0iIUg8P6+0Jq8QJ/eFSkX3JnsPd293TjCdkfpTs=
github.com/pion/ice/v2 v2.1.19 h1:z7iVx/fHlqvPILUbvcj1xjuz/6eVKgEFOM8h1AuLbF8=
github.com/pion/ice/v2 v2.1.19/go.mod h1:E5frMpIJ3zzcQiRo+XyT7z1IiAsGc1hDURcVJQUzGWA=
github.com/pion/interceptor v0.1.6/go.mod h1:Lh3JSl/cbJ2wP8I3ccrjh1K/deRGRn3UlSPuOTiHb6U=
github.com/pion/interceptor v0.1.7 h1:HThW0tIIKT9RRoDWGURe8rlZVOx0fJHxBHpA0ej0+bo=
github.com/pion/interceptor v0.1.7/go.mod h1:Lh3JSl/cbJ2wP8I3ccrjh1K/deRGRn3UlSPuOTiHb6U=
Expand Down Expand Up @@ -1311,6 +1313,7 @@ golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220126234351-aa10faf2a1f8 h1:kACShD3qhmr/3rLmg1yXyt+N4HcwutKyPRB93s54TIU=
golang.org/x/crypto v0.0.0-20220126234351-aa10faf2a1f8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
Expand Down
74 changes: 33 additions & 41 deletions peer/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,19 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp
return nil, xerrors.Errorf("create peer connection: %w", err)
}
conn := &Conn{
pingChannelID: 1,
pingEchoChannelID: 2,
opts: opts,
rtc: rtc,
offerrer: client,
closed: make(chan struct{}),
dcOpenChannel: make(chan *webrtc.DataChannel),
dcDisconnectChannel: make(chan struct{}),
dcFailedChannel: make(chan struct{}),
localCandidateChannel: make(chan webrtc.ICECandidateInit),
pendingCandidates: make([]webrtc.ICECandidateInit, 0),
pingChannelID: 1,
pingEchoChannelID: 2,
opts: opts,
rtc: rtc,
offerrer: client,
closed: make(chan struct{}),
dcOpenChannel: make(chan *webrtc.DataChannel),
dcDisconnectChannel: make(chan struct{}),
dcFailedChannel: make(chan struct{}),
// This channel needs to be bufferred otherwise slow consumers
// of this will cause a connection failure.
localCandidateChannel: make(chan webrtc.ICECandidateInit, 16),
pendingRemoteCandidates: make([]webrtc.ICECandidateInit, 0),
localSessionDescriptionChannel: make(chan webrtc.SessionDescription),
remoteSessionDescriptionChannel: make(chan webrtc.SessionDescription),
}
Expand Down Expand Up @@ -120,7 +122,7 @@ type Conn struct {
localSessionDescriptionChannel chan webrtc.SessionDescription
remoteSessionDescriptionChannel chan webrtc.SessionDescription

pendingCandidates []webrtc.ICECandidateInit
pendingRemoteCandidates []webrtc.ICECandidateInit
pendingCandidatesMutex sync.Mutex
pendingCandidatesFlushed bool

Expand All @@ -142,14 +144,6 @@ func (c *Conn) init() error {
if iceCandidate == nil {
return
}
c.pendingCandidatesMutex.Lock()
defer c.pendingCandidatesMutex.Unlock()

if !c.pendingCandidatesFlushed {
c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer")
c.pendingCandidates = append(c.pendingCandidates, iceCandidate.ToJSON())
return
}
c.opts.Logger.Debug(context.Background(), "adding local candidate")
select {
case <-c.closed:
Expand Down Expand Up @@ -262,6 +256,7 @@ func (c *Conn) negotiate() {
_ = c.CloseWithError(xerrors.Errorf("create offer: %w", err))
return
}
c.opts.Logger.Debug(context.Background(), "setting local description")
err = c.rtc.SetLocalDescription(offer)
if err != nil {
_ = c.CloseWithError(xerrors.Errorf("set local description: %w", err))
Expand All @@ -281,25 +276,20 @@ func (c *Conn) negotiate() {
case remoteDescription = <-c.remoteSessionDescriptionChannel:
}

c.opts.Logger.Debug(context.Background(), "setting remote description")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should put fields on these so they have more details

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't wanna leak the session description in logs, since it technically contains the exchange keys.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a risk? We already log this stuff on coder already. Only the two peers would be able to see the keys.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't really help IMO. Since we catch all errors, this would error if the session description was invalid.

err := c.rtc.SetRemoteDescription(remoteDescription)
if err != nil {
c.pendingCandidatesMutex.Unlock()
_ = c.CloseWithError(xerrors.Errorf("set remote description (closed %v): %w", c.isClosed(), err))
return
}

if c.offerrer {
// ICE candidates reset when an offer/answer is set for the first
// time. If candidates flush before this point, a connection could fail.
c.flushPendingCandidates()
}

if !c.offerrer {
answer, err := c.rtc.CreateAnswer(&webrtc.AnswerOptions{})
if err != nil {
_ = c.CloseWithError(xerrors.Errorf("create answer: %w", err))
return
}
c.opts.Logger.Debug(context.Background(), "setting local description")
err = c.rtc.SetLocalDescription(answer)
if err != nil {
_ = c.CloseWithError(xerrors.Errorf("set local description: %w", err))
Expand All @@ -313,28 +303,23 @@ func (c *Conn) negotiate() {
return
case c.localSessionDescriptionChannel <- answer:
}

// Wait until the local description is set to flush candidates.
c.flushPendingCandidates()
}
}

// flushPendingCandidates writes all local candidates to the candidate send channel.
// The localCandidateChannel is expected to be serviced, otherwise this could block.
func (c *Conn) flushPendingCandidates() {
// The ICE transport resets when the remote description is updated.
// Adding ICE candidates before this point causes a failed connection,
// because the candidate would be lost.
c.pendingCandidatesMutex.Lock()
defer c.pendingCandidatesMutex.Unlock()
for _, pendingCandidate := range c.pendingCandidates {
c.opts.Logger.Debug(context.Background(), "flushing local candidate")
select {
case <-c.closed:
for _, pendingCandidate := range c.pendingRemoteCandidates {
c.opts.Logger.Debug(context.Background(), "flushing remote candidate")
err := c.rtc.AddICECandidate(pendingCandidate)
if err != nil {
_ = c.CloseWithError(xerrors.Errorf("flush pending candidates: %w", err))
return
case c.localCandidateChannel <- pendingCandidate:
}
}
c.pendingCandidates = make([]webrtc.ICECandidateInit, 0)
c.pendingCandidatesFlushed = true
c.opts.Logger.Debug(context.Background(), "flushed candidates")
c.opts.Logger.Debug(context.Background(), "flushed remote candidates")
}

// LocalCandidate returns a channel that emits when a local candidate
Expand All @@ -345,6 +330,13 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit {

// AddRemoteCandidate adds a remote candidate to the RTC connection.
func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error {
c.pendingCandidatesMutex.Lock()
defer c.pendingCandidatesMutex.Unlock()
if !c.pendingCandidatesFlushed {
c.opts.Logger.Debug(context.Background(), "adding remote candidate to buffer")
c.pendingRemoteCandidates = append(c.pendingRemoteCandidates, i)
return nil
}
c.opts.Logger.Debug(context.Background(), "adding remote candidate")
return c.rtc.AddICECandidate(i)
}
Expand Down
9 changes: 8 additions & 1 deletion peer/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,14 @@ var (
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
// pion/ice doesn't properly close immediately. The solution for this isn't yet known. See:
// https://github.com/pion/ice/pull/413
goleak.VerifyTestMain(m,
goleak.IgnoreTopFunction("github.com/pion/ice/v2.(*Agent).startOnConnectionStateChangeRoutine.func1"),
goleak.IgnoreTopFunction("github.com/pion/ice/v2.(*Agent).startOnConnectionStateChangeRoutine.func2"),
goleak.IgnoreTopFunction("github.com/pion/ice/v2.(*Agent).taskLoop"),
goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"),
)
}

func TestConn(t *testing.T) {
Expand Down