Skip to content

Commit 53cfa8a

Browse files
feat: Create broker for negotiating connections (#14)
* feat: Create broker for negotiating connections WebRTC require an exchange of encryption keys and network hops to connect. This package pipes the exchange over gRPC. This will be used in all connecting clients and agents. * Regenerate protobuf definition * Cache Go build and test * Fix gRPC language with dRPC Co-authored-by: Bryan <bryan@coder.com> Co-authored-by: Bryan <bryan@coder.com>
1 parent 7c260f8 commit 53cfa8a

File tree

14 files changed

+1258
-1
lines changed

14 files changed

+1258
-1
lines changed

.gitattributes

+1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
# Generated files
2+
peerbroker/proto/*.go linguist-generated=true
23
provisionersdk/proto/*.go linguist-generated=true

.github/workflows/coder.yaml

+12
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,18 @@ jobs:
113113
with:
114114
go-version: "^1.17"
115115

116+
- uses: actions/cache@v2
117+
with:
118+
# Go mod cache, Linux build cache, Mac build cache, Windows build cache
119+
path: |
120+
~/go/pkg/mod
121+
~/.cache/go-build
122+
~/Library/Caches/go-build
123+
%LocalAppData%\go-build
124+
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
125+
restore-keys: |
126+
${{ runner.os }}-go-
127+
116128
- run: go install gotest.tools/gotestsum@latest
117129

118130
- run:

Makefile

+11-1
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,19 @@ endif
2222
fmt: fmt/prettier
2323
.PHONY: fmt
2424

25-
gen: database/generate provisionersdk/proto
25+
gen: database/generate peerbroker/proto provisionersdk/proto
2626
.PHONY: gen
2727

28+
# Generates the protocol files.
29+
peerbroker/proto: peerbroker/proto/peerbroker.proto
30+
cd peerbroker/proto && protoc \
31+
--go_out=. \
32+
--go_opt=paths=source_relative \
33+
--go-drpc_out=. \
34+
--go-drpc_opt=paths=source_relative \
35+
./peerbroker.proto
36+
.PHONY: peerbroker/proto
37+
2838
# Generates the protocol files.
2939
provisionersdk/proto: provisionersdk/proto/provisioner.proto
3040
cd provisionersdk/proto && protoc \

codecov.yml

+1
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,5 @@ coverage:
2121

2222
ignore:
2323
# This is generated code.
24+
- peerbroker/proto
2425
- provisionersdk/proto

peer/conn.go

+14
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,12 @@ func (c *Conn) LocalSessionDescription() <-chan webrtc.SessionDescription {
337337
return c.localSessionDescriptionChannel
338338
}
339339

340+
// SetConfiguration applies options to the WebRTC connection.
341+
// Generally used for updating transport options, like ICE servers.
342+
func (c *Conn) SetConfiguration(configuration webrtc.Configuration) error {
343+
return c.rtc.SetConfiguration(configuration)
344+
}
345+
340346
// SetRemoteSessionDescription sets the remote description for the WebRTC connection.
341347
func (c *Conn) SetRemoteSessionDescription(s webrtc.SessionDescription) {
342348
if c.isClosed() {
@@ -388,6 +394,9 @@ func (c *Conn) dialChannel(ctx context.Context, label string, opts *ChannelOpts)
388394
if opts.OpenOnDisconnect && !opts.Negotiated {
389395
return nil, xerrors.New("OpenOnDisconnect is only allowed for Negotiated channels")
390396
}
397+
if c.isClosed() {
398+
return nil, xerrors.Errorf("closed: %w", c.closeError)
399+
}
391400

392401
dc, err := c.rtc.CreateDataChannel(label, &webrtc.DataChannelInit{
393402
ID: id,
@@ -446,6 +455,11 @@ func (c *Conn) Close() error {
446455
return c.closeWithError(nil)
447456
}
448457

458+
// CloseWithError closes the connection; subsequent reads/writes will return the error err.
459+
func (c *Conn) CloseWithError(err error) error {
460+
return c.closeWithError(err)
461+
}
462+
449463
func (c *Conn) isClosed() bool {
450464
select {
451465
case <-c.closed:

peer/conn_test.go

+10
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package peer_test
22

33
import (
44
"context"
5+
"errors"
56
"io"
67
"net"
78
"net/http"
@@ -193,6 +194,15 @@ func TestConn(t *testing.T) {
193194
require.NoError(t, err)
194195
})
195196

197+
t.Run("CloseWithError", func(t *testing.T) {
198+
conn, err := peer.Client([]webrtc.ICEServer{}, nil)
199+
require.NoError(t, err)
200+
expectedErr := errors.New("wow")
201+
_ = conn.CloseWithError(expectedErr)
202+
_, err = conn.Dial(context.Background(), "", nil)
203+
require.ErrorIs(t, err, expectedErr)
204+
})
205+
196206
t.Run("PingConcurrent", func(t *testing.T) {
197207
t.Parallel()
198208
client, server, _ := createPair(t)

peerbroker/dial.go

+113
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package peerbroker
2+
3+
import (
4+
"reflect"
5+
6+
"github.com/pion/webrtc/v3"
7+
"golang.org/x/xerrors"
8+
9+
"github.com/coder/coder/peer"
10+
"github.com/coder/coder/peerbroker/proto"
11+
)
12+
13+
// Dial consumes the PeerBroker gRPC connection negotiation stream to produce a WebRTC peered connection.
14+
func Dial(stream proto.DRPCPeerBroker_NegotiateConnectionClient, iceServers []webrtc.ICEServer, opts *peer.ConnOpts) (*peer.Conn, error) {
15+
// Convert WebRTC ICE servers to the protobuf type.
16+
protoIceServers := make([]*proto.WebRTCICEServer, 0, len(iceServers))
17+
for _, iceServer := range iceServers {
18+
var credentialString string
19+
if value, ok := iceServer.Credential.(string); ok {
20+
credentialString = value
21+
}
22+
protoIceServers = append(protoIceServers, &proto.WebRTCICEServer{
23+
Urls: iceServer.URLs,
24+
Username: iceServer.Username,
25+
Credential: credentialString,
26+
CredentialType: int32(iceServer.CredentialType),
27+
})
28+
}
29+
if len(protoIceServers) > 0 {
30+
// Send ICE servers to connect with.
31+
// Client sends ICE servers so clients can determine the node
32+
// servers will meet at. eg. us-west1.coder.com could be a TURN server.
33+
err := stream.Send(&proto.NegotiateConnection_ClientToServer{
34+
Message: &proto.NegotiateConnection_ClientToServer_Servers{
35+
Servers: &proto.WebRTCICEServers{
36+
Servers: protoIceServers,
37+
},
38+
},
39+
})
40+
if err != nil {
41+
return nil, xerrors.Errorf("write ice servers: %w", err)
42+
}
43+
}
44+
45+
peerConn, err := peer.Client(iceServers, opts)
46+
if err != nil {
47+
return nil, xerrors.Errorf("create peer connection: %w", err)
48+
}
49+
go func() {
50+
defer stream.Close()
51+
// Exchanging messages from the peer connection to negotiate a connection.
52+
for {
53+
select {
54+
case <-peerConn.Closed():
55+
return
56+
case sessionDescription := <-peerConn.LocalSessionDescription():
57+
err = stream.Send(&proto.NegotiateConnection_ClientToServer{
58+
Message: &proto.NegotiateConnection_ClientToServer_Offer{
59+
Offer: &proto.WebRTCSessionDescription{
60+
SdpType: int32(sessionDescription.Type),
61+
Sdp: sessionDescription.SDP,
62+
},
63+
},
64+
})
65+
if err != nil {
66+
_ = peerConn.CloseWithError(xerrors.Errorf("send local session description: %w", err))
67+
return
68+
}
69+
case iceCandidate := <-peerConn.LocalCandidate():
70+
err = stream.Send(&proto.NegotiateConnection_ClientToServer{
71+
Message: &proto.NegotiateConnection_ClientToServer_IceCandidate{
72+
IceCandidate: iceCandidate.Candidate,
73+
},
74+
})
75+
if err != nil {
76+
_ = peerConn.CloseWithError(xerrors.Errorf("send local candidate: %w", err))
77+
return
78+
}
79+
}
80+
}
81+
}()
82+
go func() {
83+
// Exchanging messages from the server to negotiate a connection.
84+
for {
85+
serverToClientMessage, err := stream.Recv()
86+
if err != nil {
87+
_ = peerConn.CloseWithError(err)
88+
return
89+
}
90+
91+
switch {
92+
case serverToClientMessage.GetAnswer() != nil:
93+
peerConn.SetRemoteSessionDescription(webrtc.SessionDescription{
94+
Type: webrtc.SDPType(serverToClientMessage.GetAnswer().SdpType),
95+
SDP: serverToClientMessage.GetAnswer().Sdp,
96+
})
97+
case serverToClientMessage.GetIceCandidate() != "":
98+
err = peerConn.AddRemoteCandidate(webrtc.ICECandidateInit{
99+
Candidate: serverToClientMessage.GetIceCandidate(),
100+
})
101+
if err != nil {
102+
_ = peerConn.CloseWithError(xerrors.Errorf("add remote candidate: %w", err))
103+
return
104+
}
105+
default:
106+
_ = peerConn.CloseWithError(xerrors.Errorf("unhandled message: %s", reflect.TypeOf(serverToClientMessage).String()))
107+
return
108+
}
109+
}
110+
}()
111+
112+
return peerConn, nil
113+
}

peerbroker/dial_test.go

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package peerbroker_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/coder/coder/peerbroker"
8+
"github.com/coder/coder/peerbroker/proto"
9+
"github.com/coder/coder/provisionersdk"
10+
"github.com/pion/webrtc/v3"
11+
"github.com/stretchr/testify/require"
12+
"go.uber.org/goleak"
13+
"storj.io/drpc/drpcconn"
14+
)
15+
16+
func TestMain(m *testing.M) {
17+
goleak.VerifyTestMain(m)
18+
}
19+
20+
func TestDial(t *testing.T) {
21+
t.Run("Connect", func(t *testing.T) {
22+
ctx := context.Background()
23+
client, server := provisionersdk.TransportPipe()
24+
defer client.Close()
25+
defer server.Close()
26+
27+
listener, err := peerbroker.Listen(server, nil)
28+
require.NoError(t, err)
29+
30+
api := proto.NewDRPCPeerBrokerClient(drpcconn.New(client))
31+
stream, err := api.NegotiateConnection(ctx)
32+
require.NoError(t, err)
33+
clientConn, err := peerbroker.Dial(stream, []webrtc.ICEServer{{
34+
URLs: []string{"stun:stun.l.google.com:19302"},
35+
}}, nil)
36+
require.NoError(t, err)
37+
defer clientConn.Close()
38+
39+
serverConn, err := listener.Accept()
40+
require.NoError(t, err)
41+
defer serverConn.Close()
42+
_, err = serverConn.Ping()
43+
require.NoError(t, err)
44+
45+
_, err = clientConn.Ping()
46+
require.NoError(t, err)
47+
})
48+
}

0 commit comments

Comments
 (0)