Skip to content

Commit 54133fc

Browse files
committed
Initial agent
1 parent a86f2ee commit 54133fc

File tree

7 files changed

+238
-5
lines changed

7 files changed

+238
-5
lines changed

.vscode/settings.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
"drpcserver",
3434
"fatih",
3535
"goleak",
36+
"gossh",
3637
"hashicorp",
3738
"httpmw",
3839
"isatty",
@@ -54,6 +55,7 @@
5455
"retrier",
5556
"sdkproto",
5657
"stretchr",
58+
"tcpip",
5759
"tfexec",
5860
"tfstate",
5961
"unconvert",

agent/agent.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package agent
2+
3+
import (
4+
"context"
5+
"errors"
6+
"io"
7+
"sync"
8+
"time"
9+
10+
"cdr.dev/slog"
11+
"github.com/coder/coder/peer"
12+
"github.com/coder/coder/peerbroker"
13+
"github.com/coder/retry"
14+
15+
"github.com/gliderlabs/ssh"
16+
gossh "golang.org/x/crypto/ssh"
17+
)
18+
19+
type Options struct {
20+
Logger slog.Logger
21+
}
22+
23+
type Dialer func(ctx context.Context) (*peerbroker.Listener, error)
24+
25+
func Server(dialer Dialer, options *Options) io.Closer {
26+
ctx, cancelFunc := context.WithCancel(context.Background())
27+
s := &server{
28+
clientDialer: dialer,
29+
options: options,
30+
closeCancel: cancelFunc,
31+
}
32+
s.init(ctx)
33+
return s
34+
}
35+
36+
type server struct {
37+
clientDialer Dialer
38+
options *Options
39+
40+
closeCancel context.CancelFunc
41+
closeMutex sync.Mutex
42+
closed chan struct{}
43+
closeError error
44+
45+
sshServer *ssh.Server
46+
}
47+
48+
func (s *server) init(ctx context.Context) {
49+
forwardHandler := &ssh.ForwardedTCPHandler{}
50+
s.sshServer = &ssh.Server{
51+
LocalPortForwardingCallback: func(ctx ssh.Context, destinationHost string, destinationPort uint32) bool {
52+
return false
53+
},
54+
ReversePortForwardingCallback: func(ctx ssh.Context, bindHost string, bindPort uint32) bool {
55+
return false
56+
},
57+
PtyCallback: func(ctx ssh.Context, pty ssh.Pty) bool {
58+
return false
59+
},
60+
ServerConfigCallback: func(ctx ssh.Context) *gossh.ServerConfig {
61+
return &gossh.ServerConfig{
62+
Config: gossh.Config{
63+
// "arcfour" is the fastest SSH cipher. We prioritize throughput
64+
// over encryption here, because the WebRTC connection is already
65+
// encrypted. If possible, we'd disable encryption entirely here.
66+
Ciphers: []string{"arcfour"},
67+
},
68+
NoClientAuth: true,
69+
}
70+
},
71+
RequestHandlers: map[string]ssh.RequestHandler{
72+
"tcpip-forward": forwardHandler.HandleSSHRequest,
73+
"cancel-tcpip-forward": forwardHandler.HandleSSHRequest,
74+
},
75+
}
76+
77+
go s.run(ctx)
78+
}
79+
80+
func (s *server) run(ctx context.Context) {
81+
var peerListener *peerbroker.Listener
82+
var err error
83+
// An exponential back-off occurs when the connection is failing to dial.
84+
// This is to prevent server spam in case of a coderd outage.
85+
for retrier := retry.New(50*time.Millisecond, 10*time.Second); retrier.Wait(ctx); {
86+
peerListener, err = s.clientDialer(ctx)
87+
if err != nil {
88+
if errors.Is(err, context.Canceled) {
89+
return
90+
}
91+
if s.isClosed() {
92+
return
93+
}
94+
s.options.Logger.Warn(context.Background(), "failed to dial", slog.Error(err))
95+
continue
96+
}
97+
s.options.Logger.Debug(context.Background(), "connected")
98+
break
99+
}
100+
101+
for {
102+
conn, err := peerListener.Accept()
103+
if err != nil {
104+
// This is closed!
105+
return
106+
}
107+
go s.handle(ctx, conn)
108+
}
109+
}
110+
111+
func (s *server) handle(ctx context.Context, conn *peer.Conn) {
112+
for {
113+
channel, err := conn.Accept(ctx)
114+
if err != nil {
115+
// TODO: Log here!
116+
return
117+
}
118+
119+
switch channel.Protocol() {
120+
case "ssh":
121+
s.sshServer.HandleConn(channel.NetConn())
122+
case "proxy":
123+
// Proxy the port provided.
124+
}
125+
}
126+
}
127+
128+
// isClosed returns whether the API is closed or not.
129+
func (s *server) isClosed() bool {
130+
select {
131+
case <-s.closed:
132+
return true
133+
default:
134+
return false
135+
}
136+
}
137+
138+
func (s *server) Close() error {
139+
return nil
140+
}

agent/agent_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package agent_test
2+
3+
import (
4+
"context"
5+
"net"
6+
"os"
7+
"testing"
8+
9+
"cdr.dev/slog/sloggers/slogtest"
10+
"github.com/coder/coder/agent"
11+
"github.com/coder/coder/peer"
12+
"github.com/coder/coder/peerbroker"
13+
"github.com/coder/coder/peerbroker/proto"
14+
"github.com/coder/coder/provisionersdk"
15+
"github.com/pion/webrtc/v3"
16+
"github.com/stretchr/testify/require"
17+
"golang.org/x/crypto/ssh"
18+
)
19+
20+
func TestAgent(t *testing.T) {
21+
t.Run("asd", func(t *testing.T) {
22+
ctx := context.Background()
23+
client, server := provisionersdk.TransportPipe()
24+
defer client.Close()
25+
defer server.Close()
26+
closer := agent.Server(func(ctx context.Context) (*peerbroker.Listener, error) {
27+
return peerbroker.Listen(server, &peer.ConnOptions{
28+
Logger: slogtest.Make(t, nil),
29+
})
30+
}, &agent.Options{
31+
Logger: slogtest.Make(t, nil),
32+
})
33+
defer closer.Close()
34+
api := proto.NewDRPCPeerBrokerClient(provisionersdk.Conn(client))
35+
stream, err := api.NegotiateConnection(ctx)
36+
require.NoError(t, err)
37+
conn, err := peerbroker.Dial(stream, []webrtc.ICEServer{}, &peer.ConnOptions{
38+
Logger: slogtest.Make(t, nil),
39+
})
40+
require.NoError(t, err)
41+
defer conn.Close()
42+
channel, err := conn.Dial(ctx, "example", &peer.ChannelOptions{
43+
Protocol: "ssh",
44+
})
45+
require.NoError(t, err)
46+
sshConn, channels, requests, err := ssh.NewClientConn(channel.NetConn(), "localhost:22", &ssh.ClientConfig{
47+
User: "kyle",
48+
HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
49+
return nil
50+
},
51+
})
52+
require.NoError(t, err)
53+
sshClient := ssh.NewClient(sshConn, channels, requests)
54+
session, err := sshClient.NewSession()
55+
require.NoError(t, err)
56+
session.Stdout = os.Stdout
57+
session.Stderr = os.Stderr
58+
err = session.Run("echo test")
59+
require.NoError(t, err)
60+
})
61+
}

go.mod

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@ replace github.com/chzyer/readline => github.com/kylecarbs/readline v0.0.0-20220
1616

1717
require (
1818
cdr.dev/slog v1.4.1
19-
github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2
2019
github.com/briandowns/spinner v1.18.1
2120
github.com/coder/retry v1.3.0
21+
github.com/creack/pty v1.1.17
2222
github.com/fatih/color v1.13.0
23+
github.com/gliderlabs/ssh v0.3.3
2324
github.com/go-chi/chi/v5 v5.0.7
2425
github.com/go-chi/render v1.0.1
2526
github.com/go-playground/validator/v10 v10.10.0
@@ -50,6 +51,7 @@ require (
5051
go.uber.org/goleak v1.1.12
5152
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838
5253
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8
54+
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9
5355
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
5456
google.golang.org/protobuf v1.27.1
5557
nhooyr.io/websocket v1.8.7
@@ -63,11 +65,11 @@ require (
6365
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
6466
github.com/agext/levenshtein v1.2.3 // indirect
6567
github.com/alecthomas/chroma v0.10.0 // indirect
68+
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect
6669
github.com/apparentlymart/go-textseg/v13 v13.0.0 // indirect
6770
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
6871
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e // indirect
6972
github.com/containerd/continuity v0.2.2 // indirect
70-
github.com/creack/pty v1.1.17 // indirect
7173
github.com/davecgh/go-spew v1.1.1 // indirect
7274
github.com/dhui/dktest v0.3.9 // indirect
7375
github.com/dlclark/regexp2 v1.4.0 // indirect
@@ -124,7 +126,6 @@ require (
124126
github.com/zeebo/errs v1.2.2 // indirect
125127
go.opencensus.io v0.23.0 // indirect
126128
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
127-
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect
128129
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
129130
golang.org/x/text v0.3.7 // indirect
130131
google.golang.org/appengine v1.6.7 // indirect

go.sum

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,6 @@ github.com/Microsoft/hcsshim v0.8.23/go.mod h1:4zegtUJth7lAvFyc6cH2gGQ5B3OFQim01
103103
github.com/Microsoft/hcsshim/test v0.0.0-20201218223536-d3e5debf77da/go.mod h1:5hlzMzRKMLyo42nCZ9oml8AdTlq/0cvIaBv6tK1RehU=
104104
github.com/Microsoft/hcsshim/test v0.0.0-20210227013316-43a75bb4edd3/go.mod h1:mw7qgWloBUl75W/gVH3cQszUg1+gUITj7D6NY7ywVnY=
105105
github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
106-
github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2 h1:+vx7roKuyA63nhn5WAunQHLTznkw5W8b1Xc0dNjp83s=
107-
github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2/go.mod h1:HBCaDeC1lPdgDeDbhX8XFpy1jqjK0IBG8W5K+xYqA0w=
108106
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw=
109107
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk=
110108
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
@@ -134,6 +132,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
134132
github.com/alexflint/go-filemutex v0.0.0-20171022225611-72bdc8eae2ae/go.mod h1:CgnQgUtFrFz9mxFNtED3jI5tLDjKlOM+oUF/sTk6ps0=
135133
github.com/andybalholm/crlf v0.0.0-20171020200849-670099aa064f/go.mod h1:k8feO4+kXDxro6ErPXBRTJ/ro2mf0SsFG8s7doP9kJE=
136134
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
135+
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
136+
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
137137
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
138138
github.com/apache/arrow/go/arrow v0.0.0-20210818145353-234c94e4ce64/go.mod h1:2qMFB56yOP3KzkB3PbYZ4AlUFg3a88F67TIx5lB/WwY=
139139
github.com/apache/arrow/go/arrow v0.0.0-20211013220434-5962184e7a30/go.mod h1:Q7yQnSMnLvcXlZ8RV+jwz/6y1rQTqbX6C82SndT52Zs=
@@ -443,6 +443,8 @@ github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm
443443
github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14=
444444
github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
445445
github.com/gliderlabs/ssh v0.2.2/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
446+
github.com/gliderlabs/ssh v0.3.3 h1:mBQ8NiOgDkINJrZtoizkC3nDNYgSaWtxyem6S2XHBtA=
447+
github.com/gliderlabs/ssh v0.3.3/go.mod h1:ZSS+CUoKHDrqVakTfTWUlKSr9MtMFkC4UvtQKD7O914=
446448
github.com/go-chi/chi/v5 v5.0.7 h1:rDTPXLDHGATaeHvVlLcR4Qe0zftYethFucbjVQ1PxU8=
447449
github.com/go-chi/chi/v5 v5.0.7/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
448450
github.com/go-chi/render v1.0.1 h1:4/5tis2cKaNdnv9zFLfXzcquC9HbeZgCnxGnKrltBS8=

peer/channel.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ package peer
22

33
import (
44
"context"
5+
"fmt"
56
"io"
67
"net"
8+
"runtime/debug"
79
"sync"
810
"time"
911

@@ -186,6 +188,7 @@ func (c *Channel) Read(bytes []byte) (int, error) {
186188
if c.isClosed() {
187189
return 0, c.closeError
188190
}
191+
debug.PrintStack()
189192
// An EOF always occurs when the connection is closed.
190193
// Alternative close errors will occur first if an unexpected
191194
// close has occurred.
@@ -233,6 +236,8 @@ func (c *Channel) Write(bytes []byte) (n int, err error) {
233236
// See: https://github.com/pion/sctp/issues/181
234237
time.Sleep(time.Microsecond)
235238

239+
fmt.Printf("Writing %d\n", len(bytes))
240+
236241
return c.rwc.Write(bytes)
237242
}
238243

@@ -246,6 +251,11 @@ func (c *Channel) Label() string {
246251
return c.dc.Label()
247252
}
248253

254+
// Protocol returns the protocol of the underlying DataChannel.
255+
func (c *Channel) Protocol() string {
256+
return c.dc.Protocol()
257+
}
258+
249259
// NetConn wraps the DataChannel in a struct fulfilling net.Conn.
250260
// Read, Write, and Close operations can still be used on the *Channel struct.
251261
func (c *Channel) NetConn() net.Conn {

peer/conn_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,23 @@ func TestConn(t *testing.T) {
267267
_, err := client.Ping()
268268
require.NoError(t, err)
269269
})
270+
271+
t.Run("ShortBuffer", func(t *testing.T) {
272+
t.Parallel()
273+
client, server, _ := createPair(t)
274+
exchange(client, server)
275+
go func() {
276+
channel, err := client.Dial(context.Background(), "test", nil)
277+
require.NoError(t, err)
278+
_, err = channel.Write([]byte{'1', '2'})
279+
require.NoError(t, err)
280+
}()
281+
282+
channel, err := server.Accept(context.Background())
283+
require.NoError(t, err)
284+
_, err = channel.Read(make([]byte, 1))
285+
require.NoError(t, err)
286+
})
270287
}
271288

272289
func createPair(t *testing.T) (client *peer.Conn, server *peer.Conn, wan *vnet.Router) {

0 commit comments

Comments
 (0)