Commit f49dbce

fix: Synchronize peer logging with a channel
We were depending on the close mutex to keep connection-state logging from appearing after close. This change synchronizes on a dedicated channel instead and ensures the RTC connection is fully closed before CloseWithError returns.
1 parent 5367d93 commit f49dbce
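
The commit replaces the close-mutex dependency with a channel: OnConnectionStateChange closes closedRTC once Pion reports PeerConnectionStateClosed, and CloseWithError blocks on that channel before returning. Below is a minimal, standalone sketch of the same pattern, not the actual peer.Conn code; it assumes pion/webrtc v3 (the import path is an assumption) and uses illustrative names.

package main

import (
	"fmt"

	"github.com/pion/webrtc/v3" // assumed module version; the repository's import may differ
)

func main() {
	pc, err := webrtc.NewPeerConnection(webrtc.Configuration{})
	if err != nil {
		panic(err)
	}

	// Closed exactly once, when Pion reports the Closed state.
	closedRTC := make(chan struct{})
	pc.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) {
		fmt.Println("rtc connection updated:", pcs)
		if pcs == webrtc.PeerConnectionStateClosed {
			// Per the commit's comment, Pion executes this handler after
			// Close() is called, so closing the channel here lets the
			// caller wait for the connection to finish shutting down.
			close(closedRTC)
		}
	})

	// ... signaling, data channels, etc. would happen here ...

	if err := pc.Close(); err != nil {
		panic(err)
	}

	// Mirror the commit's guard: only wait if the connection ever left
	// the New state, otherwise no Closed event may be delivered.
	if pc.ConnectionState() != webrtc.PeerConnectionStateNew {
		<-closedRTC
	}
	fmt.Println("peer connection fully closed")
}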

File tree: 1 file changed, +23 -27 lines

peer/conn.go (23 additions, 27 deletions)

@@ -69,6 +69,7 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp
 		rtc:                 rtc,
 		offerrer:            client,
 		closed:              make(chan struct{}),
+		closedRTC:           make(chan struct{}),
 		dcOpenChannel:       make(chan *webrtc.DataChannel),
 		dcDisconnectChannel: make(chan struct{}),
 		dcFailedChannel:     make(chan struct{}),
@@ -109,6 +110,7 @@ type Conn struct {
 	offerrer bool
 
 	closed     chan struct{}
+	closedRTC  chan struct{}
 	closeMutex sync.Mutex
 	closeError error
 
@@ -142,26 +144,14 @@ type Conn struct {
 func (c *Conn) init() error {
 	c.rtc.OnNegotiationNeeded(c.negotiate)
 	c.rtc.OnICEConnectionStateChange(func(iceConnectionState webrtc.ICEConnectionState) {
-		// Close must be locked here otherwise log output can appear
-		// after the connection has been closed.
-		c.closeMutex.Lock()
-		defer c.closeMutex.Unlock()
-		if c.isClosed() {
-			return
-		}
-
 		c.opts.Logger.Debug(context.Background(), "ice connection state updated",
 			slog.F("state", iceConnectionState))
 	})
+	c.rtc.OnSignalingStateChange(func(signalState webrtc.SignalingState) {
+		c.opts.Logger.Debug(context.Background(), "signal state updated",
+			slog.F("state", signalState))
+	})
 	c.rtc.OnICEGatheringStateChange(func(iceGatherState webrtc.ICEGathererState) {
-		// Close can't be locked here, because this is triggered
-		// when close is called. It doesn't appear this get's
-		// executed after close though, so it shouldn't cause
-		// problems.
-		if c.isClosed() {
-			return
-		}
-
 		c.opts.Logger.Debug(context.Background(), "ice gathering state updated",
 			slog.F("state", iceGatherState))
 	})
@@ -170,8 +160,9 @@ func (c *Conn) init() error {
 			return
 		}
 		json := iceCandidate.ToJSON()
+		hash := sha256.Sum224([]byte(json.Candidate))
 		c.opts.Logger.Debug(context.Background(), "writing candidate to channel",
-			slog.F("hash", sha256.Sum224([]byte(json.Candidate))),
+			slog.F("hash", string(hash[:])),
 			slog.F("length", len(json.Candidate)),
 		)
 		select {
@@ -189,14 +180,6 @@ func (c *Conn) init() error {
 		}
 	})
 	c.rtc.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) {
-		// Close must be locked here otherwise log output can appear
-		// after the connection has been closed.
-		c.closeMutex.Lock()
-		defer c.closeMutex.Unlock()
-		if c.isClosed() {
-			return
-		}
-
 		c.opts.Logger.Debug(context.Background(), "rtc connection updated",
 			slog.F("state", pcs))
 
@@ -216,6 +199,13 @@ func (c *Conn) init() error {
 				}
 			}
 		}
+
+		if pcs == webrtc.PeerConnectionStateClosed {
+			// Pion executes event handlers after close is called
+			// on the RTC connection. This ensures our Close()
+			// handler properly cleans up before returning.
+			close(c.closedRTC)
+		}
 	})
 	_, err := c.pingChannel()
 	if err != nil {
@@ -341,7 +331,7 @@ func (c *Conn) negotiate() {
 	for _, pendingCandidate := range c.pendingRemoteCandidates {
 		hash := sha256.Sum224([]byte(pendingCandidate.Candidate))
 		c.opts.Logger.Debug(context.Background(), "flushing buffered remote candidate",
-			slog.F("hash", hash),
+			slog.F("hash", string(hash[:])),
 			slog.F("length", len(pendingCandidate.Candidate)),
 		)
 		err := c.rtc.AddICECandidate(pendingCandidate)
@@ -367,8 +357,9 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit {
 func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error {
 	c.pendingCandidatesMutex.Lock()
 	defer c.pendingCandidatesMutex.Unlock()
+	hash := sha256.Sum224([]byte(i.Candidate))
 	fields := []slog.Field{
-		slog.F("hash", sha256.Sum224([]byte(i.Candidate))),
+		slog.F("hash", string(hash[:])),
 		slog.F("length", len(i.Candidate)),
 	}
 	if !c.pendingCandidatesFlushed {
@@ -542,5 +533,10 @@ func (c *Conn) CloseWithError(err error) error {
 	// All logging, goroutines, and async functionality is cleaned up after this.
 	c.dcClosedWaitGroup.Wait()
 
+	if c.rtc.ConnectionState() != webrtc.PeerConnectionStateNew {
+		c.opts.Logger.Debug(context.Background(), "waiting for connection close...")
+		<-c.closedRTC
+	}
+
 	return err
 }
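
Aside from the synchronization change, the diff also normalizes how ICE candidate hashes are logged: sha256.Sum224 returns a [28]byte array, and the handlers now log string(hash[:]) instead of the array value. The standalone snippet below is illustrative only (not from the repository) and shows the difference between the two forms.

package main

import (
	"crypto/sha256"
	"fmt"
)

func main() {
	// A made-up candidate string purely for illustration.
	candidate := "candidate:1 1 udp 2130706431 10.0.0.1 51000 typ host"

	// Sum224 returns a fixed-size [28]byte array, not a slice.
	hash := sha256.Sum224([]byte(candidate))

	// Formatting the array prints a list of byte values; slicing it
	// first yields the compact 28-byte string the diff now logs.
	fmt.Printf("as array:  %v\n", hash)
	fmt.Printf("as string: %q\n", string(hash[:]))
}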
