Skip to content

Commit f2293ec

Browse files
committed
fix: Synchronize peer logging with a channel
We were depending on the close mutex to properly report connection state. This ensures the RTC connection is properly closed before returning.
1 parent 5367d93 commit f2293ec

File tree

1 file changed

+37
-30
lines changed

1 file changed

+37
-30
lines changed

peer/conn.go

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp
6969
rtc: rtc,
7070
offerrer: client,
7171
closed: make(chan struct{}),
72+
closedRTC: make(chan struct{}),
73+
closedICE: make(chan struct{}),
7274
dcOpenChannel: make(chan *webrtc.DataChannel),
7375
dcDisconnectChannel: make(chan struct{}),
7476
dcFailedChannel: make(chan struct{}),
@@ -109,6 +111,8 @@ type Conn struct {
109111
offerrer bool
110112

111113
closed chan struct{}
114+
closedRTC chan struct{}
115+
closedICE chan struct{}
112116
closeMutex sync.Mutex
113117
closeError error
114118

@@ -142,26 +146,18 @@ type Conn struct {
142146
func (c *Conn) init() error {
143147
c.rtc.OnNegotiationNeeded(c.negotiate)
144148
c.rtc.OnICEConnectionStateChange(func(iceConnectionState webrtc.ICEConnectionState) {
145-
// Close must be locked here otherwise log output can appear
146-
// after the connection has been closed.
147-
c.closeMutex.Lock()
148-
defer c.closeMutex.Unlock()
149-
if c.isClosed() {
150-
return
151-
}
152-
153149
c.opts.Logger.Debug(context.Background(), "ice connection state updated",
154150
slog.F("state", iceConnectionState))
151+
152+
if iceConnectionState == webrtc.ICEConnectionStateClosed {
153+
close(c.closedICE)
154+
}
155+
})
156+
c.rtc.OnSignalingStateChange(func(signalState webrtc.SignalingState) {
157+
c.opts.Logger.Debug(context.Background(), "signal state updated",
158+
slog.F("state", signalState))
155159
})
156160
c.rtc.OnICEGatheringStateChange(func(iceGatherState webrtc.ICEGathererState) {
157-
// Close can't be locked here, because this is triggered
158-
// when close is called. It doesn't appear this get's
159-
// executed after close though, so it shouldn't cause
160-
// problems.
161-
if c.isClosed() {
162-
return
163-
}
164-
165161
c.opts.Logger.Debug(context.Background(), "ice gathering state updated",
166162
slog.F("state", iceGatherState))
167163
})
@@ -170,8 +166,9 @@ func (c *Conn) init() error {
170166
return
171167
}
172168
json := iceCandidate.ToJSON()
169+
hash := sha256.Sum224([]byte(json.Candidate))
173170
c.opts.Logger.Debug(context.Background(), "writing candidate to channel",
174-
slog.F("hash", sha256.Sum224([]byte(json.Candidate))),
171+
slog.F("hash", string(hash[:])),
175172
slog.F("length", len(json.Candidate)),
176173
)
177174
select {
@@ -188,19 +185,11 @@ func (c *Conn) init() error {
188185
default:
189186
}
190187
})
191-
c.rtc.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) {
192-
// Close must be locked here otherwise log output can appear
193-
// after the connection has been closed.
194-
c.closeMutex.Lock()
195-
defer c.closeMutex.Unlock()
196-
if c.isClosed() {
197-
return
198-
}
199-
188+
c.rtc.OnConnectionStateChange(func(peerConnectionState webrtc.PeerConnectionState) {
200189
c.opts.Logger.Debug(context.Background(), "rtc connection updated",
201-
slog.F("state", pcs))
190+
slog.F("state", peerConnectionState))
202191

203-
switch pcs {
192+
switch peerConnectionState {
204193
case webrtc.PeerConnectionStateDisconnected:
205194
for i := 0; i < int(c.dcDisconnectListeners.Load()); i++ {
206195
select {
@@ -216,6 +205,13 @@ func (c *Conn) init() error {
216205
}
217206
}
218207
}
208+
209+
if peerConnectionState == webrtc.PeerConnectionStateClosed {
210+
// Pion executes event handlers after close is called
211+
// on the RTC connection. This ensures our Close()
212+
// handler properly cleans up before returning.
213+
close(c.closedRTC)
214+
}
219215
})
220216
_, err := c.pingChannel()
221217
if err != nil {
@@ -341,7 +337,7 @@ func (c *Conn) negotiate() {
341337
for _, pendingCandidate := range c.pendingRemoteCandidates {
342338
hash := sha256.Sum224([]byte(pendingCandidate.Candidate))
343339
c.opts.Logger.Debug(context.Background(), "flushing buffered remote candidate",
344-
slog.F("hash", hash),
340+
slog.F("hash", string(hash[:])),
345341
slog.F("length", len(pendingCandidate.Candidate)),
346342
)
347343
err := c.rtc.AddICECandidate(pendingCandidate)
@@ -367,8 +363,9 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit {
367363
func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error {
368364
c.pendingCandidatesMutex.Lock()
369365
defer c.pendingCandidatesMutex.Unlock()
366+
hash := sha256.Sum224([]byte(i.Candidate))
370367
fields := []slog.Field{
371-
slog.F("hash", sha256.Sum224([]byte(i.Candidate))),
368+
slog.F("hash", string(hash[:])),
372369
slog.F("length", len(i.Candidate)),
373370
}
374371
if !c.pendingCandidatesFlushed {
@@ -542,5 +539,15 @@ func (c *Conn) CloseWithError(err error) error {
542539
// All logging, goroutines, and async functionality is cleaned up after this.
543540
c.dcClosedWaitGroup.Wait()
544541

542+
if c.rtc.ConnectionState() != webrtc.PeerConnectionStateNew {
543+
c.opts.Logger.Debug(context.Background(), "waiting for rtc connection close...")
544+
<-c.closedRTC
545+
}
546+
547+
if c.rtc.ICEConnectionState() != webrtc.ICEConnectionStateNew {
548+
c.opts.Logger.Debug(context.Background(), "waiting for ice connection close...")
549+
<-c.closedICE
550+
}
551+
545552
return err
546553
}

0 commit comments

Comments
 (0)