@@ -108,11 +108,13 @@ type Conn struct {
108
108
// Determines whether this connection will send the offer or the answer.
109
109
offerrer bool
110
110
111
- closed chan struct {}
112
- closedRTC chan struct {}
113
- closedICE chan struct {}
114
- closeMutex sync.Mutex
115
- closeError error
111
+ closed chan struct {}
112
+ closedRTC chan struct {}
113
+ closedRTCMutex sync.Mutex
114
+ closedICE chan struct {}
115
+ closedICEMutex sync.Mutex
116
+ closeMutex sync.Mutex
117
+ closeError error
116
118
117
119
dcOpenChannel chan * webrtc.DataChannel
118
120
dcDisconnectChannel chan struct {}
@@ -128,7 +130,6 @@ type Conn struct {
128
130
129
131
pendingCandidatesToSend []webrtc.ICECandidateInit
130
132
pendingCandidatesToSendMutex sync.Mutex
131
- pendingCandidatesFlushed bool
132
133
133
134
pingChannelID uint16
134
135
pingEchoChannelID uint16
@@ -155,6 +156,8 @@ func (c *Conn) init() error {
155
156
slog .F ("state" , iceConnectionState ))
156
157
157
158
if iceConnectionState == webrtc .ICEConnectionStateClosed {
159
+ c .closedICEMutex .Lock ()
160
+ defer c .closedICEMutex .Unlock ()
158
161
select {
159
162
case <- c .closedICE :
160
163
default :
@@ -169,14 +172,24 @@ func (c *Conn) init() error {
169
172
c .rtc .OnICEGatheringStateChange (func (iceGatherState webrtc.ICEGathererState ) {
170
173
c .opts .Logger .Debug (context .Background (), "ice gathering state updated" ,
171
174
slog .F ("state" , iceGatherState ))
175
+
176
+ if iceGatherState == webrtc .ICEGathererStateClosed {
177
+ c .closedICEMutex .Lock ()
178
+ defer c .closedICEMutex .Unlock ()
179
+ select {
180
+ case <- c .closedICE :
181
+ default :
182
+ close (c .closedICE )
183
+ }
184
+ }
172
185
})
173
186
c .rtc .OnICECandidate (func (iceCandidate * webrtc.ICECandidate ) {
174
187
if iceCandidate == nil {
175
188
return
176
189
}
177
190
c .pendingCandidatesToSendMutex .Lock ()
178
191
defer c .pendingCandidatesToSendMutex .Unlock ()
179
- if ! c . pendingCandidatesFlushed {
192
+ if c . rtc . RemoteDescription () == nil {
180
193
c .pendingCandidatesToSend = append (c .pendingCandidatesToSend , iceCandidate .ToJSON ())
181
194
c .opts .Logger .Debug (context .Background (), "buffering local candidate" )
182
195
return
@@ -197,6 +210,11 @@ func (c *Conn) init() error {
197
210
}
198
211
})
199
212
c .rtc .OnConnectionStateChange (func (peerConnectionState webrtc.PeerConnectionState ) {
213
+ if c .isClosed () {
214
+ return
215
+ }
216
+ // Pion executes this handler multiple times in a rare condition.
217
+ // This prevents logging from happening after close.
200
218
c .opts .Logger .Debug (context .Background (), "rtc connection updated" ,
201
219
slog .F ("state" , peerConnectionState ))
202
220
@@ -215,18 +233,20 @@ func (c *Conn) init() error {
215
233
default :
216
234
}
217
235
}
218
- }
219
-
220
- if peerConnectionState == webrtc .PeerConnectionStateClosed {
236
+ case webrtc .PeerConnectionStateClosed :
221
237
// Pion executes event handlers after close is called
222
238
// on the RTC connection. This ensures our Close()
223
239
// handler properly cleans up before returning.
224
240
//
225
241
// Pion can execute this multiple times, so we check
226
242
// if it's open before closing.
243
+ c .closedRTCMutex .Lock ()
244
+ defer c .closedRTCMutex .Unlock ()
227
245
select {
228
246
case <- c .closedRTC :
247
+ c .opts .Logger .Debug (context .Background (), "closedRTC channel already closed" )
229
248
default :
249
+ c .opts .Logger .Debug (context .Background (), "closedRTC channel closing..." )
230
250
close (c .closedRTC )
231
251
}
232
252
}
@@ -248,14 +268,18 @@ func (c *Conn) init() error {
248
268
// uses trickle ICE by default. See: https://webrtchacks.com/trickle-ice/
249
269
func (c * Conn ) negotiate () {
250
270
c .opts .Logger .Debug (context .Background (), "negotiating" )
271
+ c .remoteSessionDescriptionMutex .Lock ()
272
+ defer c .remoteSessionDescriptionMutex .Unlock ()
251
273
252
274
if c .offerrer {
253
275
offer , err := c .rtc .CreateOffer (& webrtc.OfferOptions {})
254
276
if err != nil {
255
277
_ = c .CloseWithError (xerrors .Errorf ("create offer: %w" , err ))
256
278
return
257
279
}
280
+ c .closeMutex .Lock ()
258
281
err = c .rtc .SetLocalDescription (offer )
282
+ c .closeMutex .Unlock ()
259
283
if err != nil {
260
284
_ = c .CloseWithError (xerrors .Errorf ("set local description: %w" , err ))
261
285
return
@@ -266,25 +290,23 @@ func (c *Conn) negotiate() {
266
290
return
267
291
case c .localSessionDescriptionChannel <- offer :
268
292
}
293
+ c .opts .Logger .Debug (context .Background (), "sent offer" )
269
294
}
270
295
271
296
var sessionDescription webrtc.SessionDescription
297
+ c .opts .Logger .Debug (context .Background (), "awaiting remote description..." )
272
298
select {
273
299
case <- c .closed :
274
300
return
275
301
case sessionDescription = <- c .remoteSessionDescriptionChannel :
276
302
}
277
303
278
- // This prevents candidates from being added while
279
- // the remote description is being set.
280
- c .remoteSessionDescriptionMutex .Lock ()
281
304
c .opts .Logger .Debug (context .Background (), "setting remote description" )
282
305
err := c .rtc .SetRemoteDescription (sessionDescription )
283
306
if err != nil {
284
307
_ = c .CloseWithError (xerrors .Errorf ("set remote description (closed %v): %w" , c .isClosed (), err ))
285
308
return
286
309
}
287
- c .remoteSessionDescriptionMutex .Unlock ()
288
310
289
311
if ! c .offerrer {
290
312
answer , err := c .rtc .CreateAnswer (& webrtc.AnswerOptions {})
@@ -306,31 +328,44 @@ func (c *Conn) negotiate() {
306
328
return
307
329
case c .localSessionDescriptionChannel <- answer :
308
330
}
331
+ c .opts .Logger .Debug (context .Background (), "sent answer" )
309
332
}
310
333
311
- c .pendingCandidatesToSendMutex .Lock ()
312
- defer c .pendingCandidatesToSendMutex .Unlock ()
313
- for _ , pendingCandidate := range c .pendingCandidatesToSend {
314
- select {
315
- case <- c .closed :
316
- return
317
- case c .localCandidateChannel <- pendingCandidate :
334
+ go func () {
335
+ c .pendingCandidatesToSendMutex .Lock ()
336
+ defer c .pendingCandidatesToSendMutex .Unlock ()
337
+ for _ , pendingCandidate := range c .pendingCandidatesToSend {
338
+ select {
339
+ case <- c .closed :
340
+ return
341
+ case c .localCandidateChannel <- pendingCandidate :
342
+ }
343
+ c .opts .Logger .Debug (context .Background (), "flushed buffered local candidate" )
318
344
}
319
- c .opts .Logger .Debug (context .Background (), "flushed buffered local candidate" )
320
- }
321
- c .opts .Logger .Debug (context .Background (), "flushed buffered local candidates" ,
322
- slog .F ("count" , len (c .pendingCandidatesToSend )),
323
- )
324
- c .pendingCandidatesToSend = make ([]webrtc.ICECandidateInit , 0 )
325
- c .pendingCandidatesFlushed = true
345
+ c .opts .Logger .Debug (context .Background (), "flushed buffered local candidates" ,
346
+ slog .F ("count" , len (c .pendingCandidatesToSend )),
347
+ )
348
+ c .pendingCandidatesToSend = make ([]webrtc.ICECandidateInit , 0 )
349
+ }()
326
350
}
327
351
328
352
// AddRemoteCandidate adds a remote candidate to the RTC connection.
329
- func (c * Conn ) AddRemoteCandidate (i webrtc.ICECandidateInit ) error {
330
- c .remoteSessionDescriptionMutex .Lock ()
331
- defer c .remoteSessionDescriptionMutex .Unlock ()
332
- c .opts .Logger .Debug (context .Background (), "accepting candidate" , slog .F ("length" , len (i .Candidate )))
333
- return c .rtc .AddICECandidate (i )
353
+ func (c * Conn ) AddRemoteCandidate (i webrtc.ICECandidateInit ) {
354
+ if c .isClosed () {
355
+ return
356
+ }
357
+ go func () {
358
+ c .remoteSessionDescriptionMutex .Lock ()
359
+ defer c .remoteSessionDescriptionMutex .Unlock ()
360
+ if c .isClosed () {
361
+ return
362
+ }
363
+ c .opts .Logger .Debug (context .Background (), "accepting candidate" , slog .F ("length" , len (i .Candidate )))
364
+ err := c .rtc .AddICECandidate (i )
365
+ if err != nil {
366
+ _ = c .CloseWithError (xerrors .Errorf ("accept candidate: %w" , err ))
367
+ }
368
+ }()
334
369
}
335
370
336
371
// SetRemoteSessionDescription sets the remote description for the WebRTC connection.
@@ -528,7 +563,6 @@ func (c *Conn) CloseWithError(err error) error {
528
563
} else {
529
564
c .closeError = err
530
565
}
531
- close (c .closed )
532
566
533
567
if ch , _ := c .pingChannel (); ch != nil {
534
568
_ = ch .closeWithError (c .closeError )
@@ -538,10 +572,6 @@ func (c *Conn) CloseWithError(err error) error {
538
572
// closing an already closed connection isn't an issue for us.
539
573
_ = c .rtc .Close ()
540
574
541
- // Waits for all DataChannels to exit before officially labeling as closed.
542
- // All logging, goroutines, and async functionality is cleaned up after this.
543
- c .dcClosedWaitGroup .Wait ()
544
-
545
575
if c .rtc .ConnectionState () != webrtc .PeerConnectionStateNew {
546
576
c .opts .Logger .Debug (context .Background (), "waiting for rtc connection close..." )
547
577
<- c .closedRTC
@@ -552,5 +582,11 @@ func (c *Conn) CloseWithError(err error) error {
552
582
<- c .closedICE
553
583
}
554
584
585
+ // Waits for all DataChannels to exit before officially labeling as closed.
586
+ // All logging, goroutines, and async functionality is cleaned up after this.
587
+ c .dcClosedWaitGroup .Wait ()
588
+
589
+ close (c .closed )
590
+ c .opts .Logger .Debug (context .Background (), "closed" )
555
591
return err
556
592
}
0 commit comments