@@ -63,22 +63,21 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp
63
63
return nil , xerrors .Errorf ("create peer connection: %w" , err )
64
64
}
65
65
conn := & Conn {
66
- pingChannelID : 1 ,
67
- pingEchoChannelID : 2 ,
68
- opts : opts ,
69
- rtc : rtc ,
70
- offerrer : client ,
71
- closed : make (chan struct {}),
72
- closedRTC : make (chan struct {}),
73
- closedICE : make (chan struct {}),
74
- dcOpenChannel : make (chan * webrtc.DataChannel ),
75
- dcDisconnectChannel : make (chan struct {}),
76
- dcFailedChannel : make (chan struct {}),
77
- // This channel needs to be bufferred otherwise slow consumers
78
- // of this will cause a connection failure.
79
- localNegotiator : make (chan Negotiation , 8 ),
80
- remoteSessionDescription : make (chan webrtc.SessionDescription ),
81
- pendingCandidatesToSend : make ([]webrtc.ICECandidateInit , 0 ),
66
+ pingChannelID : 1 ,
67
+ pingEchoChannelID : 2 ,
68
+ opts : opts ,
69
+ rtc : rtc ,
70
+ offerrer : client ,
71
+ closed : make (chan struct {}),
72
+ closedRTC : make (chan struct {}),
73
+ closedICE : make (chan struct {}),
74
+ dcOpenChannel : make (chan * webrtc.DataChannel ),
75
+ dcDisconnectChannel : make (chan struct {}),
76
+ dcFailedChannel : make (chan struct {}),
77
+ localCandidateChannel : make (chan webrtc.ICECandidateInit ),
78
+ localSessionDescriptionChannel : make (chan webrtc.SessionDescription ),
79
+ remoteSessionDescriptionChannel : make (chan webrtc.SessionDescription ),
80
+ pendingCandidatesToSend : make ([]webrtc.ICECandidateInit , 0 ),
82
81
}
83
82
if client {
84
83
// If we're the client, we want to flip the echo and
@@ -122,10 +121,10 @@ type Conn struct {
122
121
dcFailedListeners atomic.Uint32
123
122
dcClosedWaitGroup sync.WaitGroup
124
123
125
- localNegotiator chan Negotiation
126
-
127
- remoteSessionDescription chan webrtc.SessionDescription
128
- remoteSessionDescriptionMutex sync.Mutex
124
+ localCandidateChannel chan webrtc. ICECandidateInit
125
+ localSessionDescriptionChannel chan webrtc. SessionDescription
126
+ remoteSessionDescriptionChannel chan webrtc.SessionDescription
127
+ remoteSessionDescriptionMutex sync.Mutex
129
128
130
129
pendingCandidatesToSend []webrtc.ICECandidateInit
131
130
pendingCandidatesToSendMutex sync.Mutex
@@ -177,17 +176,16 @@ func (c *Conn) init() error {
177
176
}
178
177
c .pendingCandidatesToSendMutex .Lock ()
179
178
defer c .pendingCandidatesToSendMutex .Unlock ()
180
- json := iceCandidate .ToJSON ()
181
179
if ! c .pendingCandidatesFlushed {
182
- c .pendingCandidatesToSend = append (c .pendingCandidatesToSend , json )
180
+ c .pendingCandidatesToSend = append (c .pendingCandidatesToSend , iceCandidate . ToJSON () )
183
181
c .opts .Logger .Debug (context .Background (), "buffering local candidate" )
184
182
return
185
183
}
186
184
c .opts .Logger .Debug (context .Background (), "sending local candidate" )
187
185
select {
188
186
case <- c .closed :
189
187
break
190
- case c .localNegotiator <- Negotiation { nil , []webrtc. ICECandidateInit { json }} :
188
+ case c .localCandidateChannel <- iceCandidate . ToJSON () :
191
189
}
192
190
})
193
191
c .rtc .OnDataChannel (func (dc * webrtc.DataChannel ) {
@@ -266,15 +264,15 @@ func (c *Conn) negotiate() {
266
264
select {
267
265
case <- c .closed :
268
266
return
269
- case c .localNegotiator <- Negotiation { & offer , nil } :
267
+ case c .localSessionDescriptionChannel <- offer :
270
268
}
271
269
}
272
270
273
271
var sessionDescription webrtc.SessionDescription
274
272
select {
275
273
case <- c .closed :
276
274
return
277
- case sessionDescription = <- c .remoteSessionDescription :
275
+ case sessionDescription = <- c .remoteSessionDescriptionChannel :
278
276
}
279
277
280
278
// This prevents candidates from being added while
@@ -306,18 +304,19 @@ func (c *Conn) negotiate() {
306
304
select {
307
305
case <- c .closed :
308
306
return
309
- case c .localNegotiator <- Negotiation { & answer , nil } :
307
+ case c .localSessionDescriptionChannel <- answer :
310
308
}
311
309
}
312
310
313
311
c .pendingCandidatesToSendMutex .Lock ()
314
312
defer c .pendingCandidatesToSendMutex .Unlock ()
315
- if len ( c .pendingCandidatesToSend ) > 0 {
313
+ for _ , pendingCandidate := range c .pendingCandidatesToSend {
316
314
select {
317
315
case <- c .closed :
318
316
return
319
- case c .localNegotiator <- Negotiation { nil , c . pendingCandidatesToSend } :
317
+ case c .localCandidateChannel <- pendingCandidate :
320
318
}
319
+ c .opts .Logger .Debug (context .Background (), "flushed buffered local candidate" )
321
320
}
322
321
c .opts .Logger .Debug (context .Background (), "flushed buffered local candidates" ,
323
322
slog .F ("count" , len (c .pendingCandidatesToSend )),
@@ -326,37 +325,32 @@ func (c *Conn) negotiate() {
326
325
c .pendingCandidatesFlushed = true
327
326
}
328
327
329
- // LocalNegotiation returns a channel for connection negotiation.
330
- // This should be piped to another peer connection.
331
- func (c * Conn ) LocalNegotiation () <- chan Negotiation {
332
- return c .localNegotiator
328
+ // AddRemoteCandidate adds a remote candidate to the RTC connection.
329
+ func (c * Conn ) AddRemoteCandidate (i webrtc.ICECandidateInit ) error {
330
+ c .remoteSessionDescriptionMutex .Lock ()
331
+ defer c .remoteSessionDescriptionMutex .Unlock ()
332
+ c .opts .Logger .Debug (context .Background (), "accepting candidate" , slog .F ("length" , len (i .Candidate )))
333
+ return c .rtc .AddICECandidate (i )
333
334
}
334
335
335
- // AddRemoteNegotiation accepts a negotiation message for handshaking a connection.
336
- func (c * Conn ) AddRemoteNegotiation (negotiation Negotiation ) error {
337
- if negotiation .SessionDescription != nil {
338
- c .opts .Logger .Debug (context .Background (), "adding remote negotiation with session description" )
339
- select {
340
- case <- c .closed :
341
- return nil
342
- case c .remoteSessionDescription <- * negotiation .SessionDescription :
343
- }
336
+ // SetRemoteSessionDescription sets the remote description for the WebRTC connection.
337
+ func (c * Conn ) SetRemoteSessionDescription (sessionDescription webrtc.SessionDescription ) {
338
+ select {
339
+ case <- c .closed :
340
+ case c .remoteSessionDescriptionChannel <- sessionDescription :
344
341
}
342
+ }
345
343
346
- if len (negotiation .ICECandidates ) > 0 {
347
- c .remoteSessionDescriptionMutex .Lock ()
348
- defer c .remoteSessionDescriptionMutex .Unlock ()
349
- c .opts .Logger .Debug (context .Background (), "adding remote negotiation with ice candidates" ,
350
- slog .F ("count" , len (negotiation .ICECandidates )))
351
- for _ , iceCandidate := range negotiation .ICECandidates {
352
- err := c .rtc .AddICECandidate (iceCandidate )
353
- if err != nil {
354
- return err
355
- }
356
- }
357
- }
344
+ // LocalSessionDescription returns a channel that emits a session description
345
+ // when one is required to be exchanged.
346
+ func (c * Conn ) LocalSessionDescription () <- chan webrtc.SessionDescription {
347
+ return c .localSessionDescriptionChannel
348
+ }
358
349
359
- return nil
350
+ // LocalCandidate returns a channel that emits when a local candidate
351
+ // needs to be exchanged with a remote connection.
352
+ func (c * Conn ) LocalCandidate () <- chan webrtc.ICECandidateInit {
353
+ return c .localCandidateChannel
360
354
}
361
355
362
356
func (c * Conn ) pingChannel () (* Channel , error ) {
0 commit comments