Skip to content

Commit f20ce9d

Browse files
committed
Revert to multi-channel setup
1 parent 679e4aa commit f20ce9d

File tree

6 files changed

+251
-267
lines changed

6 files changed

+251
-267
lines changed

peer/conn.go

Lines changed: 48 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -63,22 +63,21 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp
6363
return nil, xerrors.Errorf("create peer connection: %w", err)
6464
}
6565
conn := &Conn{
66-
pingChannelID: 1,
67-
pingEchoChannelID: 2,
68-
opts: opts,
69-
rtc: rtc,
70-
offerrer: client,
71-
closed: make(chan struct{}),
72-
closedRTC: make(chan struct{}),
73-
closedICE: make(chan struct{}),
74-
dcOpenChannel: make(chan *webrtc.DataChannel),
75-
dcDisconnectChannel: make(chan struct{}),
76-
dcFailedChannel: make(chan struct{}),
77-
// This channel needs to be bufferred otherwise slow consumers
78-
// of this will cause a connection failure.
79-
localNegotiator: make(chan Negotiation, 8),
80-
remoteSessionDescription: make(chan webrtc.SessionDescription),
81-
pendingCandidatesToSend: make([]webrtc.ICECandidateInit, 0),
66+
pingChannelID: 1,
67+
pingEchoChannelID: 2,
68+
opts: opts,
69+
rtc: rtc,
70+
offerrer: client,
71+
closed: make(chan struct{}),
72+
closedRTC: make(chan struct{}),
73+
closedICE: make(chan struct{}),
74+
dcOpenChannel: make(chan *webrtc.DataChannel),
75+
dcDisconnectChannel: make(chan struct{}),
76+
dcFailedChannel: make(chan struct{}),
77+
localCandidateChannel: make(chan webrtc.ICECandidateInit),
78+
localSessionDescriptionChannel: make(chan webrtc.SessionDescription),
79+
remoteSessionDescriptionChannel: make(chan webrtc.SessionDescription),
80+
pendingCandidatesToSend: make([]webrtc.ICECandidateInit, 0),
8281
}
8382
if client {
8483
// If we're the client, we want to flip the echo and
@@ -122,10 +121,10 @@ type Conn struct {
122121
dcFailedListeners atomic.Uint32
123122
dcClosedWaitGroup sync.WaitGroup
124123

125-
localNegotiator chan Negotiation
126-
127-
remoteSessionDescription chan webrtc.SessionDescription
128-
remoteSessionDescriptionMutex sync.Mutex
124+
localCandidateChannel chan webrtc.ICECandidateInit
125+
localSessionDescriptionChannel chan webrtc.SessionDescription
126+
remoteSessionDescriptionChannel chan webrtc.SessionDescription
127+
remoteSessionDescriptionMutex sync.Mutex
129128

130129
pendingCandidatesToSend []webrtc.ICECandidateInit
131130
pendingCandidatesToSendMutex sync.Mutex
@@ -177,17 +176,16 @@ func (c *Conn) init() error {
177176
}
178177
c.pendingCandidatesToSendMutex.Lock()
179178
defer c.pendingCandidatesToSendMutex.Unlock()
180-
json := iceCandidate.ToJSON()
181179
if !c.pendingCandidatesFlushed {
182-
c.pendingCandidatesToSend = append(c.pendingCandidatesToSend, json)
180+
c.pendingCandidatesToSend = append(c.pendingCandidatesToSend, iceCandidate.ToJSON())
183181
c.opts.Logger.Debug(context.Background(), "buffering local candidate")
184182
return
185183
}
186184
c.opts.Logger.Debug(context.Background(), "sending local candidate")
187185
select {
188186
case <-c.closed:
189187
break
190-
case c.localNegotiator <- Negotiation{nil, []webrtc.ICECandidateInit{json}}:
188+
case c.localCandidateChannel <- iceCandidate.ToJSON():
191189
}
192190
})
193191
c.rtc.OnDataChannel(func(dc *webrtc.DataChannel) {
@@ -266,15 +264,15 @@ func (c *Conn) negotiate() {
266264
select {
267265
case <-c.closed:
268266
return
269-
case c.localNegotiator <- Negotiation{&offer, nil}:
267+
case c.localSessionDescriptionChannel <- offer:
270268
}
271269
}
272270

273271
var sessionDescription webrtc.SessionDescription
274272
select {
275273
case <-c.closed:
276274
return
277-
case sessionDescription = <-c.remoteSessionDescription:
275+
case sessionDescription = <-c.remoteSessionDescriptionChannel:
278276
}
279277

280278
// This prevents candidates from being added while
@@ -306,18 +304,19 @@ func (c *Conn) negotiate() {
306304
select {
307305
case <-c.closed:
308306
return
309-
case c.localNegotiator <- Negotiation{&answer, nil}:
307+
case c.localSessionDescriptionChannel <- answer:
310308
}
311309
}
312310

313311
c.pendingCandidatesToSendMutex.Lock()
314312
defer c.pendingCandidatesToSendMutex.Unlock()
315-
if len(c.pendingCandidatesToSend) > 0 {
313+
for _, pendingCandidate := range c.pendingCandidatesToSend {
316314
select {
317315
case <-c.closed:
318316
return
319-
case c.localNegotiator <- Negotiation{nil, c.pendingCandidatesToSend}:
317+
case c.localCandidateChannel <- pendingCandidate:
320318
}
319+
c.opts.Logger.Debug(context.Background(), "flushed buffered local candidate")
321320
}
322321
c.opts.Logger.Debug(context.Background(), "flushed buffered local candidates",
323322
slog.F("count", len(c.pendingCandidatesToSend)),
@@ -326,37 +325,32 @@ func (c *Conn) negotiate() {
326325
c.pendingCandidatesFlushed = true
327326
}
328327

329-
// LocalNegotiation returns a channel for connection negotiation.
330-
// This should be piped to another peer connection.
331-
func (c *Conn) LocalNegotiation() <-chan Negotiation {
332-
return c.localNegotiator
328+
// AddRemoteCandidate adds a remote candidate to the RTC connection.
329+
func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error {
330+
c.remoteSessionDescriptionMutex.Lock()
331+
defer c.remoteSessionDescriptionMutex.Unlock()
332+
c.opts.Logger.Debug(context.Background(), "accepting candidate", slog.F("length", len(i.Candidate)))
333+
return c.rtc.AddICECandidate(i)
333334
}
334335

335-
// AddRemoteNegotiation accepts a negotiation message for handshaking a connection.
336-
func (c *Conn) AddRemoteNegotiation(negotiation Negotiation) error {
337-
if negotiation.SessionDescription != nil {
338-
c.opts.Logger.Debug(context.Background(), "adding remote negotiation with session description")
339-
select {
340-
case <-c.closed:
341-
return nil
342-
case c.remoteSessionDescription <- *negotiation.SessionDescription:
343-
}
336+
// SetRemoteSessionDescription sets the remote description for the WebRTC connection.
337+
func (c *Conn) SetRemoteSessionDescription(sessionDescription webrtc.SessionDescription) {
338+
select {
339+
case <-c.closed:
340+
case c.remoteSessionDescriptionChannel <- sessionDescription:
344341
}
342+
}
345343

346-
if len(negotiation.ICECandidates) > 0 {
347-
c.remoteSessionDescriptionMutex.Lock()
348-
defer c.remoteSessionDescriptionMutex.Unlock()
349-
c.opts.Logger.Debug(context.Background(), "adding remote negotiation with ice candidates",
350-
slog.F("count", len(negotiation.ICECandidates)))
351-
for _, iceCandidate := range negotiation.ICECandidates {
352-
err := c.rtc.AddICECandidate(iceCandidate)
353-
if err != nil {
354-
return err
355-
}
356-
}
357-
}
344+
// LocalSessionDescription returns a channel that emits a session description
345+
// when one is required to be exchanged.
346+
func (c *Conn) LocalSessionDescription() <-chan webrtc.SessionDescription {
347+
return c.localSessionDescriptionChannel
348+
}
358349

359-
return nil
350+
// LocalCandidate returns a channel that emits when a local candidate
351+
// needs to be exchanged with a remote connection.
352+
func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit {
353+
return c.localCandidateChannel
360354
}
361355

362356
func (c *Conn) pingChannel() (*Channel, error) {

peer/conn_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -307,8 +307,10 @@ func createPair(t *testing.T) (client *peer.Conn, server *peer.Conn, wan *vnet.R
307307
go func() {
308308
for {
309309
select {
310-
case c := <-channel2.LocalNegotiation():
311-
_ = channel1.AddRemoteNegotiation(c)
310+
case c := <-channel2.LocalCandidate():
311+
_ = channel1.AddRemoteCandidate(c)
312+
case c := <-channel2.LocalSessionDescription():
313+
channel1.SetRemoteSessionDescription(c)
312314
case <-channel2.Closed():
313315
return
314316
}
@@ -318,8 +320,10 @@ func createPair(t *testing.T) (client *peer.Conn, server *peer.Conn, wan *vnet.R
318320
go func() {
319321
for {
320322
select {
321-
case c := <-channel1.LocalNegotiation():
322-
_ = channel2.AddRemoteNegotiation(c)
323+
case c := <-channel1.LocalCandidate():
324+
_ = channel2.AddRemoteCandidate(c)
325+
case c := <-channel1.LocalSessionDescription():
326+
channel2.SetRemoteSessionDescription(c)
323327
case <-channel1.Closed():
324328
return
325329
}

peerbroker/dial.go

Lines changed: 32 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,29 @@ func Dial(stream proto.DRPCPeerBroker_NegotiateConnectionClient, iceServers []we
5353
select {
5454
case <-peerConn.Closed():
5555
return
56-
case localNegotiation := <-peerConn.LocalNegotiation():
56+
case sessionDescription := <-peerConn.LocalSessionDescription():
5757
err = stream.Send(&proto.NegotiateConnection_ClientToServer{
58-
Message: &proto.NegotiateConnection_ClientToServer_Negotiation{
59-
Negotiation: convertLocalNegotiation(localNegotiation),
58+
Message: &proto.NegotiateConnection_ClientToServer_Offer{
59+
Offer: &proto.WebRTCSessionDescription{
60+
SdpType: int32(sessionDescription.Type),
61+
Sdp: sessionDescription.SDP,
62+
},
6063
},
6164
})
6265
if err != nil {
6366
_ = peerConn.CloseWithError(xerrors.Errorf("send local session description: %w", err))
6467
return
6568
}
69+
case iceCandidate := <-peerConn.LocalCandidate():
70+
err = stream.Send(&proto.NegotiateConnection_ClientToServer{
71+
Message: &proto.NegotiateConnection_ClientToServer_IceCandidate{
72+
IceCandidate: iceCandidate.Candidate,
73+
},
74+
})
75+
if err != nil {
76+
_ = peerConn.CloseWithError(xerrors.Errorf("send local candidate: %w", err))
77+
return
78+
}
6679
}
6780
}
6881
}()
@@ -74,56 +87,27 @@ func Dial(stream proto.DRPCPeerBroker_NegotiateConnectionClient, iceServers []we
7487
_ = peerConn.CloseWithError(xerrors.Errorf("recv: %w", err))
7588
return
7689
}
77-
if serverToClientMessage.GetNegotiation() == nil {
78-
_ = peerConn.CloseWithError(xerrors.Errorf("unhandled message: %s", reflect.TypeOf(serverToClientMessage).String()))
79-
return
80-
}
8190

82-
err = peerConn.AddRemoteNegotiation(convertProtoNegotiation(serverToClientMessage.Negotiation))
83-
if err != nil {
84-
_ = peerConn.CloseWithError(xerrors.Errorf("add remote negotiation: %w", err))
91+
switch {
92+
case serverToClientMessage.GetAnswer() != nil:
93+
peerConn.SetRemoteSessionDescription(webrtc.SessionDescription{
94+
Type: webrtc.SDPType(serverToClientMessage.GetAnswer().SdpType),
95+
SDP: serverToClientMessage.GetAnswer().Sdp,
96+
})
97+
case serverToClientMessage.GetIceCandidate() != "":
98+
err = peerConn.AddRemoteCandidate(webrtc.ICECandidateInit{
99+
Candidate: serverToClientMessage.GetIceCandidate(),
100+
})
101+
if err != nil {
102+
_ = peerConn.CloseWithError(xerrors.Errorf("add remote candidate: %w", err))
103+
return
104+
}
105+
default:
106+
_ = peerConn.CloseWithError(xerrors.Errorf("unhandled message: %s", reflect.TypeOf(serverToClientMessage).String()))
85107
return
86108
}
87109
}
88110
}()
89111

90112
return peerConn, nil
91113
}
92-
93-
func convertLocalNegotiation(localNegotiation peer.Negotiation) *proto.Negotiation {
94-
protoNegotation := &proto.Negotiation{}
95-
if localNegotiation.SessionDescription != nil {
96-
protoNegotation.SessionDescription = &proto.WebRTCSessionDescription{
97-
SdpType: int32(localNegotiation.SessionDescription.Type),
98-
Sdp: localNegotiation.SessionDescription.SDP,
99-
}
100-
}
101-
if len(localNegotiation.ICECandidates) > 0 {
102-
iceCandidates := make([]string, 0, len(localNegotiation.ICECandidates))
103-
for _, iceCandidate := range localNegotiation.ICECandidates {
104-
iceCandidates = append(iceCandidates, iceCandidate.Candidate)
105-
}
106-
protoNegotation.IceCandidates = iceCandidates
107-
}
108-
return protoNegotation
109-
}
110-
111-
func convertProtoNegotiation(protoNegotiation *proto.Negotiation) peer.Negotiation {
112-
localNegotiation := peer.Negotiation{}
113-
if protoNegotiation.SessionDescription != nil {
114-
localNegotiation.SessionDescription = &webrtc.SessionDescription{
115-
Type: webrtc.SDPType(protoNegotiation.SessionDescription.SdpType),
116-
SDP: protoNegotiation.SessionDescription.Sdp,
117-
}
118-
}
119-
if len(protoNegotiation.IceCandidates) > 0 {
120-
candidates := make([]webrtc.ICECandidateInit, 0, len(protoNegotiation.IceCandidates))
121-
for _, iceCandidate := range protoNegotiation.IceCandidates {
122-
candidates = append(candidates, webrtc.ICECandidateInit{
123-
Candidate: iceCandidate,
124-
})
125-
}
126-
localNegotiation.ICECandidates = candidates
127-
}
128-
return localNegotiation
129-
}

peerbroker/listen.go

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,27 @@ func (b *peerBrokerService) NegotiateConnection(stream proto.DRPCPeerBroker_Nego
120120
select {
121121
case <-peerConn.Closed():
122122
return
123-
case localNegotiation := <-peerConn.LocalNegotiation():
123+
case sessionDescription := <-peerConn.LocalSessionDescription():
124124
err = stream.Send(&proto.NegotiateConnection_ServerToClient{
125-
Negotiation: convertLocalNegotiation(localNegotiation),
125+
Message: &proto.NegotiateConnection_ServerToClient_Answer{
126+
Answer: &proto.WebRTCSessionDescription{
127+
SdpType: int32(sessionDescription.Type),
128+
Sdp: sessionDescription.SDP,
129+
},
130+
},
126131
})
127132
if err != nil {
128-
_ = peerConn.CloseWithError(xerrors.Errorf("send local negotiation: %w", err))
133+
_ = peerConn.CloseWithError(xerrors.Errorf("send local session description: %w", err))
134+
return
135+
}
136+
case iceCandidate := <-peerConn.LocalCandidate():
137+
err = stream.Send(&proto.NegotiateConnection_ServerToClient{
138+
Message: &proto.NegotiateConnection_ServerToClient_IceCandidate{
139+
IceCandidate: iceCandidate.Candidate,
140+
},
141+
})
142+
if err != nil {
143+
_ = peerConn.CloseWithError(xerrors.Errorf("send local candidate: %w", err))
129144
return
130145
}
131146
}
@@ -141,11 +156,11 @@ func (b *peerBrokerService) NegotiateConnection(stream proto.DRPCPeerBroker_Nego
141156
}
142157

143158
switch {
144-
case clientToServerMessage.GetNegotiation() != nil:
145-
err = peerConn.AddRemoteNegotiation(convertProtoNegotiation(clientToServerMessage.GetNegotiation()))
146-
if err != nil {
147-
return peerConn.CloseWithError(xerrors.Errorf("add remote negotiation: %w", err))
148-
}
159+
case clientToServerMessage.GetOffer() != nil:
160+
peerConn.SetRemoteSessionDescription(webrtc.SessionDescription{
161+
Type: webrtc.SDPType(clientToServerMessage.GetOffer().SdpType),
162+
SDP: clientToServerMessage.GetOffer().Sdp,
163+
})
149164
case clientToServerMessage.GetServers() != nil:
150165
// Convert protobuf ICE servers to the WebRTC type.
151166
iceServers := make([]webrtc.ICEServer, 0, len(clientToServerMessage.GetServers().Servers))
@@ -163,6 +178,13 @@ func (b *peerBrokerService) NegotiateConnection(stream proto.DRPCPeerBroker_Nego
163178
if err != nil {
164179
return peerConn.CloseWithError(xerrors.Errorf("set ice configuration: %w", err))
165180
}
181+
case clientToServerMessage.GetIceCandidate() != "":
182+
err = peerConn.AddRemoteCandidate(webrtc.ICECandidateInit{
183+
Candidate: clientToServerMessage.GetIceCandidate(),
184+
})
185+
if err != nil {
186+
return peerConn.CloseWithError(xerrors.Errorf("add remote candidate: %w", err))
187+
}
166188
default:
167189
return peerConn.CloseWithError(xerrors.Errorf("unhandled message: %s", reflect.TypeOf(clientToServerMessage).String()))
168190
}

0 commit comments

Comments
 (0)