@@ -69,6 +69,8 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp
69
69
rtc : rtc ,
70
70
offerrer : client ,
71
71
closed : make (chan struct {}),
72
+ closedRTC : make (chan struct {}),
73
+ closedICE : make (chan struct {}),
72
74
dcOpenChannel : make (chan * webrtc.DataChannel ),
73
75
dcDisconnectChannel : make (chan struct {}),
74
76
dcFailedChannel : make (chan struct {}),
@@ -109,6 +111,8 @@ type Conn struct {
109
111
offerrer bool
110
112
111
113
closed chan struct {}
114
+ closedRTC chan struct {}
115
+ closedICE chan struct {}
112
116
closeMutex sync.Mutex
113
117
closeError error
114
118
@@ -142,26 +146,18 @@ type Conn struct {
142
146
func (c * Conn ) init () error {
143
147
c .rtc .OnNegotiationNeeded (c .negotiate )
144
148
c .rtc .OnICEConnectionStateChange (func (iceConnectionState webrtc.ICEConnectionState ) {
145
- // Close must be locked here otherwise log output can appear
146
- // after the connection has been closed.
147
- c .closeMutex .Lock ()
148
- defer c .closeMutex .Unlock ()
149
- if c .isClosed () {
150
- return
151
- }
152
-
153
149
c .opts .Logger .Debug (context .Background (), "ice connection state updated" ,
154
150
slog .F ("state" , iceConnectionState ))
151
+
152
+ if iceConnectionState == webrtc .ICEConnectionStateClosed {
153
+ close (c .closedICE )
154
+ }
155
+ })
156
+ c .rtc .OnSignalingStateChange (func (signalState webrtc.SignalingState ) {
157
+ c .opts .Logger .Debug (context .Background (), "signal state updated" ,
158
+ slog .F ("state" , signalState ))
155
159
})
156
160
c .rtc .OnICEGatheringStateChange (func (iceGatherState webrtc.ICEGathererState ) {
157
- // Close can't be locked here, because this is triggered
158
- // when close is called. It doesn't appear this get's
159
- // executed after close though, so it shouldn't cause
160
- // problems.
161
- if c .isClosed () {
162
- return
163
- }
164
-
165
161
c .opts .Logger .Debug (context .Background (), "ice gathering state updated" ,
166
162
slog .F ("state" , iceGatherState ))
167
163
})
@@ -170,8 +166,9 @@ func (c *Conn) init() error {
170
166
return
171
167
}
172
168
json := iceCandidate .ToJSON ()
169
+ hash := sha256 .Sum224 ([]byte (json .Candidate ))
173
170
c .opts .Logger .Debug (context .Background (), "writing candidate to channel" ,
174
- slog .F ("hash" , sha256 . Sum224 ([] byte ( json . Candidate ) )),
171
+ slog .F ("hash" , string ( hash [:] )),
175
172
slog .F ("length" , len (json .Candidate )),
176
173
)
177
174
select {
@@ -188,19 +185,11 @@ func (c *Conn) init() error {
188
185
default :
189
186
}
190
187
})
191
- c .rtc .OnConnectionStateChange (func (pcs webrtc.PeerConnectionState ) {
192
- // Close must be locked here otherwise log output can appear
193
- // after the connection has been closed.
194
- c .closeMutex .Lock ()
195
- defer c .closeMutex .Unlock ()
196
- if c .isClosed () {
197
- return
198
- }
199
-
188
+ c .rtc .OnConnectionStateChange (func (peerConnectionState webrtc.PeerConnectionState ) {
200
189
c .opts .Logger .Debug (context .Background (), "rtc connection updated" ,
201
- slog .F ("state" , pcs ))
190
+ slog .F ("state" , peerConnectionState ))
202
191
203
- switch pcs {
192
+ switch peerConnectionState {
204
193
case webrtc .PeerConnectionStateDisconnected :
205
194
for i := 0 ; i < int (c .dcDisconnectListeners .Load ()); i ++ {
206
195
select {
@@ -216,6 +205,13 @@ func (c *Conn) init() error {
216
205
}
217
206
}
218
207
}
208
+
209
+ if peerConnectionState == webrtc .PeerConnectionStateClosed {
210
+ // Pion executes event handlers after close is called
211
+ // on the RTC connection. This ensures our Close()
212
+ // handler properly cleans up before returning.
213
+ close (c .closedRTC )
214
+ }
219
215
})
220
216
_ , err := c .pingChannel ()
221
217
if err != nil {
@@ -341,7 +337,7 @@ func (c *Conn) negotiate() {
341
337
for _ , pendingCandidate := range c .pendingRemoteCandidates {
342
338
hash := sha256 .Sum224 ([]byte (pendingCandidate .Candidate ))
343
339
c .opts .Logger .Debug (context .Background (), "flushing buffered remote candidate" ,
344
- slog .F ("hash" , hash ),
340
+ slog .F ("hash" , string ( hash [:]) ),
345
341
slog .F ("length" , len (pendingCandidate .Candidate )),
346
342
)
347
343
err := c .rtc .AddICECandidate (pendingCandidate )
@@ -367,8 +363,9 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit {
367
363
func (c * Conn ) AddRemoteCandidate (i webrtc.ICECandidateInit ) error {
368
364
c .pendingCandidatesMutex .Lock ()
369
365
defer c .pendingCandidatesMutex .Unlock ()
366
+ hash := sha256 .Sum224 ([]byte (i .Candidate ))
370
367
fields := []slog.Field {
371
- slog .F ("hash" , sha256 . Sum224 ([] byte ( i . Candidate ) )),
368
+ slog .F ("hash" , string ( hash [:] )),
372
369
slog .F ("length" , len (i .Candidate )),
373
370
}
374
371
if ! c .pendingCandidatesFlushed {
@@ -542,5 +539,15 @@ func (c *Conn) CloseWithError(err error) error {
542
539
// All logging, goroutines, and async functionality is cleaned up after this.
543
540
c .dcClosedWaitGroup .Wait ()
544
541
542
+ if c .rtc .ConnectionState () != webrtc .PeerConnectionStateNew {
543
+ c .opts .Logger .Debug (context .Background (), "waiting for rtc connection close..." )
544
+ <- c .closedRTC
545
+ }
546
+
547
+ if c .rtc .ICEConnectionState () != webrtc .ICEConnectionStateNew {
548
+ c .opts .Logger .Debug (context .Background (), "waiting for ice connection close..." )
549
+ <- c .closedICE
550
+ }
551
+
545
552
return err
546
553
}
0 commit comments