From 40d204f9875cfff2500cfd023025b7831da086eb Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Sun, 30 Jan 2022 01:00:54 +0000 Subject: [PATCH 1/4] ci: Improve peer logging to help identify race --- peer/conn.go | 45 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 38 insertions(+), 7 deletions(-) diff --git a/peer/conn.go b/peer/conn.go index fa6081c1ab8c6..d996e88d968cf 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/rand" + "crypto/sha256" "io" "sync" "time" @@ -140,6 +141,26 @@ type Conn struct { func (c *Conn) init() error { c.rtc.OnNegotiationNeeded(c.negotiate) + c.rtc.OnICEConnectionStateChange(func(iceConnectionState webrtc.ICEConnectionState) { + c.closeMutex.Lock() + defer c.closeMutex.Unlock() + if c.isClosed() { + return + } + + c.opts.Logger.Debug(context.Background(), "ice connection updated", + slog.F("state", iceConnectionState)) + }) + c.rtc.OnICEGatheringStateChange(func(iceGatherState webrtc.ICEGathererState) { + c.closeMutex.Lock() + defer c.closeMutex.Unlock() + if c.isClosed() { + return + } + + c.opts.Logger.Debug(context.Background(), "ice gathering state updated", + slog.F("state", iceGatherState)) + }) c.rtc.OnICECandidate(func(iceCandidate *webrtc.ICECandidate) { if iceCandidate == nil { return @@ -169,8 +190,7 @@ func (c *Conn) init() error { } c.opts.Logger.Debug(context.Background(), "rtc connection updated", - slog.F("state", pcs), - slog.F("ice", c.rtc.ICEConnectionState())) + slog.F("state", pcs)) switch pcs { case webrtc.PeerConnectionStateDisconnected: @@ -311,15 +331,22 @@ func (c *Conn) negotiate() { c.pendingCandidatesMutex.Lock() defer c.pendingCandidatesMutex.Unlock() for _, pendingCandidate := range c.pendingRemoteCandidates { - c.opts.Logger.Debug(context.Background(), "flushing remote candidate") + hash := sha256.Sum224([]byte(pendingCandidate.Candidate)) + c.opts.Logger.Debug(context.Background(), "flushing buffered remote candidate", + slog.F("hash", hash), + slog.F("length", len(pendingCandidate.Candidate)), + ) err := c.rtc.AddICECandidate(pendingCandidate) if err != nil { - _ = c.CloseWithError(xerrors.Errorf("flush pending candidates: %w", err)) + _ = c.CloseWithError(xerrors.Errorf("flush pending remote candidate: %w", err)) return } } + c.opts.Logger.Debug(context.Background(), "flushed buffered remote candidates", + slog.F("count", len(c.pendingRemoteCandidates)), + ) c.pendingCandidatesFlushed = true - c.opts.Logger.Debug(context.Background(), "flushed remote candidates") + c.pendingRemoteCandidates = make([]webrtc.ICECandidateInit, 0) } // LocalCandidate returns a channel that emits when a local candidate @@ -332,12 +359,16 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit { func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error { c.pendingCandidatesMutex.Lock() defer c.pendingCandidatesMutex.Unlock() + fields := []slog.Field{ + slog.F("hash", sha256.Sum224([]byte(i.Candidate))), + slog.F("length", len(i.Candidate)), + } if !c.pendingCandidatesFlushed { - c.opts.Logger.Debug(context.Background(), "adding remote candidate to buffer") + c.opts.Logger.Debug(context.Background(), "bufferring remote candidate", fields...) c.pendingRemoteCandidates = append(c.pendingRemoteCandidates, i) return nil } - c.opts.Logger.Debug(context.Background(), "adding remote candidate") + c.opts.Logger.Debug(context.Background(), "adding remote candidate", fields...) return c.rtc.AddICECandidate(i) } From 83434046d84d3d9147e0cd64a0a78cd45386987c Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Sun, 30 Jan 2022 01:15:58 +0000 Subject: [PATCH 2/4] Remove mutex locks --- peer/conn.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/peer/conn.go b/peer/conn.go index d996e88d968cf..f6082ffc8a6a5 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -142,8 +142,6 @@ type Conn struct { func (c *Conn) init() error { c.rtc.OnNegotiationNeeded(c.negotiate) c.rtc.OnICEConnectionStateChange(func(iceConnectionState webrtc.ICEConnectionState) { - c.closeMutex.Lock() - defer c.closeMutex.Unlock() if c.isClosed() { return } @@ -152,8 +150,6 @@ func (c *Conn) init() error { slog.F("state", iceConnectionState)) }) c.rtc.OnICEGatheringStateChange(func(iceGatherState webrtc.ICEGathererState) { - c.closeMutex.Lock() - defer c.closeMutex.Unlock() if c.isClosed() { return } From f4c9dc2038ef5fa04e2553a91c826f43cd23d1b9 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Sun, 30 Jan 2022 01:20:44 +0000 Subject: [PATCH 3/4] Add hash to write --- peer/conn.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/peer/conn.go b/peer/conn.go index f6082ffc8a6a5..52efe778a5583 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -161,11 +161,15 @@ func (c *Conn) init() error { if iceCandidate == nil { return } - c.opts.Logger.Debug(context.Background(), "adding local candidate") + json := iceCandidate.ToJSON() + c.opts.Logger.Debug(context.Background(), "writing candidate to channel", + slog.F("hash", sha256.Sum224([]byte(json.Candidate))), + slog.F("length", len(json.Candidate)), + ) select { case <-c.closed: break - case c.localCandidateChannel <- iceCandidate.ToJSON(): + case c.localCandidateChannel <- json: } }) c.rtc.OnDataChannel(func(dc *webrtc.DataChannel) { From f65589771f466d25e9c3786d9dc082d8eda3e6ec Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Sun, 30 Jan 2022 01:27:31 +0000 Subject: [PATCH 4/4] Improve comment --- peer/conn.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/peer/conn.go b/peer/conn.go index 52efe778a5583..77a561a91c35a 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -142,14 +142,22 @@ type Conn struct { func (c *Conn) init() error { c.rtc.OnNegotiationNeeded(c.negotiate) c.rtc.OnICEConnectionStateChange(func(iceConnectionState webrtc.ICEConnectionState) { + // Close must be locked here otherwise log output can appear + // after the connection has been closed. + c.closeMutex.Lock() + defer c.closeMutex.Unlock() if c.isClosed() { return } - c.opts.Logger.Debug(context.Background(), "ice connection updated", + c.opts.Logger.Debug(context.Background(), "ice connection state updated", slog.F("state", iceConnectionState)) }) c.rtc.OnICEGatheringStateChange(func(iceGatherState webrtc.ICEGathererState) { + // Close can't be locked here, because this is triggered + // when close is called. It doesn't appear this get's + // executed after close though, so it shouldn't cause + // problems. if c.isClosed() { return }