Skip to content

Commit 93b1ea4

Browse files
committed
Merge branch 'main' into bryphe/refactor/add-stability-workflow
2 parents 636c911 + b58e168 commit 93b1ea4

File tree

6 files changed

+365
-49
lines changed

6 files changed

+365
-49
lines changed

go.mod

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ replace github.com/pion/ice/v2 => github.com/kylecarbs/ice/v2 v2.1.8-0.202202211
2222

2323
require (
2424
cdr.dev/slog v1.4.1
25-
cloud.google.com/go/compute v1.3.0
25+
cloud.google.com/go/compute v1.4.0
2626
github.com/briandowns/spinner v1.18.1
2727
github.com/coder/retry v1.3.0
2828
github.com/creack/pty v1.1.17
@@ -49,7 +49,7 @@ require (
4949
github.com/pion/datachannel v1.5.2
5050
github.com/pion/logging v0.2.2
5151
github.com/pion/transport v0.13.0
52-
github.com/pion/webrtc/v3 v3.1.23
52+
github.com/pion/webrtc/v3 v3.1.24
5353
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8
5454
github.com/quasilyte/go-ruleguard/dsl v0.3.17
5555
github.com/spf13/cobra v1.3.0
@@ -63,7 +63,7 @@ require (
6363
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8
6464
golang.org/x/sys v0.0.0-20220209214540-3681064d5158
6565
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
66-
google.golang.org/api v0.69.0
66+
google.golang.org/api v0.70.0
6767
google.golang.org/protobuf v1.27.1
6868
nhooyr.io/websocket v1.8.7
6969
storj.io/drpc v0.0.29
@@ -112,8 +112,8 @@ require (
112112
github.com/opencontainers/go-digest v1.0.0 // indirect
113113
github.com/opencontainers/image-spec v1.0.2 // indirect
114114
github.com/opencontainers/runc v1.1.0 // indirect
115-
github.com/pion/dtls/v2 v2.1.2 // indirect
116-
github.com/pion/ice/v2 v2.1.20 // indirect
115+
github.com/pion/dtls/v2 v2.1.3 // indirect
116+
github.com/pion/ice/v2 v2.2.1 // indirect
117117
github.com/pion/interceptor v0.1.7 // indirect
118118
github.com/pion/mdns v0.0.5 // indirect
119119
github.com/pion/randutil v0.1.0 // indirect
@@ -138,7 +138,7 @@ require (
138138
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
139139
golang.org/x/text v0.3.7 // indirect
140140
google.golang.org/appengine v1.6.7 // indirect
141-
google.golang.org/genproto v0.0.0-20220218161850-94dd64e39d7c // indirect
141+
google.golang.org/genproto v0.0.0-20220222154240-daf995802d7b // indirect
142142
google.golang.org/grpc v1.44.0 // indirect
143143
gopkg.in/yaml.v2 v2.4.0 // indirect
144144
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect

go.sum

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@ cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4g
4242
cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ=
4343
cloud.google.com/go/compute v0.1.0/go.mod h1:GAesmwr110a34z04OlxYkATPBEfVhkymfTBXtfbBFow=
4444
cloud.google.com/go/compute v1.2.0/go.mod h1:xlogom/6gr8RJGBe7nT2eGsQYAFUbbv8dbC29qE3Xmw=
45-
cloud.google.com/go/compute v1.3.0 h1:mPL/MzDDYHsh5tHRS9mhmhWlcgClCrCa6ApQCU6wnHI=
4645
cloud.google.com/go/compute v1.3.0/go.mod h1:cCZiE1NHEtai4wiufUhW8I8S1JKkAnhnQJWM7YD99wM=
46+
cloud.google.com/go/compute v1.4.0 h1:tzSyCe254NKkL8zshJUSoVvI9mcgbFdSpCC44uUNjT0=
47+
cloud.google.com/go/compute v1.4.0/go.mod h1:TcrKl8VipL9ZM0wEjdooJ1eet/6YsEV/E/larxxkAdg=
4748
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
4849
cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk=
4950
cloud.google.com/go/firestore v1.6.1/go.mod h1:asNXNOzBdyVQmEU+ggO8UPodTkEVFW5Qx+rwHnAz+EY=
@@ -1036,8 +1037,9 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi
10361037
github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
10371038
github.com/pion/datachannel v1.5.2 h1:piB93s8LGmbECrpO84DnkIVWasRMk3IimbcXkTQLE6E=
10381039
github.com/pion/datachannel v1.5.2/go.mod h1:FTGQWaHrdCwIJ1rw6xBIfZVkslikjShim5yr05XFuCQ=
1039-
github.com/pion/dtls/v2 v2.1.2 h1:22Q1Jk9L++Yo7BIf9130MonNPfPVb+YgdYLeyQotuAA=
10401040
github.com/pion/dtls/v2 v2.1.2/go.mod h1:o6+WvyLDAlXF7YiPB/RlskRoeK+/JtuaZa5emwQcWus=
1041+
github.com/pion/dtls/v2 v2.1.3 h1:3UF7udADqous+M2R5Uo2q/YaP4EzUoWKdfX2oscCUio=
1042+
github.com/pion/dtls/v2 v2.1.3/go.mod h1:o6+WvyLDAlXF7YiPB/RlskRoeK+/JtuaZa5emwQcWus=
10411043
github.com/pion/interceptor v0.1.7 h1:HThW0tIIKT9RRoDWGURe8rlZVOx0fJHxBHpA0ej0+bo=
10421044
github.com/pion/interceptor v0.1.7/go.mod h1:Lh3JSl/cbJ2wP8I3ccrjh1K/deRGRn3UlSPuOTiHb6U=
10431045
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
@@ -1069,8 +1071,8 @@ github.com/pion/turn/v2 v2.0.8 h1:KEstL92OUN3k5k8qxsXHpr7WWfrdp7iJZHx99ud8muw=
10691071
github.com/pion/turn/v2 v2.0.8/go.mod h1:+y7xl719J8bAEVpSXBXvTxStjJv3hbz9YFflvkpcGPw=
10701072
github.com/pion/udp v0.1.1 h1:8UAPvyqmsxK8oOjloDk4wUt63TzFe9WEJkg5lChlj7o=
10711073
github.com/pion/udp v0.1.1/go.mod h1:6AFo+CMdKQm7UiA0eUPA8/eVCTx8jBIITLZHc9DWX5M=
1072-
github.com/pion/webrtc/v3 v3.1.23 h1:suyNiF9o2/6SBsyWA1UweraUWYkaHCNJdt/16b61I5w=
1073-
github.com/pion/webrtc/v3 v3.1.23/go.mod h1:L5S/oAhL0Fzt/rnftVQRrP80/j5jygY7XRZzWwFx6P4=
1074+
github.com/pion/webrtc/v3 v3.1.24 h1:s9PuwisrgHe1FTqfwK4p3T7rXtAHaUNhycbdMjADT28=
1075+
github.com/pion/webrtc/v3 v3.1.24/go.mod h1:mO/yv7fBN3Lp7YNlnYcTj1jtpvNvssJG+7eh6itZ4xM=
10741076
github.com/pkg/browser v0.0.0-20210706143420-7d21f8c997e2/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI=
10751077
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU=
10761078
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI=
@@ -1743,8 +1745,9 @@ google.golang.org/api v0.62.0/go.mod h1:dKmwPCydfsad4qCH08MSdgWjfHOyfpd4VtDGgRFd
17431745
google.golang.org/api v0.63.0/go.mod h1:gs4ij2ffTRXwuzzgJl/56BdwJaA194ijkfn++9tDuPo=
17441746
google.golang.org/api v0.66.0/go.mod h1:I1dmXYpX7HGwz/ejRxwQp2qj5bFAz93HiCU1C1oYd9M=
17451747
google.golang.org/api v0.67.0/go.mod h1:ShHKP8E60yPsKNw/w8w+VYaj9H6buA5UqDp8dhbQZ6g=
1746-
google.golang.org/api v0.69.0 h1:yHW5s2SFyDapr/43kYtIQmoaaFVW4baLMLwqV4auj2A=
17471748
google.golang.org/api v0.69.0/go.mod h1:boanBiw+h5c3s+tBPgEzLDRHfFLWV0qXxRHz3ws7C80=
1749+
google.golang.org/api v0.70.0 h1:67zQnAE0T2rB0A3CwLSas0K+SbVzSxP+zTLkQLexeiw=
1750+
google.golang.org/api v0.70.0/go.mod h1:Bs4ZM2HGifEvXwd50TtW70ovgJffJYw2oRCOFU/SkfA=
17481751
google.golang.org/appengine v1.0.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
17491752
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
17501753
google.golang.org/appengine v1.3.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
@@ -1833,8 +1836,9 @@ google.golang.org/genproto v0.0.0-20220126215142-9970aeb2e350/go.mod h1:5CzLGKJ6
18331836
google.golang.org/genproto v0.0.0-20220201184016-50beb8ab5c44/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
18341837
google.golang.org/genproto v0.0.0-20220207164111-0872dc986b00/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
18351838
google.golang.org/genproto v0.0.0-20220211171837-173942840c17/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI=
1836-
google.golang.org/genproto v0.0.0-20220218161850-94dd64e39d7c h1:TU4rFa5APdKTq0s6B7WTsH6Xmx0Knj86s6Biz56mErE=
18371839
google.golang.org/genproto v0.0.0-20220218161850-94dd64e39d7c/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI=
1840+
google.golang.org/genproto v0.0.0-20220222154240-daf995802d7b h1:wHqTlwZVR0x5EG2S6vKlCq63+Tl/vBoQELitHxqxDOo=
1841+
google.golang.org/genproto v0.0.0-20220222154240-daf995802d7b/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI=
18381842
google.golang.org/grpc v0.0.0-20160317175043-d3ddb4469d5a/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
18391843
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
18401844
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=

peerbroker/proxy.go

Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
package peerbroker
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"io"
8+
"net"
9+
"sync"
10+
11+
"github.com/google/uuid"
12+
"github.com/hashicorp/yamux"
13+
"golang.org/x/xerrors"
14+
protobuf "google.golang.org/protobuf/proto"
15+
"storj.io/drpc/drpcmux"
16+
"storj.io/drpc/drpcserver"
17+
18+
"cdr.dev/slog"
19+
"github.com/coder/coder/database"
20+
"github.com/coder/coder/peerbroker/proto"
21+
)
22+
23+
var (
24+
// Each NegotiateConnection() function call spawns a new stream.
25+
streamIDLength = len(uuid.NewString())
26+
// We shouldn't PubSub anything larger than this!
27+
maxPayloadSizeBytes = 8192
28+
)
29+
30+
// ProxyOptions provides values to configure a proxy.
31+
type ProxyOptions struct {
32+
ChannelID string
33+
Logger slog.Logger
34+
Pubsub database.Pubsub
35+
}
36+
37+
// ProxyDial writes client negotiation streams over PubSub.
38+
//
39+
// PubSub is used to geodistribute WebRTC handshakes. All negotiation
40+
// messages are small in size (<=8KB), and we don't require delivery
41+
// guarantees because connections can always be renegotiated.
42+
// ┌────────────────────┐ ┌─────────────────────────────┐
43+
// │ coderd │ │ coderd │
44+
// ┌─────────────────────┐ │/<agent-id>/connect │ │ /<agent-id>/listen │
45+
// │ client │ │ │ │ │ ┌─────┐
46+
// │ ├──►│Creates a stream ID │◄─►│Subscribe() to the <agent-id>│◄──┤agent│
47+
// │NegotiateConnection()│ │and Publish() to the│ │channel. Parse the stream ID │ └─────┘
48+
// └─────────────────────┘ │<agent-id> channel: │ │from payloads to create new │
49+
// │ │ │NegotiateConnection() streams│
50+
// │<stream-id><payload>│ │or write to existing ones. │
51+
// └────────────────────┘ └─────────────────────────────┘
52+
func ProxyDial(client proto.DRPCPeerBrokerClient, options ProxyOptions) (io.Closer, error) {
53+
proxyDial := &proxyDial{
54+
channelID: options.ChannelID,
55+
logger: options.Logger,
56+
pubsub: options.Pubsub,
57+
connection: client,
58+
streams: make(map[string]proto.DRPCPeerBroker_NegotiateConnectionClient),
59+
}
60+
return proxyDial, proxyDial.listen()
61+
}
62+
63+
// ProxyListen accepts client negotiation streams over PubSub and writes them to the listener
64+
// as new NegotiateConnection() streams.
65+
func ProxyListen(ctx context.Context, connListener net.Listener, options ProxyOptions) error {
66+
mux := drpcmux.New()
67+
err := proto.DRPCRegisterPeerBroker(mux, &proxyListen{
68+
channelID: options.ChannelID,
69+
pubsub: options.Pubsub,
70+
logger: options.Logger,
71+
})
72+
if err != nil {
73+
return xerrors.Errorf("register peer broker: %w", err)
74+
}
75+
server := drpcserver.New(mux)
76+
err = server.Serve(ctx, connListener)
77+
if err != nil {
78+
if errors.Is(err, yamux.ErrSessionShutdown) {
79+
return nil
80+
}
81+
return xerrors.Errorf("serve: %w", err)
82+
}
83+
return nil
84+
}
85+
86+
type proxyListen struct {
87+
channelID string
88+
pubsub database.Pubsub
89+
logger slog.Logger
90+
}
91+
92+
func (p *proxyListen) NegotiateConnection(stream proto.DRPCPeerBroker_NegotiateConnectionStream) error {
93+
streamID := uuid.NewString()
94+
var err error
95+
closeSubscribe, err := p.pubsub.Subscribe(proxyInID(p.channelID), func(ctx context.Context, message []byte) {
96+
err := p.onServerToClientMessage(streamID, stream, message)
97+
if err != nil {
98+
p.logger.Debug(ctx, "failed to accept server message", slog.Error(err))
99+
}
100+
})
101+
if err != nil {
102+
return xerrors.Errorf("subscribe: %w", err)
103+
}
104+
defer closeSubscribe()
105+
for {
106+
clientToServerMessage, err := stream.Recv()
107+
if err != nil {
108+
if errors.Is(err, io.EOF) {
109+
break
110+
}
111+
return xerrors.Errorf("recv: %w", err)
112+
}
113+
data, err := protobuf.Marshal(clientToServerMessage)
114+
if err != nil {
115+
return xerrors.Errorf("marshal: %w", err)
116+
}
117+
if len(data) > maxPayloadSizeBytes {
118+
return xerrors.Errorf("maximum payload size %d exceeded", maxPayloadSizeBytes)
119+
}
120+
data = append([]byte(streamID), data...)
121+
err = p.pubsub.Publish(proxyOutID(p.channelID), data)
122+
if err != nil {
123+
return xerrors.Errorf("publish: %w", err)
124+
}
125+
}
126+
return nil
127+
}
128+
129+
func (*proxyListen) onServerToClientMessage(streamID string, stream proto.DRPCPeerBroker_NegotiateConnectionStream, message []byte) error {
130+
if len(message) < streamIDLength {
131+
return xerrors.Errorf("got message length %d < %d", len(message), streamIDLength)
132+
}
133+
serverStreamID := string(message[0:streamIDLength])
134+
if serverStreamID != streamID {
135+
// It's not trying to communicate with this stream!
136+
return nil
137+
}
138+
var msg proto.NegotiateConnection_ServerToClient
139+
err := protobuf.Unmarshal(message[streamIDLength:], &msg)
140+
if err != nil {
141+
return xerrors.Errorf("unmarshal message: %w", err)
142+
}
143+
err = stream.Send(&msg)
144+
if err != nil {
145+
return xerrors.Errorf("send message: %w", err)
146+
}
147+
return nil
148+
}
149+
150+
type proxyDial struct {
151+
channelID string
152+
pubsub database.Pubsub
153+
logger slog.Logger
154+
155+
connection proto.DRPCPeerBrokerClient
156+
closeSubscribe func()
157+
streamMutex sync.Mutex
158+
streams map[string]proto.DRPCPeerBroker_NegotiateConnectionClient
159+
}
160+
161+
func (p *proxyDial) listen() error {
162+
var err error
163+
p.closeSubscribe, err = p.pubsub.Subscribe(proxyOutID(p.channelID), func(ctx context.Context, message []byte) {
164+
err := p.onClientToServerMessage(ctx, message)
165+
if err != nil {
166+
p.logger.Debug(ctx, "failed to accept client message", slog.Error(err))
167+
}
168+
})
169+
if err != nil {
170+
return err
171+
}
172+
return nil
173+
}
174+
175+
func (p *proxyDial) onClientToServerMessage(ctx context.Context, message []byte) error {
176+
if len(message) < streamIDLength {
177+
return xerrors.Errorf("got message length %d < %d", len(message), streamIDLength)
178+
}
179+
var err error
180+
streamID := string(message[0:streamIDLength])
181+
p.streamMutex.Lock()
182+
stream, ok := p.streams[streamID]
183+
if !ok {
184+
stream, err = p.connection.NegotiateConnection(ctx)
185+
if err != nil {
186+
p.streamMutex.Unlock()
187+
return xerrors.Errorf("negotiate connection: %w", err)
188+
}
189+
p.streams[streamID] = stream
190+
go func() {
191+
defer stream.Close()
192+
193+
err = p.onServerToClientMessage(streamID, stream)
194+
if err != nil {
195+
p.logger.Debug(ctx, "failed to accept server message", slog.Error(err))
196+
}
197+
}()
198+
go func() {
199+
<-stream.Context().Done()
200+
p.streamMutex.Lock()
201+
delete(p.streams, streamID)
202+
p.streamMutex.Unlock()
203+
}()
204+
}
205+
p.streamMutex.Unlock()
206+
207+
var msg proto.NegotiateConnection_ClientToServer
208+
err = protobuf.Unmarshal(message[streamIDLength:], &msg)
209+
if err != nil {
210+
return xerrors.Errorf("unmarshal message: %w", err)
211+
}
212+
err = stream.Send(&msg)
213+
if err != nil {
214+
return xerrors.Errorf("write message: %w", err)
215+
}
216+
return nil
217+
}
218+
219+
func (p *proxyDial) onServerToClientMessage(streamID string, stream proto.DRPCPeerBroker_NegotiateConnectionClient) error {
220+
for {
221+
serverToClientMessage, err := stream.Recv()
222+
if err != nil {
223+
if errors.Is(err, io.EOF) {
224+
break
225+
}
226+
if errors.Is(err, context.Canceled) {
227+
break
228+
}
229+
return xerrors.Errorf("recv: %w", err)
230+
}
231+
data, err := protobuf.Marshal(serverToClientMessage)
232+
if err != nil {
233+
return xerrors.Errorf("marshal: %w", err)
234+
}
235+
if len(data) > maxPayloadSizeBytes {
236+
return xerrors.Errorf("maximum payload size %d exceeded", maxPayloadSizeBytes)
237+
}
238+
data = append([]byte(streamID), data...)
239+
err = p.pubsub.Publish(proxyInID(p.channelID), data)
240+
if err != nil {
241+
return xerrors.Errorf("publish: %w", err)
242+
}
243+
}
244+
return nil
245+
}
246+
247+
func (p *proxyDial) Close() error {
248+
p.streamMutex.Lock()
249+
defer p.streamMutex.Unlock()
250+
p.closeSubscribe()
251+
return nil
252+
}
253+
254+
func proxyOutID(channelID string) string {
255+
return fmt.Sprintf("%s-out", channelID)
256+
}
257+
258+
func proxyInID(channelID string) string {
259+
return fmt.Sprintf("%s-in", channelID)
260+
}

0 commit comments

Comments
 (0)