@@ -126,7 +126,8 @@ type Conn struct {
126
126
localCandidateChannel chan webrtc.ICECandidateInit
127
127
localSessionDescriptionChannel chan webrtc.SessionDescription
128
128
remoteSessionDescriptionChannel chan webrtc.SessionDescription
129
- remoteSessionDescriptionMutex sync.Mutex
129
+
130
+ negotiateMutex sync.Mutex
130
131
131
132
pendingCandidatesToSend []webrtc.ICECandidateInit
132
133
pendingCandidatesToSendMutex sync.Mutex
@@ -143,19 +144,16 @@ type Conn struct {
143
144
pingError error
144
145
}
145
146
146
- // Negotiation represents a handshake message between peer connections.
147
- type Negotiation struct {
148
- SessionDescription * webrtc.SessionDescription
149
- ICECandidates []webrtc.ICECandidateInit
150
- }
151
-
152
147
func (c * Conn ) init () error {
153
148
c .rtc .OnNegotiationNeeded (c .negotiate )
154
149
c .rtc .OnICEConnectionStateChange (func (iceConnectionState webrtc.ICEConnectionState ) {
155
150
c .opts .Logger .Debug (context .Background (), "ice connection state updated" ,
156
151
slog .F ("state" , iceConnectionState ))
157
152
158
153
if iceConnectionState == webrtc .ICEConnectionStateClosed {
154
+ // pion/webrtc can update this state multiple times.
155
+ // A connection can never become un-closed, so we
156
+ // close the channel if it isn't already.
159
157
c .closedICEMutex .Lock ()
160
158
defer c .closedICEMutex .Unlock ()
161
159
select {
@@ -165,15 +163,14 @@ func (c *Conn) init() error {
165
163
}
166
164
}
167
165
})
168
- c .rtc .OnSignalingStateChange (func (signalState webrtc.SignalingState ) {
169
- c .opts .Logger .Debug (context .Background (), "signal state updated" ,
170
- slog .F ("state" , signalState ))
171
- })
172
166
c .rtc .OnICEGatheringStateChange (func (iceGatherState webrtc.ICEGathererState ) {
173
167
c .opts .Logger .Debug (context .Background (), "ice gathering state updated" ,
174
168
slog .F ("state" , iceGatherState ))
175
169
176
170
if iceGatherState == webrtc .ICEGathererStateClosed {
171
+ // pion/webrtc can update this state multiple times.
172
+ // A connection can never become un-closed, so we
173
+ // close the channel if it isn't already.
177
174
c .closedICEMutex .Lock ()
178
175
defer c .closedICEMutex .Unlock ()
179
176
select {
@@ -183,40 +180,11 @@ func (c *Conn) init() error {
183
180
}
184
181
}
185
182
})
186
- c .rtc .OnICECandidate (func (iceCandidate * webrtc.ICECandidate ) {
187
- if iceCandidate == nil {
188
- return
189
- }
190
- go func () {
191
- c .pendingCandidatesToSendMutex .Lock ()
192
- defer c .pendingCandidatesToSendMutex .Unlock ()
193
- if c .rtc .RemoteDescription () == nil {
194
- c .pendingCandidatesToSend = append (c .pendingCandidatesToSend , iceCandidate .ToJSON ())
195
- c .opts .Logger .Debug (context .Background (), "buffering local candidate" )
196
- return
197
- }
198
- c .opts .Logger .Debug (context .Background (), "sending local candidate" )
199
- select {
200
- case <- c .closed :
201
- break
202
- case c .localCandidateChannel <- iceCandidate .ToJSON ():
203
- }
204
- }()
205
- })
206
- c .rtc .OnDataChannel (func (dc * webrtc.DataChannel ) {
207
- select {
208
- case <- c .closed :
209
- return
210
- case c .dcOpenChannel <- dc :
211
- default :
212
- }
213
- })
214
183
c .rtc .OnConnectionStateChange (func (peerConnectionState webrtc.PeerConnectionState ) {
215
184
if c .isClosed () {
185
+ // Make sure we don't log after Close() has been called.
216
186
return
217
187
}
218
- // Pion executes this handler multiple times in a rare condition.
219
- // This prevents logging from happening after close.
220
188
c .opts .Logger .Debug (context .Background (), "rtc connection updated" ,
221
189
slog .F ("state" , peerConnectionState ))
222
190
@@ -236,23 +204,55 @@ func (c *Conn) init() error {
236
204
}
237
205
}
238
206
case webrtc .PeerConnectionStateClosed :
239
- // Pion executes event handlers after close is called
240
- // on the RTC connection. This ensures our Close()
241
- // handler properly cleans up before returning.
242
- //
243
- // Pion can execute this multiple times, so we check
244
- // if it's open before closing.
207
+ // pion/webrtc can update this state multiple times.
208
+ // A connection can never become un-closed, so we
209
+ // close the channel if it isn't already.
245
210
c .closedRTCMutex .Lock ()
246
211
defer c .closedRTCMutex .Unlock ()
247
212
select {
248
213
case <- c .closedRTC :
249
- c .opts .Logger .Debug (context .Background (), "closedRTC channel already closed" )
250
214
default :
251
- c .opts .Logger .Debug (context .Background (), "closedRTC channel closing..." )
252
215
close (c .closedRTC )
253
216
}
254
217
}
255
218
})
219
+ c .rtc .OnSignalingStateChange (func (signalState webrtc.SignalingState ) {
220
+ c .opts .Logger .Debug (context .Background (), "signaling state updated" ,
221
+ slog .F ("state" , signalState ))
222
+ })
223
+ c .rtc .OnICECandidate (func (iceCandidate * webrtc.ICECandidate ) {
224
+ if iceCandidate == nil {
225
+ return
226
+ }
227
+ // Run this in a goroutine so we don't block pion/webrtc
228
+ // from continuing.
229
+ go func () {
230
+ c .pendingCandidatesToSendMutex .Lock ()
231
+ defer c .pendingCandidatesToSendMutex .Unlock ()
232
+ // If the remote description hasn't been set yet, we queue the send of these candidates.
233
+ // It may work to send these immediately, but at the time of writing this package is
234
+ // unstable, so better being safe than sorry.
235
+ if c .rtc .RemoteDescription () == nil {
236
+ c .pendingCandidatesToSend = append (c .pendingCandidatesToSend , iceCandidate .ToJSON ())
237
+ c .opts .Logger .Debug (context .Background (), "buffering local candidate" )
238
+ return
239
+ }
240
+ c .opts .Logger .Debug (context .Background (), "sending local candidate" )
241
+ select {
242
+ case <- c .closed :
243
+ break
244
+ case c .localCandidateChannel <- iceCandidate .ToJSON ():
245
+ }
246
+ }()
247
+ })
248
+ c .rtc .OnDataChannel (func (dc * webrtc.DataChannel ) {
249
+ select {
250
+ case <- c .closed :
251
+ return
252
+ case c .dcOpenChannel <- dc :
253
+ default :
254
+ }
255
+ })
256
256
_ , err := c .pingChannel ()
257
257
if err != nil {
258
258
return err
@@ -265,20 +265,23 @@ func (c *Conn) init() error {
265
265
return nil
266
266
}
267
267
268
- // Negotiate exchanges ICECandidate pairs over the exposed channels.
269
- // The diagram below shows the expected handshake. pion/webrtc v3
270
- // uses trickle ICE by default. See: https://webrtchacks.com/trickle-ice/
268
+ // negotiate is triggered when a connection is ready to be established.
269
+ // See trickle ICE for the expected exchange: https://webrtchacks.com/trickle-ice/
271
270
func (c * Conn ) negotiate () {
272
271
c .opts .Logger .Debug (context .Background (), "negotiating" )
273
- c .remoteSessionDescriptionMutex .Lock ()
274
- defer c .remoteSessionDescriptionMutex .Unlock ()
272
+ // ICE candidates cannot be added until SessionDescriptions have been
273
+ // exchanged between peers.
274
+ c .negotiateMutex .Lock ()
275
+ defer c .negotiateMutex .Unlock ()
275
276
276
277
if c .offerrer {
277
278
offer , err := c .rtc .CreateOffer (& webrtc.OfferOptions {})
278
279
if err != nil {
279
280
_ = c .CloseWithError (xerrors .Errorf ("create offer: %w" , err ))
280
281
return
281
282
}
283
+ // pion/webrtc will panic if Close is called while this
284
+ // function is being executed.
282
285
c .closeMutex .Lock ()
283
286
err = c .rtc .SetLocalDescription (offer )
284
287
c .closeMutex .Unlock ()
@@ -302,8 +305,8 @@ func (c *Conn) negotiate() {
302
305
return
303
306
case sessionDescription = <- c .remoteSessionDescriptionChannel :
304
307
}
305
-
306
308
c .opts .Logger .Debug (context .Background (), "setting remote description" )
309
+
307
310
err := c .rtc .SetRemoteDescription (sessionDescription )
308
311
if err != nil {
309
312
_ = c .CloseWithError (xerrors .Errorf ("set remote description (closed %v): %w" , c .isClosed (), err ))
@@ -316,8 +319,9 @@ func (c *Conn) negotiate() {
316
319
_ = c .CloseWithError (xerrors .Errorf ("create answer: %w" , err ))
317
320
return
318
321
}
322
+ // pion/webrtc will panic if Close is called while this
323
+ // function is being executed.
319
324
c .closeMutex .Lock ()
320
- // pion doesn't handle a close properly if it occurs during this function.
321
325
err = c .rtc .SetLocalDescription (answer )
322
326
c .closeMutex .Unlock ()
323
327
if err != nil {
@@ -333,6 +337,7 @@ func (c *Conn) negotiate() {
333
337
c .opts .Logger .Debug (context .Background (), "sent answer" )
334
338
}
335
339
340
+ // Flush bufferred candidates after both sides have been negotiated!
336
341
go func () {
337
342
c .pendingCandidatesToSendMutex .Lock ()
338
343
defer c .pendingCandidatesToSendMutex .Unlock ()
@@ -356,9 +361,11 @@ func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) {
356
361
if c .isClosed () {
357
362
return
358
363
}
364
+ // This must occur in a goroutine to allow the SessionDescriptions
365
+ // to be exchanged first.
359
366
go func () {
360
- c .remoteSessionDescriptionMutex .Lock ()
361
- defer c .remoteSessionDescriptionMutex .Unlock ()
367
+ c .negotiateMutex .Lock ()
368
+ defer c .negotiateMutex .Unlock ()
362
369
if c .isClosed () {
363
370
return
364
371
}
@@ -574,11 +581,12 @@ func (c *Conn) CloseWithError(err error) error {
574
581
// closing an already closed connection isn't an issue for us.
575
582
_ = c .rtc .Close ()
576
583
584
+ // Waiting for pion/webrtc to report closed state on both of these
585
+ // ensures no goroutine leaks.
577
586
if c .rtc .ConnectionState () != webrtc .PeerConnectionStateNew {
578
587
c .opts .Logger .Debug (context .Background (), "waiting for rtc connection close..." )
579
588
<- c .closedRTC
580
589
}
581
-
582
590
if c .rtc .ICEConnectionState () != webrtc .ICEConnectionStateNew {
583
591
c .opts .Logger .Debug (context .Background (), "waiting for ice connection close..." )
584
592
<- c .closedICE
0 commit comments