@@ -69,6 +69,7 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp
69
69
rtc : rtc ,
70
70
offerrer : client ,
71
71
closed : make (chan struct {}),
72
+ closedRTC : make (chan struct {}),
72
73
dcOpenChannel : make (chan * webrtc.DataChannel ),
73
74
dcDisconnectChannel : make (chan struct {}),
74
75
dcFailedChannel : make (chan struct {}),
@@ -109,6 +110,7 @@ type Conn struct {
109
110
offerrer bool
110
111
111
112
closed chan struct {}
113
+ closedRTC chan struct {}
112
114
closeMutex sync.Mutex
113
115
closeError error
114
116
@@ -142,26 +144,14 @@ type Conn struct {
142
144
func (c * Conn ) init () error {
143
145
c .rtc .OnNegotiationNeeded (c .negotiate )
144
146
c .rtc .OnICEConnectionStateChange (func (iceConnectionState webrtc.ICEConnectionState ) {
145
- // Close must be locked here otherwise log output can appear
146
- // after the connection has been closed.
147
- c .closeMutex .Lock ()
148
- defer c .closeMutex .Unlock ()
149
- if c .isClosed () {
150
- return
151
- }
152
-
153
147
c .opts .Logger .Debug (context .Background (), "ice connection state updated" ,
154
148
slog .F ("state" , iceConnectionState ))
155
149
})
150
+ c .rtc .OnSignalingStateChange (func (signalState webrtc.SignalingState ) {
151
+ c .opts .Logger .Debug (context .Background (), "signal state updated" ,
152
+ slog .F ("state" , signalState ))
153
+ })
156
154
c .rtc .OnICEGatheringStateChange (func (iceGatherState webrtc.ICEGathererState ) {
157
- // Close can't be locked here, because this is triggered
158
- // when close is called. It doesn't appear this get's
159
- // executed after close though, so it shouldn't cause
160
- // problems.
161
- if c .isClosed () {
162
- return
163
- }
164
-
165
155
c .opts .Logger .Debug (context .Background (), "ice gathering state updated" ,
166
156
slog .F ("state" , iceGatherState ))
167
157
})
@@ -170,8 +160,9 @@ func (c *Conn) init() error {
170
160
return
171
161
}
172
162
json := iceCandidate .ToJSON ()
163
+ hash := sha256 .Sum224 ([]byte (json .Candidate ))
173
164
c .opts .Logger .Debug (context .Background (), "writing candidate to channel" ,
174
- slog .F ("hash" , sha256 . Sum224 ([] byte ( json . Candidate ) )),
165
+ slog .F ("hash" , string ( hash [:] )),
175
166
slog .F ("length" , len (json .Candidate )),
176
167
)
177
168
select {
@@ -189,14 +180,6 @@ func (c *Conn) init() error {
189
180
}
190
181
})
191
182
c .rtc .OnConnectionStateChange (func (pcs webrtc.PeerConnectionState ) {
192
- // Close must be locked here otherwise log output can appear
193
- // after the connection has been closed.
194
- c .closeMutex .Lock ()
195
- defer c .closeMutex .Unlock ()
196
- if c .isClosed () {
197
- return
198
- }
199
-
200
183
c .opts .Logger .Debug (context .Background (), "rtc connection updated" ,
201
184
slog .F ("state" , pcs ))
202
185
@@ -216,6 +199,13 @@ func (c *Conn) init() error {
216
199
}
217
200
}
218
201
}
202
+
203
+ if pcs == webrtc .PeerConnectionStateClosed {
204
+ // Pion executes event handlers after close is called
205
+ // on the RTC connection. This ensures our Close()
206
+ // handler properly cleans up before returning.
207
+ close (c .closedRTC )
208
+ }
219
209
})
220
210
_ , err := c .pingChannel ()
221
211
if err != nil {
@@ -341,7 +331,7 @@ func (c *Conn) negotiate() {
341
331
for _ , pendingCandidate := range c .pendingRemoteCandidates {
342
332
hash := sha256 .Sum224 ([]byte (pendingCandidate .Candidate ))
343
333
c .opts .Logger .Debug (context .Background (), "flushing buffered remote candidate" ,
344
- slog .F ("hash" , hash ),
334
+ slog .F ("hash" , string ( hash [:]) ),
345
335
slog .F ("length" , len (pendingCandidate .Candidate )),
346
336
)
347
337
err := c .rtc .AddICECandidate (pendingCandidate )
@@ -367,8 +357,9 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit {
367
357
func (c * Conn ) AddRemoteCandidate (i webrtc.ICECandidateInit ) error {
368
358
c .pendingCandidatesMutex .Lock ()
369
359
defer c .pendingCandidatesMutex .Unlock ()
360
+ hash := sha256 .Sum224 ([]byte (i .Candidate ))
370
361
fields := []slog.Field {
371
- slog .F ("hash" , sha256 . Sum224 ([] byte ( i . Candidate ) )),
362
+ slog .F ("hash" , string ( hash [:] )),
372
363
slog .F ("length" , len (i .Candidate )),
373
364
}
374
365
if ! c .pendingCandidatesFlushed {
@@ -542,5 +533,10 @@ func (c *Conn) CloseWithError(err error) error {
542
533
// All logging, goroutines, and async functionality is cleaned up after this.
543
534
c .dcClosedWaitGroup .Wait ()
544
535
536
+ if c .rtc .ConnectionState () != webrtc .PeerConnectionStateNew {
537
+ c .opts .Logger .Debug (context .Background (), "waiting for connection close..." )
538
+ <- c .closedRTC
539
+ }
540
+
545
541
return err
546
542
}
0 commit comments