@@ -3,10 +3,8 @@ package peer
3
3
import (
4
4
"bytes"
5
5
"context"
6
- "fmt"
7
6
8
7
"crypto/rand"
9
- "crypto/sha256"
10
8
"io"
11
9
"sync"
12
10
"time"
@@ -124,7 +122,8 @@ type Conn struct {
124
122
dcFailedListeners atomic.Uint32
125
123
dcClosedWaitGroup sync.WaitGroup
126
124
127
- localNegotiator chan Negotiation
125
+ localNegotiator chan Negotiation
126
+
128
127
remoteSessionDescription chan webrtc.SessionDescription
129
128
remoteSessionDescriptionMutex sync.Mutex
130
129
@@ -179,16 +178,12 @@ func (c *Conn) init() error {
179
178
c .pendingCandidatesToSendMutex .Lock ()
180
179
defer c .pendingCandidatesToSendMutex .Unlock ()
181
180
json := iceCandidate .ToJSON ()
182
- fields := []slog.Field {
183
- slog .F ("hash" , c .hashCandidate (json )),
184
- slog .F ("length" , len (json .Candidate )),
185
- }
186
181
if ! c .pendingCandidatesFlushed {
187
182
c .pendingCandidatesToSend = append (c .pendingCandidatesToSend , json )
188
- c .opts .Logger .Debug (context .Background (), "buffering local candidate to send" , fields ... )
183
+ c .opts .Logger .Debug (context .Background (), "buffering local candidate" )
189
184
return
190
185
}
191
- c .opts .Logger .Debug (context .Background (), "sending local candidate directly" , fields ... )
186
+ c .opts .Logger .Debug (context .Background (), "sending local candidate" )
192
187
select {
193
188
case <- c .closed :
194
189
break
@@ -250,93 +245,24 @@ func (c *Conn) init() error {
250
245
return nil
251
246
}
252
247
253
- func (c * Conn ) pingChannel () (* Channel , error ) {
254
- c .pingOnce .Do (func () {
255
- c .pingChan , c .pingError = c .dialChannel (context .Background (), "ping" , & ChannelOptions {
256
- ID : c .pingChannelID ,
257
- Negotiated : true ,
258
- OpenOnDisconnect : true ,
259
- })
260
- if c .pingError != nil {
261
- return
262
- }
263
- })
264
- return c .pingChan , c .pingError
265
- }
266
-
267
- func (c * Conn ) pingEchoChannel () (* Channel , error ) {
268
- c .pingEchoOnce .Do (func () {
269
- c .pingEchoChan , c .pingEchoError = c .dialChannel (context .Background (), "echo" , & ChannelOptions {
270
- ID : c .pingEchoChannelID ,
271
- Negotiated : true ,
272
- OpenOnDisconnect : true ,
273
- })
274
- if c .pingEchoError != nil {
275
- return
276
- }
277
- go func () {
278
- for {
279
- data := make ([]byte , pingDataLength )
280
- bytesRead , err := c .pingEchoChan .Read (data )
281
- if err != nil {
282
- if c .isClosed () {
283
- return
284
- }
285
- _ = c .CloseWithError (xerrors .Errorf ("read ping echo channel: %w" , err ))
286
- return
287
- }
288
- _ , err = c .pingEchoChan .Write (data [:bytesRead ])
289
- if err != nil {
290
- _ = c .CloseWithError (xerrors .Errorf ("write ping echo channel: %w" , err ))
291
- return
292
- }
293
- }
294
- }()
295
- })
296
- return c .pingEchoChan , c .pingEchoError
297
- }
298
-
299
- // Computes a hash of the ICE candidate. Used for debug logging.
300
- func (* Conn ) hashCandidate (iceCandidate webrtc.ICECandidateInit ) string {
301
- hash := sha256 .Sum224 ([]byte (iceCandidate .Candidate ))
302
- return fmt .Sprintf ("%x" , hash [:8 ])
303
- }
304
-
305
248
// Negotiate exchanges ICECandidate pairs over the exposed channels.
306
249
// The diagram below shows the expected handshake. pion/webrtc v3
307
250
// uses trickle ICE by default. See: https://webrtchacks.com/trickle-ice/
308
- // ┌────────┐ ┌────────┐
309
- // │offerrer│ │answerer│
310
- // │(client)│ │(server)│
311
- // └─┬────┬─┘ └─┬──────┘
312
- // │ │ offer │
313
- // ┌──────────▼┐ ├──────────────►├──►┌───────────┐
314
- // │STUN Server│ │ │ │STUN Server│
315
- // │(optional) ├──►│ candidate │◄──┤(optional) │
316
- // └───────────┘ │ (async) │ └───────────┘
317
- // │◄─────────────►│
318
- // │ │
319
- // │ answer │
320
- // └◄──────────────┘
321
251
func (c * Conn ) negotiate () {
322
252
c .opts .Logger .Debug (context .Background (), "negotiating" )
323
253
324
254
if c .offerrer {
325
- c .closeMutex .Lock ()
326
255
offer , err := c .rtc .CreateOffer (& webrtc.OfferOptions {})
327
- c .closeMutex .Unlock ()
328
256
if err != nil {
329
257
_ = c .CloseWithError (xerrors .Errorf ("create offer: %w" , err ))
330
258
return
331
259
}
332
- c .opts .Logger .Debug (context .Background (), "setting local description" , slog .F ("closed" , c .isClosed ()))
333
- c .closeMutex .Lock ()
334
260
err = c .rtc .SetLocalDescription (offer )
335
- c .closeMutex .Unlock ()
336
261
if err != nil {
337
262
_ = c .CloseWithError (xerrors .Errorf ("set local description: %w" , err ))
338
263
return
339
264
}
265
+ c .opts .Logger .Debug (context .Background (), "sending offer" )
340
266
select {
341
267
case <- c .closed :
342
268
return
@@ -355,25 +281,21 @@ func (c *Conn) negotiate() {
355
281
// the remote description is being set.
356
282
c .remoteSessionDescriptionMutex .Lock ()
357
283
c .opts .Logger .Debug (context .Background (), "setting remote description" )
358
- c .closeMutex .Lock ()
359
284
err := c .rtc .SetRemoteDescription (sessionDescription )
360
- c .closeMutex .Unlock ()
361
285
if err != nil {
362
286
_ = c .CloseWithError (xerrors .Errorf ("set remote description (closed %v): %w" , c .isClosed (), err ))
363
287
return
364
288
}
365
289
c .remoteSessionDescriptionMutex .Unlock ()
366
290
367
291
if ! c .offerrer {
368
- c .closeMutex .Lock ()
369
292
answer , err := c .rtc .CreateAnswer (& webrtc.AnswerOptions {})
370
- c .closeMutex .Unlock ()
371
293
if err != nil {
372
294
_ = c .CloseWithError (xerrors .Errorf ("create answer: %w" , err ))
373
295
return
374
296
}
375
- c .opts .Logger .Debug (context .Background (), "setting local description" , slog .F ("closed" , c .isClosed ()))
376
297
c .closeMutex .Lock ()
298
+ // pion doesn't handle a close properly if it occurs during this function.
377
299
err = c .rtc .SetLocalDescription (answer )
378
300
c .closeMutex .Unlock ()
379
301
if err != nil {
@@ -404,10 +326,13 @@ func (c *Conn) negotiate() {
404
326
c .pendingCandidatesFlushed = true
405
327
}
406
328
329
+ // LocalNegotiation returns a channel for connection negotiation.
330
+ // This should be piped to another peer connection.
407
331
func (c * Conn ) LocalNegotiation () <- chan Negotiation {
408
332
return c .localNegotiator
409
333
}
410
334
335
+ // AddRemoteNegotiation accepts a negotiation message for handshaking a connection.
411
336
func (c * Conn ) AddRemoteNegotiation (negotiation Negotiation ) error {
412
337
if negotiation .SessionDescription != nil {
413
338
c .opts .Logger .Debug (context .Background (), "adding remote negotiation with session description" )
@@ -434,6 +359,52 @@ func (c *Conn) AddRemoteNegotiation(negotiation Negotiation) error {
434
359
return nil
435
360
}
436
361
362
+ func (c * Conn ) pingChannel () (* Channel , error ) {
363
+ c .pingOnce .Do (func () {
364
+ c .pingChan , c .pingError = c .dialChannel (context .Background (), "ping" , & ChannelOptions {
365
+ ID : c .pingChannelID ,
366
+ Negotiated : true ,
367
+ OpenOnDisconnect : true ,
368
+ })
369
+ if c .pingError != nil {
370
+ return
371
+ }
372
+ })
373
+ return c .pingChan , c .pingError
374
+ }
375
+
376
+ func (c * Conn ) pingEchoChannel () (* Channel , error ) {
377
+ c .pingEchoOnce .Do (func () {
378
+ c .pingEchoChan , c .pingEchoError = c .dialChannel (context .Background (), "echo" , & ChannelOptions {
379
+ ID : c .pingEchoChannelID ,
380
+ Negotiated : true ,
381
+ OpenOnDisconnect : true ,
382
+ })
383
+ if c .pingEchoError != nil {
384
+ return
385
+ }
386
+ go func () {
387
+ for {
388
+ data := make ([]byte , pingDataLength )
389
+ bytesRead , err := c .pingEchoChan .Read (data )
390
+ if err != nil {
391
+ if c .isClosed () {
392
+ return
393
+ }
394
+ _ = c .CloseWithError (xerrors .Errorf ("read ping echo channel: %w" , err ))
395
+ return
396
+ }
397
+ _ , err = c .pingEchoChan .Write (data [:bytesRead ])
398
+ if err != nil {
399
+ _ = c .CloseWithError (xerrors .Errorf ("write ping echo channel: %w" , err ))
400
+ return
401
+ }
402
+ }
403
+ }()
404
+ })
405
+ return c .pingEchoChan , c .pingEchoError
406
+ }
407
+
437
408
// SetConfiguration applies options to the WebRTC connection.
438
409
// Generally used for updating transport options, like ICE servers.
439
410
func (c * Conn ) SetConfiguration (configuration webrtc.Configuration ) error {
0 commit comments