@@ -78,11 +78,9 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp
78
78
dcFailedChannel : make (chan struct {}),
79
79
// This channel needs to be bufferred otherwise slow consumers
80
80
// of this will cause a connection failure.
81
- localCandidateChannel : make (chan webrtc.ICECandidateInit , 16 ),
82
- pendingCandidatesToSend : make ([]webrtc.ICECandidateInit , 0 ),
83
- pendingCandidatesToAccept : make ([]webrtc.ICECandidateInit , 0 ),
84
- localSessionDescriptionChannel : make (chan webrtc.SessionDescription , 1 ),
85
- remoteSessionDescriptionChannel : make (chan webrtc.SessionDescription , 1 ),
81
+ localNegotiator : make (chan Negotiation , 8 ),
82
+ remoteSessionDescription : make (chan webrtc.SessionDescription , 1 ),
83
+ pendingCandidatesToSend : make ([]webrtc.ICECandidateInit , 0 ),
86
84
}
87
85
if client {
88
86
// If we're the client, we want to flip the echo and
@@ -126,15 +124,12 @@ type Conn struct {
126
124
dcFailedListeners atomic.Uint32
127
125
dcClosedWaitGroup sync.WaitGroup
128
126
129
- localCandidateChannel chan webrtc.ICECandidateInit
130
- localSessionDescriptionChannel chan webrtc.SessionDescription
131
- remoteSessionDescriptionChannel chan webrtc.SessionDescription
127
+ localNegotiator chan Negotiation
128
+ remoteSessionDescription chan webrtc.SessionDescription
132
129
133
130
pendingCandidatesToSend []webrtc.ICECandidateInit
134
131
pendingCandidatesToSendMutex sync.Mutex
135
-
136
- pendingCandidatesToAccept []webrtc.ICECandidateInit
137
- pendingCandidatesToAcceptMutex sync.Mutex
132
+ pendingCandidatesFlushed bool
138
133
139
134
pingChannelID uint16
140
135
pingEchoChannelID uint16
@@ -148,6 +143,12 @@ type Conn struct {
148
143
pingError error
149
144
}
150
145
146
+ // Negotiation represents a handshake message between peer connections.
147
+ type Negotiation struct {
148
+ SessionDescription * webrtc.SessionDescription
149
+ ICECandidates []webrtc.ICECandidateInit
150
+ }
151
+
151
152
func (c * Conn ) init () error {
152
153
c .rtc .OnNegotiationNeeded (c .negotiate )
153
154
c .rtc .OnICEConnectionStateChange (func (iceConnectionState webrtc.ICEConnectionState ) {
@@ -181,7 +182,7 @@ func (c *Conn) init() error {
181
182
slog .F ("hash" , c .hashCandidate (json )),
182
183
slog .F ("length" , len (json .Candidate )),
183
184
}
184
- if c . rtc . RemoteDescription () == nil {
185
+ if ! c . pendingCandidatesFlushed {
185
186
c .pendingCandidatesToSend = append (c .pendingCandidatesToSend , json )
186
187
c .opts .Logger .Debug (context .Background (), "buffering local candidate to send" , fields ... )
187
188
return
@@ -190,7 +191,7 @@ func (c *Conn) init() error {
190
191
select {
191
192
case <- c .closed :
192
193
break
193
- case c .localCandidateChannel <- json :
194
+ case c .localNegotiator <- Negotiation { nil , []webrtc. ICECandidateInit { json }} :
194
195
}
195
196
})
196
197
c .rtc .OnDataChannel (func (dc * webrtc.DataChannel ) {
@@ -341,20 +342,20 @@ func (c *Conn) negotiate() {
341
342
select {
342
343
case <- c .closed :
343
344
return
344
- case c .localSessionDescriptionChannel <- offer :
345
+ case c .localNegotiator <- Negotiation { & offer , nil } :
345
346
}
346
347
}
347
348
348
- var remoteDescription webrtc.SessionDescription
349
+ var sessionDescription webrtc.SessionDescription
349
350
select {
350
351
case <- c .closed :
351
352
return
352
- case remoteDescription = <- c .remoteSessionDescriptionChannel :
353
+ case sessionDescription = <- c .remoteSessionDescription :
353
354
}
354
355
355
356
c .opts .Logger .Debug (context .Background (), "setting remote description" )
356
357
c .closeMutex .Lock ()
357
- err := c .rtc .SetRemoteDescription (remoteDescription )
358
+ err := c .rtc .SetRemoteDescription (sessionDescription )
358
359
c .closeMutex .Unlock ()
359
360
if err != nil {
360
361
_ = c .CloseWithError (xerrors .Errorf ("set remote description (closed %v): %w" , c .isClosed (), err ))
@@ -378,80 +379,58 @@ func (c *Conn) negotiate() {
378
379
_ = c .CloseWithError (xerrors .Errorf ("set local description: %w" , err ))
379
380
return
380
381
}
381
-
382
+ c . opts . Logger . Debug ( context . Background (), "sending answer" )
382
383
select {
383
384
case <- c .closed :
384
385
return
385
- case c .localSessionDescriptionChannel <- answer :
386
+ case c .localNegotiator <- Negotiation { & answer , nil } :
386
387
}
387
388
}
388
389
389
390
c .pendingCandidatesToSendMutex .Lock ()
390
- for _ , pendingCandidate := range c .pendingCandidatesToSend {
391
- c .opts .Logger .Debug (context .Background (), "sending buffered local candidate" ,
392
- slog .F ("hash" , c .hashCandidate (pendingCandidate )),
393
- slog .F ("length" , len (pendingCandidate .Candidate )),
394
- )
391
+ defer c .pendingCandidatesToSendMutex .Unlock ()
392
+ if len (c .pendingCandidatesToSend ) > 0 {
395
393
select {
396
394
case <- c .closed :
397
395
return
398
- case c .localCandidateChannel <- pendingCandidate :
396
+ case c .localNegotiator <- Negotiation { nil , c . pendingCandidatesToSend } :
399
397
}
400
398
}
401
399
c .opts .Logger .Debug (context .Background (), "flushed buffered local candidates" ,
402
400
slog .F ("count" , len (c .pendingCandidatesToSend )),
403
401
)
404
402
c .pendingCandidatesToSend = make ([]webrtc.ICECandidateInit , 0 )
405
- c .pendingCandidatesToSendMutex .Unlock ()
406
-
407
- c .pendingCandidatesToAcceptMutex .Lock ()
408
- defer c .pendingCandidatesToAcceptMutex .Unlock ()
409
- for _ , pendingCandidate := range c .pendingCandidatesToAccept {
410
- c .opts .Logger .Debug (context .Background (), "adding buffered remote candidate" ,
411
- slog .F ("hash" , c .hashCandidate (pendingCandidate )),
412
- slog .F ("length" , len (pendingCandidate .Candidate )),
413
- )
414
- err = c .rtc .AddICECandidate (pendingCandidate )
415
- if err != nil {
416
- _ = c .CloseWithError (xerrors .Errorf ("accept buffered remote candidate: %w" , err ))
417
- return
418
- }
419
- }
420
- c .opts .Logger .Debug (context .Background (), "flushed buffered remote candidates" ,
421
- slog .F ("count" , len (c .pendingCandidatesToAccept )),
422
- )
423
- c .pendingCandidatesToAccept = make ([]webrtc.ICECandidateInit , 0 )
403
+ c .pendingCandidatesFlushed = true
424
404
}
425
405
426
- // LocalCandidate returns a channel that emits when a local candidate
427
- // needs to be exchanged with a remote connection.
428
- func (c * Conn ) LocalCandidate () <- chan webrtc.ICECandidateInit {
429
- return c .localCandidateChannel
406
+ func (c * Conn ) LocalNegotiation () <- chan Negotiation {
407
+ return c .localNegotiator
430
408
}
431
409
432
- // AddRemoteCandidate adds a remote candidate to the RTC connection.
433
- func (c * Conn ) AddRemoteCandidate (iceCandidate webrtc.ICECandidateInit ) error {
434
- c .pendingCandidatesToAcceptMutex .Lock ()
435
- defer c .pendingCandidatesToAcceptMutex .Unlock ()
436
- fields := []slog.Field {
437
- slog .F ("hash" , c .hashCandidate (iceCandidate )),
438
- slog .F ("length" , len (iceCandidate .Candidate )),
439
- }
440
- // The consumer doesn't need to set the session description before
441
- // adding remote candidates. This buffers it so an error doesn't occur.
442
- if c .rtc .RemoteDescription () == nil {
443
- c .opts .Logger .Debug (context .Background (), "buffering remote candidate to accept" , fields ... )
444
- c .pendingCandidatesToAccept = append (c .pendingCandidatesToAccept , iceCandidate )
445
- return nil
446
- }
447
- c .opts .Logger .Debug (context .Background (), "adding remote candidate" , fields ... )
448
- return c .rtc .AddICECandidate (iceCandidate )
449
- }
410
+ func (c * Conn ) AddRemoteNegotiation (negotiation Negotiation ) error {
411
+ if negotiation .SessionDescription != nil {
412
+ c .opts .Logger .Debug (context .Background (), "adding remote negotiation with session description" )
413
+ select {
414
+ case <- c .closed :
415
+ return nil
416
+ case c .remoteSessionDescription <- * negotiation .SessionDescription :
417
+ }
418
+ }
419
+
420
+ if len (negotiation .ICECandidates ) > 0 {
421
+ c .opts .Logger .Debug (context .Background (), "adding remote negotiation with ice candidates" ,
422
+ slog .F ("count" , len (negotiation .ICECandidates )))
423
+ c .closeMutex .Lock ()
424
+ defer c .closeMutex .Unlock ()
425
+ for _ , iceCandidate := range negotiation .ICECandidates {
426
+ err := c .rtc .AddICECandidate (iceCandidate )
427
+ if err != nil {
428
+ return err
429
+ }
430
+ }
431
+ }
450
432
451
- // LocalSessionDescription returns a channel that emits a session description
452
- // when one is required to be exchanged.
453
- func (c * Conn ) LocalSessionDescription () <- chan webrtc.SessionDescription {
454
- return c .localSessionDescriptionChannel
433
+ return nil
455
434
}
456
435
457
436
// SetConfiguration applies options to the WebRTC connection.
@@ -460,19 +439,6 @@ func (c *Conn) SetConfiguration(configuration webrtc.Configuration) error {
460
439
return c .rtc .SetConfiguration (configuration )
461
440
}
462
441
463
- // SetRemoteSessionDescription sets the remote description for the WebRTC connection.
464
- func (c * Conn ) SetRemoteSessionDescription (sessionDescription webrtc.SessionDescription ) {
465
- if c .isClosed () {
466
- return
467
- }
468
- c .closeMutex .Lock ()
469
- defer c .closeMutex .Unlock ()
470
- select {
471
- case <- c .closed :
472
- case c .remoteSessionDescriptionChannel <- sessionDescription :
473
- }
474
- }
475
-
476
442
// Accept blocks waiting for a channel to be opened.
477
443
func (c * Conn ) Accept (ctx context.Context ) (* Channel , error ) {
478
444
var dataChannel * webrtc.DataChannel
0 commit comments