Skip to content

Commit 1facda9

Browse files
committed
Improve comments
1 parent 09d8442 commit 1facda9

File tree

1 file changed

+67
-59
lines changed

1 file changed

+67
-59
lines changed

peer/conn.go

Lines changed: 67 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@ type Conn struct {
126126
localCandidateChannel chan webrtc.ICECandidateInit
127127
localSessionDescriptionChannel chan webrtc.SessionDescription
128128
remoteSessionDescriptionChannel chan webrtc.SessionDescription
129-
remoteSessionDescriptionMutex sync.Mutex
129+
130+
negotiateMutex sync.Mutex
130131

131132
pendingCandidatesToSend []webrtc.ICECandidateInit
132133
pendingCandidatesToSendMutex sync.Mutex
@@ -143,19 +144,16 @@ type Conn struct {
143144
pingError error
144145
}
145146

146-
// Negotiation represents a handshake message between peer connections.
147-
type Negotiation struct {
148-
SessionDescription *webrtc.SessionDescription
149-
ICECandidates []webrtc.ICECandidateInit
150-
}
151-
152147
func (c *Conn) init() error {
153148
c.rtc.OnNegotiationNeeded(c.negotiate)
154149
c.rtc.OnICEConnectionStateChange(func(iceConnectionState webrtc.ICEConnectionState) {
155150
c.opts.Logger.Debug(context.Background(), "ice connection state updated",
156151
slog.F("state", iceConnectionState))
157152

158153
if iceConnectionState == webrtc.ICEConnectionStateClosed {
154+
// pion/webrtc can update this state multiple times.
155+
// A connection can never become un-closed, so we
156+
// close the channel if it isn't already.
159157
c.closedICEMutex.Lock()
160158
defer c.closedICEMutex.Unlock()
161159
select {
@@ -165,15 +163,14 @@ func (c *Conn) init() error {
165163
}
166164
}
167165
})
168-
c.rtc.OnSignalingStateChange(func(signalState webrtc.SignalingState) {
169-
c.opts.Logger.Debug(context.Background(), "signal state updated",
170-
slog.F("state", signalState))
171-
})
172166
c.rtc.OnICEGatheringStateChange(func(iceGatherState webrtc.ICEGathererState) {
173167
c.opts.Logger.Debug(context.Background(), "ice gathering state updated",
174168
slog.F("state", iceGatherState))
175169

176170
if iceGatherState == webrtc.ICEGathererStateClosed {
171+
// pion/webrtc can update this state multiple times.
172+
// A connection can never become un-closed, so we
173+
// close the channel if it isn't already.
177174
c.closedICEMutex.Lock()
178175
defer c.closedICEMutex.Unlock()
179176
select {
@@ -183,40 +180,11 @@ func (c *Conn) init() error {
183180
}
184181
}
185182
})
186-
c.rtc.OnICECandidate(func(iceCandidate *webrtc.ICECandidate) {
187-
if iceCandidate == nil {
188-
return
189-
}
190-
go func() {
191-
c.pendingCandidatesToSendMutex.Lock()
192-
defer c.pendingCandidatesToSendMutex.Unlock()
193-
if c.rtc.RemoteDescription() == nil {
194-
c.pendingCandidatesToSend = append(c.pendingCandidatesToSend, iceCandidate.ToJSON())
195-
c.opts.Logger.Debug(context.Background(), "buffering local candidate")
196-
return
197-
}
198-
c.opts.Logger.Debug(context.Background(), "sending local candidate")
199-
select {
200-
case <-c.closed:
201-
break
202-
case c.localCandidateChannel <- iceCandidate.ToJSON():
203-
}
204-
}()
205-
})
206-
c.rtc.OnDataChannel(func(dc *webrtc.DataChannel) {
207-
select {
208-
case <-c.closed:
209-
return
210-
case c.dcOpenChannel <- dc:
211-
default:
212-
}
213-
})
214183
c.rtc.OnConnectionStateChange(func(peerConnectionState webrtc.PeerConnectionState) {
215184
if c.isClosed() {
185+
// Make sure we don't log after Close() has been called.
216186
return
217187
}
218-
// Pion executes this handler multiple times in a rare condition.
219-
// This prevents logging from happening after close.
220188
c.opts.Logger.Debug(context.Background(), "rtc connection updated",
221189
slog.F("state", peerConnectionState))
222190

@@ -236,23 +204,55 @@ func (c *Conn) init() error {
236204
}
237205
}
238206
case webrtc.PeerConnectionStateClosed:
239-
// Pion executes event handlers after close is called
240-
// on the RTC connection. This ensures our Close()
241-
// handler properly cleans up before returning.
242-
//
243-
// Pion can execute this multiple times, so we check
244-
// if it's open before closing.
207+
// pion/webrtc can update this state multiple times.
208+
// A connection can never become un-closed, so we
209+
// close the channel if it isn't already.
245210
c.closedRTCMutex.Lock()
246211
defer c.closedRTCMutex.Unlock()
247212
select {
248213
case <-c.closedRTC:
249-
c.opts.Logger.Debug(context.Background(), "closedRTC channel already closed")
250214
default:
251-
c.opts.Logger.Debug(context.Background(), "closedRTC channel closing...")
252215
close(c.closedRTC)
253216
}
254217
}
255218
})
219+
c.rtc.OnSignalingStateChange(func(signalState webrtc.SignalingState) {
220+
c.opts.Logger.Debug(context.Background(), "signaling state updated",
221+
slog.F("state", signalState))
222+
})
223+
c.rtc.OnICECandidate(func(iceCandidate *webrtc.ICECandidate) {
224+
if iceCandidate == nil {
225+
return
226+
}
227+
// Run this in a goroutine so we don't block pion/webrtc
228+
// from continuing.
229+
go func() {
230+
c.pendingCandidatesToSendMutex.Lock()
231+
defer c.pendingCandidatesToSendMutex.Unlock()
232+
// If the remote description hasn't been set yet, we queue the send of these candidates.
233+
// It may work to send these immediately, but at the time of writing this package is
234+
// unstable, so better being safe than sorry.
235+
if c.rtc.RemoteDescription() == nil {
236+
c.pendingCandidatesToSend = append(c.pendingCandidatesToSend, iceCandidate.ToJSON())
237+
c.opts.Logger.Debug(context.Background(), "buffering local candidate")
238+
return
239+
}
240+
c.opts.Logger.Debug(context.Background(), "sending local candidate")
241+
select {
242+
case <-c.closed:
243+
break
244+
case c.localCandidateChannel <- iceCandidate.ToJSON():
245+
}
246+
}()
247+
})
248+
c.rtc.OnDataChannel(func(dc *webrtc.DataChannel) {
249+
select {
250+
case <-c.closed:
251+
return
252+
case c.dcOpenChannel <- dc:
253+
default:
254+
}
255+
})
256256
_, err := c.pingChannel()
257257
if err != nil {
258258
return err
@@ -265,20 +265,23 @@ func (c *Conn) init() error {
265265
return nil
266266
}
267267

268-
// Negotiate exchanges ICECandidate pairs over the exposed channels.
269-
// The diagram below shows the expected handshake. pion/webrtc v3
270-
// uses trickle ICE by default. See: https://webrtchacks.com/trickle-ice/
268+
// negotiate is triggered when a connection is ready to be established.
269+
// See trickle ICE for the expected exchange: https://webrtchacks.com/trickle-ice/
271270
func (c *Conn) negotiate() {
272271
c.opts.Logger.Debug(context.Background(), "negotiating")
273-
c.remoteSessionDescriptionMutex.Lock()
274-
defer c.remoteSessionDescriptionMutex.Unlock()
272+
// ICE candidates cannot be added until SessionDescriptions have been
273+
// exchanged between peers.
274+
c.negotiateMutex.Lock()
275+
defer c.negotiateMutex.Unlock()
275276

276277
if c.offerrer {
277278
offer, err := c.rtc.CreateOffer(&webrtc.OfferOptions{})
278279
if err != nil {
279280
_ = c.CloseWithError(xerrors.Errorf("create offer: %w", err))
280281
return
281282
}
283+
// pion/webrtc will panic if Close is called while this
284+
// function is being executed.
282285
c.closeMutex.Lock()
283286
err = c.rtc.SetLocalDescription(offer)
284287
c.closeMutex.Unlock()
@@ -302,8 +305,8 @@ func (c *Conn) negotiate() {
302305
return
303306
case sessionDescription = <-c.remoteSessionDescriptionChannel:
304307
}
305-
306308
c.opts.Logger.Debug(context.Background(), "setting remote description")
309+
307310
err := c.rtc.SetRemoteDescription(sessionDescription)
308311
if err != nil {
309312
_ = c.CloseWithError(xerrors.Errorf("set remote description (closed %v): %w", c.isClosed(), err))
@@ -316,8 +319,9 @@ func (c *Conn) negotiate() {
316319
_ = c.CloseWithError(xerrors.Errorf("create answer: %w", err))
317320
return
318321
}
322+
// pion/webrtc will panic if Close is called while this
323+
// function is being executed.
319324
c.closeMutex.Lock()
320-
// pion doesn't handle a close properly if it occurs during this function.
321325
err = c.rtc.SetLocalDescription(answer)
322326
c.closeMutex.Unlock()
323327
if err != nil {
@@ -333,6 +337,7 @@ func (c *Conn) negotiate() {
333337
c.opts.Logger.Debug(context.Background(), "sent answer")
334338
}
335339

340+
// Flush bufferred candidates after both sides have been negotiated!
336341
go func() {
337342
c.pendingCandidatesToSendMutex.Lock()
338343
defer c.pendingCandidatesToSendMutex.Unlock()
@@ -356,9 +361,11 @@ func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) {
356361
if c.isClosed() {
357362
return
358363
}
364+
// This must occur in a goroutine to allow the SessionDescriptions
365+
// to be exchanged first.
359366
go func() {
360-
c.remoteSessionDescriptionMutex.Lock()
361-
defer c.remoteSessionDescriptionMutex.Unlock()
367+
c.negotiateMutex.Lock()
368+
defer c.negotiateMutex.Unlock()
362369
if c.isClosed() {
363370
return
364371
}
@@ -574,11 +581,12 @@ func (c *Conn) CloseWithError(err error) error {
574581
// closing an already closed connection isn't an issue for us.
575582
_ = c.rtc.Close()
576583

584+
// Waiting for pion/webrtc to report closed state on both of these
585+
// ensures no goroutine leaks.
577586
if c.rtc.ConnectionState() != webrtc.PeerConnectionStateNew {
578587
c.opts.Logger.Debug(context.Background(), "waiting for rtc connection close...")
579588
<-c.closedRTC
580589
}
581-
582590
if c.rtc.ICEConnectionState() != webrtc.ICEConnectionStateNew {
583591
c.opts.Logger.Debug(context.Background(), "waiting for ice connection close...")
584592
<-c.closedICE

0 commit comments

Comments
 (0)