From c86a67f269a4446f9754eeb214145faf56fc639b Mon Sep 17 00:00:00 2001 From: Garrett Date: Mon, 26 Apr 2021 22:12:38 +0000 Subject: [PATCH] Centralize webrtc dial logic into xwebrtc --- agent/server.go | 5 +- agent/stream.go | 15 +- go.mod | 1 + internal/cmd/tunnel.go | 223 +++++-------------------- internal/x/xwebrtc/channel.go | 56 ------- internal/x/xwebrtc/conn.go | 20 --- xwebrtc/channel.go | 76 +++++++++ xwebrtc/conn.go | 80 +++++++++ xwebrtc/dialer.go | 195 +++++++++++++++++++++ {internal/x/xwebrtc => xwebrtc}/doc.go | 0 10 files changed, 403 insertions(+), 268 deletions(-) delete mode 100644 internal/x/xwebrtc/channel.go delete mode 100644 internal/x/xwebrtc/conn.go create mode 100644 xwebrtc/channel.go create mode 100644 xwebrtc/conn.go create mode 100644 xwebrtc/dialer.go rename {internal/x/xwebrtc => xwebrtc}/doc.go (100%) diff --git a/agent/server.go b/agent/server.go index 33a33fdb..73d399a2 100644 --- a/agent/server.go +++ b/agent/server.go @@ -59,8 +59,6 @@ func (s *Server) Run(ctx context.Context) error { }), ).Run( func() error { - ctx, cancelFunc := context.WithTimeout(ctx, time.Second*15) - defer cancelFunc() s.log.Info(ctx, "connecting to coder", slog.F("url", s.listenURL.String())) conn, resp, err := websocket.Dial(ctx, s.listenURL.String(), nil) if err != nil && resp == nil { @@ -71,7 +69,8 @@ func (s *Server) Run(ctx context.Context) error { Response: resp, } } - nc := websocket.NetConn(context.Background(), conn, websocket.MessageBinary) + + nc := websocket.NetConn(ctx, conn, websocket.MessageBinary) session, err := yamux.Server(nc, nil) if err != nil { return fmt.Errorf("open: %w", err) diff --git a/agent/stream.go b/agent/stream.go index 01a10f77..4a940f5a 100644 --- a/agent/stream.go +++ b/agent/stream.go @@ -7,12 +7,13 @@ import ( "io" "net" + "cdr.dev/coder-cli/xwebrtc" + "cdr.dev/slog" "github.com/hashicorp/yamux" "github.com/pion/webrtc/v3" "golang.org/x/xerrors" - "cdr.dev/coder-cli/internal/x/xwebrtc" "cdr.dev/coder-cli/pkg/proto" ) @@ -128,6 +129,10 @@ func (s *stream) processMessage(msg proto.Message) { } func (s *stream) processDataChannel(channel *webrtc.DataChannel) { + if channel.Protocol() == "control" { + return + } + if channel.Protocol() == "ping" { channel.OnOpen(func() { rw, err := channel.Detach() @@ -149,7 +154,7 @@ func (s *stream) processDataChannel(channel *webrtc.DataChannel) { return } - prto, port, err := xwebrtc.ParseProxyDataChannel(channel) + prto, addr, err := xwebrtc.ParseProxyDataChannel(channel) if err != nil { s.fatal(fmt.Errorf("failed to parse proxy data channel: %w", err)) return @@ -159,14 +164,14 @@ func (s *stream) processDataChannel(channel *webrtc.DataChannel) { return } - conn, err := net.Dial(prto, fmt.Sprintf("localhost:%d", port)) + conn, err := net.Dial(prto, addr) if err != nil { - s.fatal(fmt.Errorf("failed to dial client port: %d", port)) + s.fatal(fmt.Errorf("failed to dial client addr: %s", addr)) return } channel.OnOpen(func() { - s.logger.Debug(context.Background(), "proxying data channel to local port", slog.F("port", port)) + s.logger.Debug(context.Background(), "proxying data channel", slog.F("addr", addr)) rw, err := channel.Detach() if err != nil { _ = channel.Close() diff --git a/go.mod b/go.mod index 81a93226..d3129a91 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/rjeczalik/notify v0.9.2 github.com/spf13/cobra v1.1.3 go.coder.com/retry v1.2.0 + golang.org/x/net v0.0.0-20210420210106-798c2154c571 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 golang.org/x/sys v0.0.0-20210420072515-93ed5bcd2bfe golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 diff --git a/internal/cmd/tunnel.go b/internal/cmd/tunnel.go index c1a56621..ec8e179d 100644 --- a/internal/cmd/tunnel.go +++ b/internal/cmd/tunnel.go @@ -2,26 +2,21 @@ package cmd import ( "context" - "encoding/json" "fmt" "io" "net" "net/url" "os" "strconv" - "time" "cdr.dev/slog" "cdr.dev/slog/sloggers/sloghuman" - "github.com/pion/webrtc/v3" "github.com/spf13/cobra" "golang.org/x/xerrors" - "nhooyr.io/websocket" "cdr.dev/coder-cli/coder-sdk" "cdr.dev/coder-cli/internal/x/xcobra" - "cdr.dev/coder-cli/internal/x/xwebrtc" - "cdr.dev/coder-cli/pkg/proto" + "cdr.dev/coder-cli/xwebrtc" ) func tunnelCmd() *cobra.Command { @@ -41,26 +36,26 @@ coder tunnel my-dev 3000 3000 remotePort, err := strconv.ParseUint(args[1], 10, 16) if err != nil { - log.Fatal(ctx, "parse remote port", slog.Error(err)) + return xerrors.Errorf("parse remote port: %w", err) } var localPort uint64 if args[2] != "stdio" { localPort, err = strconv.ParseUint(args[2], 10, 16) if err != nil { - log.Fatal(ctx, "parse local port", slog.Error(err)) + return xerrors.Errorf("parse local port: %w", err) } } sdk, err := newClient(ctx) if err != nil { - return err + return xerrors.Errorf("getting coder client: %w", err) } baseURL := sdk.BaseURL() envs, err := getEnvs(ctx, sdk, coder.Me) if err != nil { - return err + return xerrors.Errorf("get workspaces: %w", err) } var envID string @@ -74,20 +69,19 @@ coder tunnel my-dev 3000 3000 return xerrors.Errorf("No workspace found by name '%s'", args[0]) } - c := &client{ - id: envID, - stdio: args[2] == "stdio", - localPort: uint16(localPort), - remotePort: uint16(remotePort), - ctx: context.Background(), - logger: log.Leveled(slog.LevelDebug), - brokerAddr: baseURL, - token: sdk.Token(), + c := &tunnneler{ + log: log.Leveled(slog.LevelDebug), + brokerAddr: &baseURL, + token: sdk.Token(), + workspaceID: envID, + stdio: args[2] == "stdio", + localPort: uint16(localPort), + remotePort: uint16(remotePort), } - err = c.start() + err = c.start(ctx) if err != nil { - log.Fatal(ctx, err.Error()) + return xerrors.Errorf("running tunnel: %w", err) } return nil @@ -97,197 +91,58 @@ coder tunnel my-dev 3000 3000 return cmd } -type client struct { - ctx context.Context - brokerAddr url.URL - token string - logger slog.Logger - id string - remotePort uint16 - localPort uint16 - stdio bool +type tunnneler struct { + log slog.Logger + brokerAddr *url.URL + token string + workspaceID string + remotePort uint16 + localPort uint16 + stdio bool } -func (c *client) start() error { - url := fmt.Sprintf("%s%s%s%s%s", c.brokerAddr.String(), "/api/private/envagent/", c.id, "/connect?session_token=", c.token) - turnScheme := "turns" - if c.brokerAddr.Scheme == "http" { - turnScheme = "turn" - } - tcpProxy := fmt.Sprintf("%s:%s:5349?transport=tcp", turnScheme, c.brokerAddr.Host) - c.logger.Info(c.ctx, "connecting to broker", slog.F("url", url), slog.F("tcp-proxy", tcpProxy)) - conn, resp, err := websocket.Dial(c.ctx, url, nil) - if err != nil && resp == nil { - return fmt.Errorf("dial: %w", err) - } - if err != nil && resp != nil { - return &coder.HTTPError{ - Response: resp, - } - } - nconn := websocket.NetConn(context.Background(), conn, websocket.MessageBinary) - - // Only enabled under a private feature flag for now, - // so insecure connections are entirely fine to allow. - servers := []webrtc.ICEServer{{ - URLs: []string{tcpProxy}, - Username: "insecure", - Credential: "pass", - CredentialType: webrtc.ICECredentialTypePassword, - }} - rtc, err := xwebrtc.NewPeerConnection(servers) - if err != nil { - return fmt.Errorf("create connection: %w", err) - } - - rtc.OnNegotiationNeeded(func() { - c.logger.Debug(context.Background(), "negotiation needed...") - }) - - rtc.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) { - c.logger.Info(context.Background(), "connection state changed", slog.F("state", pcs)) - }) - - channel, err := xwebrtc.NewProxyDataChannel(rtc, "forwarder", "tcp", c.remotePort) - if err != nil { - return fmt.Errorf("create data channel: %w", err) - } - flushCandidates := proto.ProxyICECandidates(rtc, nconn) - - localDesc, err := rtc.CreateOffer(&webrtc.OfferOptions{}) - if err != nil { - return fmt.Errorf("create offer: %w", err) - } - - err = rtc.SetLocalDescription(localDesc) - if err != nil { - return fmt.Errorf("set local desc: %w", err) - } - - c.logger.Debug(context.Background(), "writing offer") - b, _ := json.Marshal(&proto.Message{ - Offer: &localDesc, - Servers: servers, - }) - _, err = nconn.Write(b) +func (c *tunnneler) start(ctx context.Context) error { + wd, err := xwebrtc.NewWorkspaceDialer(ctx, c.log, c.brokerAddr, c.token, c.workspaceID) if err != nil { - return fmt.Errorf("write offer: %w", err) - } - flushCandidates() - - go func() { - err = xwebrtc.WaitForDataChannelOpen(context.Background(), channel) - if err != nil { - c.logger.Fatal(context.Background(), "waiting for data channel open", slog.Error(err)) - } - _ = conn.Close(websocket.StatusNormalClosure, "rtc connected") - }() - - decoder := json.NewDecoder(nconn) - for { - var msg proto.Message - err = decoder.Decode(&msg) - if err == io.EOF { - break - } - if websocket.CloseStatus(err) == websocket.StatusNormalClosure { - break - } - if err != nil { - return fmt.Errorf("read msg: %w", err) - } - if msg.Candidate != "" { - c.logger.Debug(context.Background(), "accepted ice candidate", slog.F("candidate", msg.Candidate)) - err = proto.AcceptICECandidate(rtc, &msg) - if err != nil { - return fmt.Errorf("accept ice: %w", err) - } - } - if msg.Answer != nil { - c.logger.Debug(context.Background(), "got answer", slog.F("answer", msg.Answer)) - err = rtc.SetRemoteDescription(*msg.Answer) - if err != nil { - return fmt.Errorf("set remote: %w", err) - } - } + return xerrors.Errorf("creating workspace dialer: %w", wd) } - - // Once we're open... let's test out the ping. - pingProto := "ping" - pingChannel, err := rtc.CreateDataChannel("pinger", &webrtc.DataChannelInit{ - Protocol: &pingProto, - }) + nc, err := wd.DialContext(ctx, xwebrtc.NetworkTCP, fmt.Sprintf("localhost:%d", c.remotePort)) if err != nil { - return fmt.Errorf("create ping channel") + return xerrors.Errorf("dial: %w", err) } - pingChannel.OnOpen(func() { - defer func() { - _ = pingChannel.Close() - }() - t1 := time.Now() - rw, _ := pingChannel.Detach() - defer func() { - _ = rw.Close() - }() - _, _ = rw.Write([]byte("hello")) - b := make([]byte, 64) - _, _ = rw.Read(b) - c.logger.Info(c.ctx, "your latency directly to the agent", slog.F("ms", time.Since(t1).Milliseconds())) - }) + // proxy via stdio if c.stdio { - // At this point the RTC is connected and data channel is opened... - rw, err := channel.Detach() - if err != nil { - return fmt.Errorf("detach channel: %w", err) - } go func() { - _, _ = io.Copy(rw, os.Stdin) + _, _ = io.Copy(nc, os.Stdin) }() - _, err = io.Copy(os.Stdout, rw) + _, err = io.Copy(os.Stdout, nc) if err != nil { - return fmt.Errorf("copy: %w", err) + return xerrors.Errorf("copy: %w", err) } return nil } + // proxy via tcp listener listener, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", c.localPort)) if err != nil { - return fmt.Errorf("listen: %w", err) + return xerrors.Errorf("listen: %w", err) } for { - conn, err := listener.Accept() + lc, err := listener.Accept() if err != nil { - return fmt.Errorf("accept: %w", err) + return xerrors.Errorf("accept: %w", err) } go func() { defer func() { - _ = conn.Close() - }() - channel, err := xwebrtc.NewProxyDataChannel(rtc, "forwarder", "tcp", c.remotePort) - if err != nil { - c.logger.Warn(context.Background(), "create data channel for proxying", slog.Error(err)) - return - } - defer func() { - _ = channel.Close() + _ = lc.Close() }() - err = xwebrtc.WaitForDataChannelOpen(context.Background(), channel) - if err != nil { - c.logger.Warn(context.Background(), "wait for data channel open", slog.Error(err)) - return - } - rw, err := channel.Detach() - if err != nil { - c.logger.Warn(context.Background(), "detach channel", slog.Error(err)) - return - } go func() { - _, _ = io.Copy(conn, rw) + _, _ = io.Copy(lc, nc) }() - _, _ = io.Copy(rw, conn) + _, _ = io.Copy(nc, lc) }() } } diff --git a/internal/x/xwebrtc/channel.go b/internal/x/xwebrtc/channel.go deleted file mode 100644 index 08442d9e..00000000 --- a/internal/x/xwebrtc/channel.go +++ /dev/null @@ -1,56 +0,0 @@ -package xwebrtc - -import ( - "context" - "errors" - "fmt" - "net" - "strconv" - "time" - - "github.com/pion/webrtc/v3" -) - -// WaitForDataChannelOpen waits for the data channel to have the open state. -// By default, it waits 15 seconds. -func WaitForDataChannelOpen(ctx context.Context, channel *webrtc.DataChannel) error { - if channel.ReadyState() == webrtc.DataChannelStateOpen { - return nil - } - ctx, cancelFunc := context.WithTimeout(ctx, time.Second*15) - defer cancelFunc() - channel.OnOpen(func() { - cancelFunc() - }) - <-ctx.Done() - if ctx.Err() == context.DeadlineExceeded { - return ctx.Err() - } - return nil -} - -// NewProxyDataChannel creates a new data channel for proxying. -func NewProxyDataChannel(conn *webrtc.PeerConnection, name, protocol string, port uint16) (*webrtc.DataChannel, error) { - proto := fmt.Sprintf("%s:%d", protocol, port) - ordered := true - return conn.CreateDataChannel(name, &webrtc.DataChannelInit{ - Protocol: &proto, - Ordered: &ordered, - }) -} - -// ParseProxyDataChannel parses a data channel to get the protocol and port. -func ParseProxyDataChannel(channel *webrtc.DataChannel) (string, uint16, error) { - if channel.Protocol() == "" { - return "", 0, errors.New("data channel is not a proxy") - } - host, port, err := net.SplitHostPort(channel.Protocol()) - if err != nil { - return "", 0, fmt.Errorf("split protocol: %w", err) - } - p, err := strconv.ParseInt(port, 10, 16) - if err != nil { - return "", 0, fmt.Errorf("parse port: %w", err) - } - return host, uint16(p), nil -} diff --git a/internal/x/xwebrtc/conn.go b/internal/x/xwebrtc/conn.go deleted file mode 100644 index 5237cb8f..00000000 --- a/internal/x/xwebrtc/conn.go +++ /dev/null @@ -1,20 +0,0 @@ -package xwebrtc - -import ( - "time" - - "github.com/pion/webrtc/v3" -) - -// NewPeerConnection creates a new peer connection. -// It uses the Google stun server by default. -func NewPeerConnection(servers []webrtc.ICEServer) (*webrtc.PeerConnection, error) { - se := webrtc.SettingEngine{} - se.DetachDataChannels() - se.SetICETimeouts(time.Second*5, time.Second*5, time.Second*2) - api := webrtc.NewAPI(webrtc.WithSettingEngine(se)) - - return api.NewPeerConnection(webrtc.Configuration{ - ICEServers: servers, - }) -} diff --git a/xwebrtc/channel.go b/xwebrtc/channel.go new file mode 100644 index 00000000..35938a1d --- /dev/null +++ b/xwebrtc/channel.go @@ -0,0 +1,76 @@ +package xwebrtc + +import ( + "context" + "fmt" + "strings" + "time" + + "golang.org/x/xerrors" + + "github.com/pion/webrtc/v3" +) + +// ParseProxyDataChannel parses a data channel to get the network and addr. +func ParseProxyDataChannel(channel *webrtc.DataChannel) (string, string, error) { + if channel.Protocol() == "" { + return "", "", xerrors.New("data channel is not a proxy") + } + segments := strings.SplitN(channel.Protocol(), ":", 2) + if len(segments) != 2 { + return "", "", xerrors.Errorf("protocol is malformed: %s", channel.Protocol()) + } + + return segments[0], segments[1], nil +} + +// NewPeerConnection creates a new peer connection. +// It uses the Google stun server by default. +func NewPeerConnection(servers []webrtc.ICEServer) (*webrtc.PeerConnection, error) { + se := webrtc.SettingEngine{} + se.DetachDataChannels() + se.SetICETimeouts(time.Second*5, time.Second*5, time.Second*2) + api := webrtc.NewAPI(webrtc.WithSettingEngine(se)) + + return api.NewPeerConnection(webrtc.Configuration{ + ICEServers: servers, + }) +} + +// waitForDataChannelOpen waits for the data channel to have the open state. +// By default, it waits 15 seconds. +func waitForDataChannelOpen(ctx context.Context, channel *webrtc.DataChannel) error { + if channel.ReadyState() == webrtc.DataChannelStateOpen { + return nil + } + ctx, cancelFunc := context.WithTimeout(ctx, time.Second*15) + defer cancelFunc() + channel.OnOpen(func() { + cancelFunc() + }) + <-ctx.Done() + if ctx.Err() == context.DeadlineExceeded { + return ctx.Err() + } + return nil +} + +// newProxyDataChannel creates a new data channel for proxying. +func newProxyDataChannel(conn *webrtc.PeerConnection, protocol string, addr string) (*webrtc.DataChannel, error) { + proto := fmt.Sprintf("%s:%s", protocol, addr) + ordered := true + return conn.CreateDataChannel(proto, &webrtc.DataChannelInit{ + Protocol: &proto, + Ordered: &ordered, + }) +} + +// newControlDataChannel creates a new data channel for starting a new peer connection. +func newControlDataChannel(conn *webrtc.PeerConnection) (*webrtc.DataChannel, error) { + proto := "control" + ordered := true + return conn.CreateDataChannel(proto, &webrtc.DataChannelInit{ + Protocol: &proto, + Ordered: &ordered, + }) +} diff --git a/xwebrtc/conn.go b/xwebrtc/conn.go new file mode 100644 index 00000000..0311da98 --- /dev/null +++ b/xwebrtc/conn.go @@ -0,0 +1,80 @@ +package xwebrtc + +import ( + "context" + "io" + "net" + "time" + + "github.com/pion/webrtc/v3" + "golang.org/x/xerrors" +) + +// Conn is a net.Conn based on a data channel. +type Conn struct { + channel *webrtc.DataChannel + rwc io.ReadWriteCloser +} + +// NewConn creates a new data channel on the peer connection and returns it as a net.Conn. +func NewConn(ctx context.Context, rtc *webrtc.PeerConnection, network string, addr string) (net.Conn, error) { + channel, err := newProxyDataChannel(rtc, network, addr) + if err != nil { + return nil, xerrors.Errorf("creating data channel: %w", err) + } + err = waitForDataChannelOpen(ctx, channel) + if err != nil { + return nil, xerrors.Errorf("waiting for open data channel: %w", err) + } + + rwc, err := channel.Detach() + if err != nil { + return nil, xerrors.Errorf("detaching data channel: %w", err) + } + + return &Conn{ + channel: channel, + rwc: rwc, + }, nil +} + +// Read reads data from the connection. +func (c *Conn) Read(b []byte) (n int, err error) { + return c.rwc.Read(b) +} + +// Write writes data to the connection. +func (c *Conn) Write(b []byte) (n int, err error) { + return c.rwc.Write(b) +} + +// Close closes the connection. +// Any blocked Read or Write operations will be unblocked and return errors. +func (c *Conn) Close() error { + return c.rwc.Close() +} + +// LocalAddr is not implemented. +func (c *Conn) LocalAddr() net.Addr { + return nil +} + +// RemoteAddr is not implemented. +func (c *Conn) RemoteAddr() net.Addr { + return nil +} + +// SetDeadline is not implemented. +func (c *Conn) SetDeadline(t time.Time) error { + return nil +} + +// SetReadDeadline is not implemented. +func (c *Conn) SetReadDeadline(t time.Time) error { + return nil +} + +// SetWriteDeadline is not implemented. +func (c *Conn) SetWriteDeadline(t time.Time) error { + return nil +} diff --git a/xwebrtc/dialer.go b/xwebrtc/dialer.go new file mode 100644 index 00000000..24bf9b98 --- /dev/null +++ b/xwebrtc/dialer.go @@ -0,0 +1,195 @@ +package xwebrtc + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net" + "net/url" + + "golang.org/x/net/proxy" + + "cdr.dev/slog" + "github.com/pion/webrtc/v3" + "golang.org/x/xerrors" + "nhooyr.io/websocket" + + "cdr.dev/coder-cli/coder-sdk" + "cdr.dev/coder-cli/pkg/proto" +) + +const ( + // NetworkTCP is the protocol for tcp tunnels. + NetworkTCP = "tcp" +) + +// WorkspaceDialer dials workspace agents and represents peer connections as a http.Client. +type WorkspaceDialer struct { + log slog.Logger + brokerAddr *url.URL + token string + workspaceID string + peerConn *webrtc.PeerConnection +} + +// NewWorkspaceDialer creates a new workspace client to dial to agents. +func NewWorkspaceDialer(ctx context.Context, log slog.Logger, brokerAddr *url.URL, token string, workspaceID string) (proxy.ContextDialer, error) { + client := &WorkspaceDialer{ + log: log, + brokerAddr: brokerAddr, + token: token, + workspaceID: workspaceID, + } + + var err error + client.peerConn, err = client.peerConnection(ctx, workspaceID) + if err != nil { + return nil, xerrors.Errorf("getting peer connection: %w", err) + } + + return client, nil +} + +// DialContext will create a new peer connection with the workspace agent, make a new data channel, and return it as +// a net.Conn. +func (wc *WorkspaceDialer) DialContext(ctx context.Context, network string, workspaceAddr string) (net.Conn, error) { + wc.log.Debug(ctx, "making net conn", slog.F("addr", workspaceAddr)) + nc, err := NewConn(ctx, wc.peerConn, network, workspaceAddr) + if err != nil { + return nil, xerrors.Errorf("creating net conn: %w", err) + } + + return nc, nil +} + +// peerConnection connects to a workspace agent and gives a instantiated connection with the agent. +func (wc *WorkspaceDialer) peerConnection(ctx context.Context, workspaceID string) (*webrtc.PeerConnection, error) { + // Only enabled under a private feature flag for now, + // so insecure connections are entirely fine to allow. + var servers = []webrtc.ICEServer{{ + URLs: []string{turnAddr(wc.brokerAddr)}, + Username: "insecure", + Credential: "pass", + CredentialType: webrtc.ICECredentialTypePassword, + }} + + wc.log.Debug(ctx, "dialing broker", slog.F("url", connnectAddr(wc.brokerAddr, workspaceID, wc.token)), slog.F("servers", servers)) + conn, resp, err := websocket.Dial(ctx, connnectAddr(wc.brokerAddr, workspaceID, wc.token), nil) + if err != nil && resp == nil { + return nil, xerrors.Errorf("dial: %w", err) + } + if err != nil && resp != nil { + defer func() { + _ = resp.Body.Close() + }() + return nil, &coder.HTTPError{ + Response: resp, + } + } + nconn := websocket.NetConn(ctx, conn, websocket.MessageBinary) + defer func() { + _ = nconn.Close() + _ = conn.Close(websocket.StatusNormalClosure, "webrtc handshake complete") + }() + + rtc, err := NewPeerConnection(servers) + if err != nil { + return nil, xerrors.Errorf("create connection: %w", err) + } + + rtc.OnNegotiationNeeded(func() { + wc.log.Debug(ctx, "negotiation needed...") + }) + + rtc.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) { + wc.log.Info(ctx, "connection state changed", slog.F("state", pcs)) + }) + + flushCandidates := proto.ProxyICECandidates(rtc, nconn) + + // we make a channel so the handshake actually fires + // but we do nothing with it + control, err := newControlDataChannel(rtc) + if err != nil { + return nil, xerrors.Errorf("create connect data channel: %w", err) + } + go func() { + err = waitForDataChannelOpen(ctx, control) + if err != nil { + wc.log.Fatal(ctx, "waiting for data channel open", slog.Error(err)) + } + _ = control.Close() + _ = conn.Close(websocket.StatusNormalClosure, "rtc connected") + }() + + localDesc, err := rtc.CreateOffer(&webrtc.OfferOptions{}) + if err != nil { + return nil, xerrors.Errorf("create offer: %w", err) + } + + err = rtc.SetLocalDescription(localDesc) + if err != nil { + return nil, xerrors.Errorf("set local desc: %w", err) + } + + b, _ := json.Marshal(&proto.Message{ + Offer: &localDesc, + Servers: servers, + }) + + _, err = nconn.Write(b) + if err != nil { + return nil, xerrors.Errorf("write offer: %w", err) + } + flushCandidates() + + decoder := json.NewDecoder(nconn) + for { + var msg proto.Message + err = decoder.Decode(&msg) + if xerrors.Is(err, io.EOF) { + break + } + if websocket.CloseStatus(err) == websocket.StatusNormalClosure { + break + } + if err != nil { + return nil, xerrors.Errorf("read msg: %w", err) + } + if msg.Candidate != "" { + wc.log.Debug(ctx, "accepted ice candidate", slog.F("candidate", msg.Candidate)) + err = proto.AcceptICECandidate(rtc, &msg) + if err != nil { + return nil, xerrors.Errorf("accept ice: %w", err) + } + continue + } + if msg.Answer != nil { + wc.log.Debug(ctx, "got answer", slog.F("answer", msg.Answer)) + err = rtc.SetRemoteDescription(*msg.Answer) + if err != nil { + return nil, xerrors.Errorf("set remote: %w", err) + } + continue + } + if msg.Error != "" { + return nil, xerrors.Errorf("got error: %s", msg.Error) + } + wc.log.Error(ctx, "unknown message", slog.F("msg", msg)) + } + + return rtc, nil +} + +func turnAddr(u *url.URL) string { + turnScheme := "turns" + if u.Scheme == "http" { + turnScheme = "turn" + } + return fmt.Sprintf("%s:%s:5349?transport=tcp", turnScheme, u.Host) +} + +func connnectAddr(baseURL *url.URL, id string, token string) string { + return fmt.Sprintf("%s%s%s%s%s", baseURL.String(), "/api/private/envagent/", id, "/connect?session_token=", token) +} diff --git a/internal/x/xwebrtc/doc.go b/xwebrtc/doc.go similarity index 100% rename from internal/x/xwebrtc/doc.go rename to xwebrtc/doc.go