Skip to content

Commit e5cb06b

Browse files
committed
Properly close ICE gatherer
1 parent f20ce9d commit e5cb06b

File tree

8 files changed

+127
-72
lines changed

8 files changed

+127
-72
lines changed

.github/workflows/coder.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ jobs:
158158
run:
159159
gotestsum --jsonfile="gotests.json" --packages="./..." --
160160
-covermode=atomic -coverprofile="gotests.coverage" -timeout=3m
161-
-count=3 -race -parallel=2
161+
-count=3 -race -short -parallel=2
162162

163163
- name: Test with PostgreSQL Database
164164
if: runner.os == 'Linux'

database/migrate_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ func TestMain(m *testing.M) {
2020
func TestMigrate(t *testing.T) {
2121
t.Parallel()
2222

23+
if testing.Short() {
24+
t.Skip()
25+
return
26+
}
27+
2328
t.Run("Once", func(t *testing.T) {
2429
t.Parallel()
2530
connection, closeFn, err := postgres.Open()

database/postgres/postgres_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ func TestMain(m *testing.M) {
2121
func TestPostgres(t *testing.T) {
2222
t.Parallel()
2323

24+
if testing.Short() {
25+
t.Skip()
26+
return
27+
}
28+
2429
connect, close, err := postgres.Open()
2530
require.NoError(t, err)
2631
defer close()

peer/channel.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,9 @@ func (c *Channel) init() {
141141
// A DataChannel can disconnect multiple times, so this needs to loop.
142142
for {
143143
select {
144-
case <-c.closed:
144+
case <-c.conn.closedRTC:
145145
// If this channel was closed, there's no need to close again.
146-
return
146+
err = c.conn.closeError
147147
case <-c.conn.Closed():
148148
// If the RTC connection closed with an error, this channel
149149
// should end with the same one.

peer/conn.go

Lines changed: 74 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,13 @@ type Conn struct {
108108
// Determines whether this connection will send the offer or the answer.
109109
offerrer bool
110110

111-
closed chan struct{}
112-
closedRTC chan struct{}
113-
closedICE chan struct{}
114-
closeMutex sync.Mutex
115-
closeError error
111+
closed chan struct{}
112+
closedRTC chan struct{}
113+
closedRTCMutex sync.Mutex
114+
closedICE chan struct{}
115+
closedICEMutex sync.Mutex
116+
closeMutex sync.Mutex
117+
closeError error
116118

117119
dcOpenChannel chan *webrtc.DataChannel
118120
dcDisconnectChannel chan struct{}
@@ -128,7 +130,6 @@ type Conn struct {
128130

129131
pendingCandidatesToSend []webrtc.ICECandidateInit
130132
pendingCandidatesToSendMutex sync.Mutex
131-
pendingCandidatesFlushed bool
132133

133134
pingChannelID uint16
134135
pingEchoChannelID uint16
@@ -155,6 +156,8 @@ func (c *Conn) init() error {
155156
slog.F("state", iceConnectionState))
156157

157158
if iceConnectionState == webrtc.ICEConnectionStateClosed {
159+
c.closedICEMutex.Lock()
160+
defer c.closedICEMutex.Unlock()
158161
select {
159162
case <-c.closedICE:
160163
default:
@@ -169,14 +172,24 @@ func (c *Conn) init() error {
169172
c.rtc.OnICEGatheringStateChange(func(iceGatherState webrtc.ICEGathererState) {
170173
c.opts.Logger.Debug(context.Background(), "ice gathering state updated",
171174
slog.F("state", iceGatherState))
175+
176+
if iceGatherState == webrtc.ICEGathererStateClosed {
177+
c.closedICEMutex.Lock()
178+
defer c.closedICEMutex.Unlock()
179+
select {
180+
case <-c.closedICE:
181+
default:
182+
close(c.closedICE)
183+
}
184+
}
172185
})
173186
c.rtc.OnICECandidate(func(iceCandidate *webrtc.ICECandidate) {
174187
if iceCandidate == nil {
175188
return
176189
}
177190
c.pendingCandidatesToSendMutex.Lock()
178191
defer c.pendingCandidatesToSendMutex.Unlock()
179-
if !c.pendingCandidatesFlushed {
192+
if c.rtc.RemoteDescription() == nil {
180193
c.pendingCandidatesToSend = append(c.pendingCandidatesToSend, iceCandidate.ToJSON())
181194
c.opts.Logger.Debug(context.Background(), "buffering local candidate")
182195
return
@@ -197,6 +210,11 @@ func (c *Conn) init() error {
197210
}
198211
})
199212
c.rtc.OnConnectionStateChange(func(peerConnectionState webrtc.PeerConnectionState) {
213+
if c.isClosed() {
214+
return
215+
}
216+
// Pion executes this handler multiple times in a rare condition.
217+
// This prevents logging from happening after close.
200218
c.opts.Logger.Debug(context.Background(), "rtc connection updated",
201219
slog.F("state", peerConnectionState))
202220

@@ -215,18 +233,20 @@ func (c *Conn) init() error {
215233
default:
216234
}
217235
}
218-
}
219-
220-
if peerConnectionState == webrtc.PeerConnectionStateClosed {
236+
case webrtc.PeerConnectionStateClosed:
221237
// Pion executes event handlers after close is called
222238
// on the RTC connection. This ensures our Close()
223239
// handler properly cleans up before returning.
224240
//
225241
// Pion can execute this multiple times, so we check
226242
// if it's open before closing.
243+
c.closedRTCMutex.Lock()
244+
defer c.closedRTCMutex.Unlock()
227245
select {
228246
case <-c.closedRTC:
247+
c.opts.Logger.Debug(context.Background(), "closedRTC channel already closed")
229248
default:
249+
c.opts.Logger.Debug(context.Background(), "closedRTC channel closing...")
230250
close(c.closedRTC)
231251
}
232252
}
@@ -248,14 +268,18 @@ func (c *Conn) init() error {
248268
// uses trickle ICE by default. See: https://webrtchacks.com/trickle-ice/
249269
func (c *Conn) negotiate() {
250270
c.opts.Logger.Debug(context.Background(), "negotiating")
271+
c.remoteSessionDescriptionMutex.Lock()
272+
defer c.remoteSessionDescriptionMutex.Unlock()
251273

252274
if c.offerrer {
253275
offer, err := c.rtc.CreateOffer(&webrtc.OfferOptions{})
254276
if err != nil {
255277
_ = c.CloseWithError(xerrors.Errorf("create offer: %w", err))
256278
return
257279
}
280+
c.closeMutex.Lock()
258281
err = c.rtc.SetLocalDescription(offer)
282+
c.closeMutex.Unlock()
259283
if err != nil {
260284
_ = c.CloseWithError(xerrors.Errorf("set local description: %w", err))
261285
return
@@ -266,25 +290,23 @@ func (c *Conn) negotiate() {
266290
return
267291
case c.localSessionDescriptionChannel <- offer:
268292
}
293+
c.opts.Logger.Debug(context.Background(), "sent offer")
269294
}
270295

271296
var sessionDescription webrtc.SessionDescription
297+
c.opts.Logger.Debug(context.Background(), "awaiting remote description...")
272298
select {
273299
case <-c.closed:
274300
return
275301
case sessionDescription = <-c.remoteSessionDescriptionChannel:
276302
}
277303

278-
// This prevents candidates from being added while
279-
// the remote description is being set.
280-
c.remoteSessionDescriptionMutex.Lock()
281304
c.opts.Logger.Debug(context.Background(), "setting remote description")
282305
err := c.rtc.SetRemoteDescription(sessionDescription)
283306
if err != nil {
284307
_ = c.CloseWithError(xerrors.Errorf("set remote description (closed %v): %w", c.isClosed(), err))
285308
return
286309
}
287-
c.remoteSessionDescriptionMutex.Unlock()
288310

289311
if !c.offerrer {
290312
answer, err := c.rtc.CreateAnswer(&webrtc.AnswerOptions{})
@@ -306,31 +328,44 @@ func (c *Conn) negotiate() {
306328
return
307329
case c.localSessionDescriptionChannel <- answer:
308330
}
331+
c.opts.Logger.Debug(context.Background(), "sent answer")
309332
}
310333

311-
c.pendingCandidatesToSendMutex.Lock()
312-
defer c.pendingCandidatesToSendMutex.Unlock()
313-
for _, pendingCandidate := range c.pendingCandidatesToSend {
314-
select {
315-
case <-c.closed:
316-
return
317-
case c.localCandidateChannel <- pendingCandidate:
334+
go func() {
335+
c.pendingCandidatesToSendMutex.Lock()
336+
defer c.pendingCandidatesToSendMutex.Unlock()
337+
for _, pendingCandidate := range c.pendingCandidatesToSend {
338+
select {
339+
case <-c.closed:
340+
return
341+
case c.localCandidateChannel <- pendingCandidate:
342+
}
343+
c.opts.Logger.Debug(context.Background(), "flushed buffered local candidate")
318344
}
319-
c.opts.Logger.Debug(context.Background(), "flushed buffered local candidate")
320-
}
321-
c.opts.Logger.Debug(context.Background(), "flushed buffered local candidates",
322-
slog.F("count", len(c.pendingCandidatesToSend)),
323-
)
324-
c.pendingCandidatesToSend = make([]webrtc.ICECandidateInit, 0)
325-
c.pendingCandidatesFlushed = true
345+
c.opts.Logger.Debug(context.Background(), "flushed buffered local candidates",
346+
slog.F("count", len(c.pendingCandidatesToSend)),
347+
)
348+
c.pendingCandidatesToSend = make([]webrtc.ICECandidateInit, 0)
349+
}()
326350
}
327351

328352
// AddRemoteCandidate adds a remote candidate to the RTC connection.
329-
func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error {
330-
c.remoteSessionDescriptionMutex.Lock()
331-
defer c.remoteSessionDescriptionMutex.Unlock()
332-
c.opts.Logger.Debug(context.Background(), "accepting candidate", slog.F("length", len(i.Candidate)))
333-
return c.rtc.AddICECandidate(i)
353+
func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) {
354+
if c.isClosed() {
355+
return
356+
}
357+
go func() {
358+
c.remoteSessionDescriptionMutex.Lock()
359+
defer c.remoteSessionDescriptionMutex.Unlock()
360+
if c.isClosed() {
361+
return
362+
}
363+
c.opts.Logger.Debug(context.Background(), "accepting candidate", slog.F("length", len(i.Candidate)))
364+
err := c.rtc.AddICECandidate(i)
365+
if err != nil {
366+
_ = c.CloseWithError(xerrors.Errorf("accept candidate: %w", err))
367+
}
368+
}()
334369
}
335370

336371
// SetRemoteSessionDescription sets the remote description for the WebRTC connection.
@@ -528,7 +563,6 @@ func (c *Conn) CloseWithError(err error) error {
528563
} else {
529564
c.closeError = err
530565
}
531-
close(c.closed)
532566

533567
if ch, _ := c.pingChannel(); ch != nil {
534568
_ = ch.closeWithError(c.closeError)
@@ -538,10 +572,6 @@ func (c *Conn) CloseWithError(err error) error {
538572
// closing an already closed connection isn't an issue for us.
539573
_ = c.rtc.Close()
540574

541-
// Waits for all DataChannels to exit before officially labeling as closed.
542-
// All logging, goroutines, and async functionality is cleaned up after this.
543-
c.dcClosedWaitGroup.Wait()
544-
545575
if c.rtc.ConnectionState() != webrtc.PeerConnectionStateNew {
546576
c.opts.Logger.Debug(context.Background(), "waiting for rtc connection close...")
547577
<-c.closedRTC
@@ -552,5 +582,11 @@ func (c *Conn) CloseWithError(err error) error {
552582
<-c.closedICE
553583
}
554584

585+
// Waits for all DataChannels to exit before officially labeling as closed.
586+
// All logging, goroutines, and async functionality is cleaned up after this.
587+
c.dcClosedWaitGroup.Wait()
588+
589+
close(c.closed)
590+
c.opts.Logger.Debug(context.Background(), "closed")
555591
return err
556592
}

0 commit comments

Comments
 (0)