Skip to content

Commit 679e4aa

Browse files
committed
Organize conn
1 parent 7eccb81 commit 679e4aa

File tree

1 file changed

+55
-84
lines changed

1 file changed

+55
-84
lines changed

peer/conn.go

Lines changed: 55 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,8 @@ package peer
33
import (
44
"bytes"
55
"context"
6-
"fmt"
76

87
"crypto/rand"
9-
"crypto/sha256"
108
"io"
119
"sync"
1210
"time"
@@ -124,7 +122,8 @@ type Conn struct {
124122
dcFailedListeners atomic.Uint32
125123
dcClosedWaitGroup sync.WaitGroup
126124

127-
localNegotiator chan Negotiation
125+
localNegotiator chan Negotiation
126+
128127
remoteSessionDescription chan webrtc.SessionDescription
129128
remoteSessionDescriptionMutex sync.Mutex
130129

@@ -179,16 +178,12 @@ func (c *Conn) init() error {
179178
c.pendingCandidatesToSendMutex.Lock()
180179
defer c.pendingCandidatesToSendMutex.Unlock()
181180
json := iceCandidate.ToJSON()
182-
fields := []slog.Field{
183-
slog.F("hash", c.hashCandidate(json)),
184-
slog.F("length", len(json.Candidate)),
185-
}
186181
if !c.pendingCandidatesFlushed {
187182
c.pendingCandidatesToSend = append(c.pendingCandidatesToSend, json)
188-
c.opts.Logger.Debug(context.Background(), "buffering local candidate to send", fields...)
183+
c.opts.Logger.Debug(context.Background(), "buffering local candidate")
189184
return
190185
}
191-
c.opts.Logger.Debug(context.Background(), "sending local candidate directly", fields...)
186+
c.opts.Logger.Debug(context.Background(), "sending local candidate")
192187
select {
193188
case <-c.closed:
194189
break
@@ -250,93 +245,24 @@ func (c *Conn) init() error {
250245
return nil
251246
}
252247

253-
func (c *Conn) pingChannel() (*Channel, error) {
254-
c.pingOnce.Do(func() {
255-
c.pingChan, c.pingError = c.dialChannel(context.Background(), "ping", &ChannelOptions{
256-
ID: c.pingChannelID,
257-
Negotiated: true,
258-
OpenOnDisconnect: true,
259-
})
260-
if c.pingError != nil {
261-
return
262-
}
263-
})
264-
return c.pingChan, c.pingError
265-
}
266-
267-
func (c *Conn) pingEchoChannel() (*Channel, error) {
268-
c.pingEchoOnce.Do(func() {
269-
c.pingEchoChan, c.pingEchoError = c.dialChannel(context.Background(), "echo", &ChannelOptions{
270-
ID: c.pingEchoChannelID,
271-
Negotiated: true,
272-
OpenOnDisconnect: true,
273-
})
274-
if c.pingEchoError != nil {
275-
return
276-
}
277-
go func() {
278-
for {
279-
data := make([]byte, pingDataLength)
280-
bytesRead, err := c.pingEchoChan.Read(data)
281-
if err != nil {
282-
if c.isClosed() {
283-
return
284-
}
285-
_ = c.CloseWithError(xerrors.Errorf("read ping echo channel: %w", err))
286-
return
287-
}
288-
_, err = c.pingEchoChan.Write(data[:bytesRead])
289-
if err != nil {
290-
_ = c.CloseWithError(xerrors.Errorf("write ping echo channel: %w", err))
291-
return
292-
}
293-
}
294-
}()
295-
})
296-
return c.pingEchoChan, c.pingEchoError
297-
}
298-
299-
// Computes a hash of the ICE candidate. Used for debug logging.
300-
func (*Conn) hashCandidate(iceCandidate webrtc.ICECandidateInit) string {
301-
hash := sha256.Sum224([]byte(iceCandidate.Candidate))
302-
return fmt.Sprintf("%x", hash[:8])
303-
}
304-
305248
// Negotiate exchanges ICECandidate pairs over the exposed channels.
306249
// The diagram below shows the expected handshake. pion/webrtc v3
307250
// uses trickle ICE by default. See: https://webrtchacks.com/trickle-ice/
308-
// ┌────────┐ ┌────────┐
309-
// │offerrer│ │answerer│
310-
// │(client)│ │(server)│
311-
// └─┬────┬─┘ └─┬──────┘
312-
// │ │ offer │
313-
// ┌──────────▼┐ ├──────────────►├──►┌───────────┐
314-
// │STUN Server│ │ │ │STUN Server│
315-
// │(optional) ├──►│ candidate │◄──┤(optional) │
316-
// └───────────┘ │ (async) │ └───────────┘
317-
// │◄─────────────►│
318-
// │ │
319-
// │ answer │
320-
// └◄──────────────┘
321251
func (c *Conn) negotiate() {
322252
c.opts.Logger.Debug(context.Background(), "negotiating")
323253

324254
if c.offerrer {
325-
c.closeMutex.Lock()
326255
offer, err := c.rtc.CreateOffer(&webrtc.OfferOptions{})
327-
c.closeMutex.Unlock()
328256
if err != nil {
329257
_ = c.CloseWithError(xerrors.Errorf("create offer: %w", err))
330258
return
331259
}
332-
c.opts.Logger.Debug(context.Background(), "setting local description", slog.F("closed", c.isClosed()))
333-
c.closeMutex.Lock()
334260
err = c.rtc.SetLocalDescription(offer)
335-
c.closeMutex.Unlock()
336261
if err != nil {
337262
_ = c.CloseWithError(xerrors.Errorf("set local description: %w", err))
338263
return
339264
}
265+
c.opts.Logger.Debug(context.Background(), "sending offer")
340266
select {
341267
case <-c.closed:
342268
return
@@ -355,25 +281,21 @@ func (c *Conn) negotiate() {
355281
// the remote description is being set.
356282
c.remoteSessionDescriptionMutex.Lock()
357283
c.opts.Logger.Debug(context.Background(), "setting remote description")
358-
c.closeMutex.Lock()
359284
err := c.rtc.SetRemoteDescription(sessionDescription)
360-
c.closeMutex.Unlock()
361285
if err != nil {
362286
_ = c.CloseWithError(xerrors.Errorf("set remote description (closed %v): %w", c.isClosed(), err))
363287
return
364288
}
365289
c.remoteSessionDescriptionMutex.Unlock()
366290

367291
if !c.offerrer {
368-
c.closeMutex.Lock()
369292
answer, err := c.rtc.CreateAnswer(&webrtc.AnswerOptions{})
370-
c.closeMutex.Unlock()
371293
if err != nil {
372294
_ = c.CloseWithError(xerrors.Errorf("create answer: %w", err))
373295
return
374296
}
375-
c.opts.Logger.Debug(context.Background(), "setting local description", slog.F("closed", c.isClosed()))
376297
c.closeMutex.Lock()
298+
// pion doesn't handle a close properly if it occurs during this function.
377299
err = c.rtc.SetLocalDescription(answer)
378300
c.closeMutex.Unlock()
379301
if err != nil {
@@ -404,10 +326,13 @@ func (c *Conn) negotiate() {
404326
c.pendingCandidatesFlushed = true
405327
}
406328

329+
// LocalNegotiation returns a channel for connection negotiation.
330+
// This should be piped to another peer connection.
407331
func (c *Conn) LocalNegotiation() <-chan Negotiation {
408332
return c.localNegotiator
409333
}
410334

335+
// AddRemoteNegotiation accepts a negotiation message for handshaking a connection.
411336
func (c *Conn) AddRemoteNegotiation(negotiation Negotiation) error {
412337
if negotiation.SessionDescription != nil {
413338
c.opts.Logger.Debug(context.Background(), "adding remote negotiation with session description")
@@ -434,6 +359,52 @@ func (c *Conn) AddRemoteNegotiation(negotiation Negotiation) error {
434359
return nil
435360
}
436361

362+
func (c *Conn) pingChannel() (*Channel, error) {
363+
c.pingOnce.Do(func() {
364+
c.pingChan, c.pingError = c.dialChannel(context.Background(), "ping", &ChannelOptions{
365+
ID: c.pingChannelID,
366+
Negotiated: true,
367+
OpenOnDisconnect: true,
368+
})
369+
if c.pingError != nil {
370+
return
371+
}
372+
})
373+
return c.pingChan, c.pingError
374+
}
375+
376+
func (c *Conn) pingEchoChannel() (*Channel, error) {
377+
c.pingEchoOnce.Do(func() {
378+
c.pingEchoChan, c.pingEchoError = c.dialChannel(context.Background(), "echo", &ChannelOptions{
379+
ID: c.pingEchoChannelID,
380+
Negotiated: true,
381+
OpenOnDisconnect: true,
382+
})
383+
if c.pingEchoError != nil {
384+
return
385+
}
386+
go func() {
387+
for {
388+
data := make([]byte, pingDataLength)
389+
bytesRead, err := c.pingEchoChan.Read(data)
390+
if err != nil {
391+
if c.isClosed() {
392+
return
393+
}
394+
_ = c.CloseWithError(xerrors.Errorf("read ping echo channel: %w", err))
395+
return
396+
}
397+
_, err = c.pingEchoChan.Write(data[:bytesRead])
398+
if err != nil {
399+
_ = c.CloseWithError(xerrors.Errorf("write ping echo channel: %w", err))
400+
return
401+
}
402+
}
403+
}()
404+
})
405+
return c.pingEchoChan, c.pingEchoError
406+
}
407+
437408
// SetConfiguration applies options to the WebRTC connection.
438409
// Generally used for updating transport options, like ICE servers.
439410
func (c *Conn) SetConfiguration(configuration webrtc.Configuration) error {

0 commit comments

Comments
 (0)