diff --git a/peer/conn.go b/peer/conn.go index 800699f2f7766..8ee3e9dd490c9 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/rand" + "crypto/sha256" "io" "sync" "time" @@ -140,15 +141,43 @@ type Conn struct { func (c *Conn) init() error { c.rtc.OnNegotiationNeeded(c.negotiate) + c.rtc.OnICEConnectionStateChange(func(iceConnectionState webrtc.ICEConnectionState) { + // Close must be locked here otherwise log output can appear + // after the connection has been closed. + c.closeMutex.Lock() + defer c.closeMutex.Unlock() + if c.isClosed() { + return + } + + c.opts.Logger.Debug(context.Background(), "ice connection state updated", + slog.F("state", iceConnectionState)) + }) + c.rtc.OnICEGatheringStateChange(func(iceGatherState webrtc.ICEGathererState) { + // Close can't be locked here, because this is triggered + // when close is called. It doesn't appear this get's + // executed after close though, so it shouldn't cause + // problems. + if c.isClosed() { + return + } + + c.opts.Logger.Debug(context.Background(), "ice gathering state updated", + slog.F("state", iceGatherState)) + }) c.rtc.OnICECandidate(func(iceCandidate *webrtc.ICECandidate) { if iceCandidate == nil { return } - c.opts.Logger.Debug(context.Background(), "adding local candidate") + json := iceCandidate.ToJSON() + c.opts.Logger.Debug(context.Background(), "writing candidate to channel", + slog.F("hash", sha256.Sum224([]byte(json.Candidate))), + slog.F("length", len(json.Candidate)), + ) select { case <-c.closed: break - case c.localCandidateChannel <- iceCandidate.ToJSON(): + case c.localCandidateChannel <- json: } }) c.rtc.OnDataChannel(func(dc *webrtc.DataChannel) { @@ -169,8 +198,7 @@ func (c *Conn) init() error { } c.opts.Logger.Debug(context.Background(), "rtc connection updated", - slog.F("state", pcs), - slog.F("ice", c.rtc.ICEConnectionState())) + slog.F("state", pcs)) switch pcs { case webrtc.PeerConnectionStateDisconnected: @@ -311,15 +339,22 @@ func (c *Conn) negotiate() { c.pendingCandidatesMutex.Lock() defer c.pendingCandidatesMutex.Unlock() for _, pendingCandidate := range c.pendingRemoteCandidates { - c.opts.Logger.Debug(context.Background(), "flushing remote candidate") + hash := sha256.Sum224([]byte(pendingCandidate.Candidate)) + c.opts.Logger.Debug(context.Background(), "flushing buffered remote candidate", + slog.F("hash", hash), + slog.F("length", len(pendingCandidate.Candidate)), + ) err := c.rtc.AddICECandidate(pendingCandidate) if err != nil { - _ = c.CloseWithError(xerrors.Errorf("flush pending candidates: %w", err)) + _ = c.CloseWithError(xerrors.Errorf("flush pending remote candidate: %w", err)) return } } + c.opts.Logger.Debug(context.Background(), "flushed buffered remote candidates", + slog.F("count", len(c.pendingRemoteCandidates)), + ) c.pendingCandidatesFlushed = true - c.opts.Logger.Debug(context.Background(), "flushed remote candidates") + c.pendingRemoteCandidates = make([]webrtc.ICECandidateInit, 0) } // LocalCandidate returns a channel that emits when a local candidate @@ -332,12 +367,16 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit { func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error { c.pendingCandidatesMutex.Lock() defer c.pendingCandidatesMutex.Unlock() + fields := []slog.Field{ + slog.F("hash", sha256.Sum224([]byte(i.Candidate))), + slog.F("length", len(i.Candidate)), + } if !c.pendingCandidatesFlushed { - c.opts.Logger.Debug(context.Background(), "adding remote candidate to buffer") + c.opts.Logger.Debug(context.Background(), "bufferring remote candidate", fields...) c.pendingRemoteCandidates = append(c.pendingRemoteCandidates, i) return nil } - c.opts.Logger.Debug(context.Background(), "adding remote candidate") + c.opts.Logger.Debug(context.Background(), "adding remote candidate", fields...) return c.rtc.AddICECandidate(i) }