@@ -2,26 +2,21 @@ package cmd
2
2
3
3
import (
4
4
"context"
5
- "encoding/json"
6
5
"fmt"
7
6
"io"
8
7
"net"
9
8
"net/url"
10
9
"os"
11
10
"strconv"
12
- "time"
13
11
14
12
"cdr.dev/slog"
15
13
"cdr.dev/slog/sloggers/sloghuman"
16
- "github.com/pion/webrtc/v3"
17
14
"github.com/spf13/cobra"
18
15
"golang.org/x/xerrors"
19
- "nhooyr.io/websocket"
20
16
21
17
"cdr.dev/coder-cli/coder-sdk"
22
18
"cdr.dev/coder-cli/internal/x/xcobra"
23
- "cdr.dev/coder-cli/internal/x/xwebrtc"
24
- "cdr.dev/coder-cli/pkg/proto"
19
+ "cdr.dev/coder-cli/xwebrtc"
25
20
)
26
21
27
22
func tunnelCmd () * cobra.Command {
@@ -41,26 +36,26 @@ coder tunnel my-dev 3000 3000
41
36
42
37
remotePort , err := strconv .ParseUint (args [1 ], 10 , 16 )
43
38
if err != nil {
44
- log . Fatal ( ctx , "parse remote port" , slog . Error ( err ) )
39
+ return xerrors . Errorf ( "parse remote port: %w " , err )
45
40
}
46
41
47
42
var localPort uint64
48
43
if args [2 ] != "stdio" {
49
44
localPort , err = strconv .ParseUint (args [2 ], 10 , 16 )
50
45
if err != nil {
51
- log . Fatal ( ctx , "parse local port" , slog . Error ( err ) )
46
+ return xerrors . Errorf ( "parse local port: %w " , err )
52
47
}
53
48
}
54
49
55
50
sdk , err := newClient (ctx )
56
51
if err != nil {
57
- return err
52
+ return xerrors . Errorf ( "getting coder client: %w" , err )
58
53
}
59
54
baseURL := sdk .BaseURL ()
60
55
61
56
envs , err := getEnvs (ctx , sdk , coder .Me )
62
57
if err != nil {
63
- return err
58
+ return xerrors . Errorf ( "get workspaces: %w" , err )
64
59
}
65
60
66
61
var envID string
@@ -74,20 +69,19 @@ coder tunnel my-dev 3000 3000
74
69
return xerrors .Errorf ("No workspace found by name '%s'" , args [0 ])
75
70
}
76
71
77
- c := & client {
78
- id : envID ,
79
- stdio : args [2 ] == "stdio" ,
80
- localPort : uint16 (localPort ),
81
- remotePort : uint16 (remotePort ),
82
- ctx : context .Background (),
83
- logger : log .Leveled (slog .LevelDebug ),
84
- brokerAddr : baseURL ,
85
- token : sdk .Token (),
72
+ c := & tunnneler {
73
+ log : log .Leveled (slog .LevelDebug ),
74
+ brokerAddr : & baseURL ,
75
+ token : sdk .Token (),
76
+ workspaceID : envID ,
77
+ stdio : args [2 ] == "stdio" ,
78
+ localPort : uint16 (localPort ),
79
+ remotePort : uint16 (remotePort ),
86
80
}
87
81
88
- err = c .start ()
82
+ err = c .start (ctx )
89
83
if err != nil {
90
- log . Fatal ( ctx , err . Error () )
84
+ return xerrors . Errorf ( "running tunnel: %w" , err )
91
85
}
92
86
93
87
return nil
@@ -97,197 +91,58 @@ coder tunnel my-dev 3000 3000
97
91
return cmd
98
92
}
99
93
100
- type client struct {
101
- ctx context.Context
102
- brokerAddr url.URL
103
- token string
104
- logger slog.Logger
105
- id string
106
- remotePort uint16
107
- localPort uint16
108
- stdio bool
94
+ type tunnneler struct {
95
+ log slog.Logger
96
+ brokerAddr * url.URL
97
+ token string
98
+ workspaceID string
99
+ remotePort uint16
100
+ localPort uint16
101
+ stdio bool
109
102
}
110
103
111
- func (c * client ) start () error {
112
- url := fmt .Sprintf ("%s%s%s%s%s" , c .brokerAddr .String (), "/api/private/envagent/" , c .id , "/connect?session_token=" , c .token )
113
- turnScheme := "turns"
114
- if c .brokerAddr .Scheme == "http" {
115
- turnScheme = "turn"
116
- }
117
- tcpProxy := fmt .Sprintf ("%s:%s:5349?transport=tcp" , turnScheme , c .brokerAddr .Host )
118
- c .logger .Info (c .ctx , "connecting to broker" , slog .F ("url" , url ), slog .F ("tcp-proxy" , tcpProxy ))
119
- conn , resp , err := websocket .Dial (c .ctx , url , nil )
120
- if err != nil && resp == nil {
121
- return fmt .Errorf ("dial: %w" , err )
122
- }
123
- if err != nil && resp != nil {
124
- return & coder.HTTPError {
125
- Response : resp ,
126
- }
127
- }
128
- nconn := websocket .NetConn (context .Background (), conn , websocket .MessageBinary )
129
-
130
- // Only enabled under a private feature flag for now,
131
- // so insecure connections are entirely fine to allow.
132
- servers := []webrtc.ICEServer {{
133
- URLs : []string {tcpProxy },
134
- Username : "insecure" ,
135
- Credential : "pass" ,
136
- CredentialType : webrtc .ICECredentialTypePassword ,
137
- }}
138
- rtc , err := xwebrtc .NewPeerConnection (servers )
139
- if err != nil {
140
- return fmt .Errorf ("create connection: %w" , err )
141
- }
142
-
143
- rtc .OnNegotiationNeeded (func () {
144
- c .logger .Debug (context .Background (), "negotiation needed..." )
145
- })
146
-
147
- rtc .OnConnectionStateChange (func (pcs webrtc.PeerConnectionState ) {
148
- c .logger .Info (context .Background (), "connection state changed" , slog .F ("state" , pcs ))
149
- })
150
-
151
- channel , err := xwebrtc .NewProxyDataChannel (rtc , "forwarder" , "tcp" , c .remotePort )
152
- if err != nil {
153
- return fmt .Errorf ("create data channel: %w" , err )
154
- }
155
- flushCandidates := proto .ProxyICECandidates (rtc , nconn )
156
-
157
- localDesc , err := rtc .CreateOffer (& webrtc.OfferOptions {})
158
- if err != nil {
159
- return fmt .Errorf ("create offer: %w" , err )
160
- }
161
-
162
- err = rtc .SetLocalDescription (localDesc )
163
- if err != nil {
164
- return fmt .Errorf ("set local desc: %w" , err )
165
- }
166
-
167
- c .logger .Debug (context .Background (), "writing offer" )
168
- b , _ := json .Marshal (& proto.Message {
169
- Offer : & localDesc ,
170
- Servers : servers ,
171
- })
172
- _ , err = nconn .Write (b )
104
+ func (c * tunnneler ) start (ctx context.Context ) error {
105
+ wd , err := xwebrtc .NewWorkspaceDialer (ctx , c .log , c .brokerAddr , c .token , c .workspaceID )
173
106
if err != nil {
174
- return fmt .Errorf ("write offer: %w" , err )
175
- }
176
- flushCandidates ()
177
-
178
- go func () {
179
- err = xwebrtc .WaitForDataChannelOpen (context .Background (), channel )
180
- if err != nil {
181
- c .logger .Fatal (context .Background (), "waiting for data channel open" , slog .Error (err ))
182
- }
183
- _ = conn .Close (websocket .StatusNormalClosure , "rtc connected" )
184
- }()
185
-
186
- decoder := json .NewDecoder (nconn )
187
- for {
188
- var msg proto.Message
189
- err = decoder .Decode (& msg )
190
- if err == io .EOF {
191
- break
192
- }
193
- if websocket .CloseStatus (err ) == websocket .StatusNormalClosure {
194
- break
195
- }
196
- if err != nil {
197
- return fmt .Errorf ("read msg: %w" , err )
198
- }
199
- if msg .Candidate != "" {
200
- c .logger .Debug (context .Background (), "accepted ice candidate" , slog .F ("candidate" , msg .Candidate ))
201
- err = proto .AcceptICECandidate (rtc , & msg )
202
- if err != nil {
203
- return fmt .Errorf ("accept ice: %w" , err )
204
- }
205
- }
206
- if msg .Answer != nil {
207
- c .logger .Debug (context .Background (), "got answer" , slog .F ("answer" , msg .Answer ))
208
- err = rtc .SetRemoteDescription (* msg .Answer )
209
- if err != nil {
210
- return fmt .Errorf ("set remote: %w" , err )
211
- }
212
- }
107
+ return xerrors .Errorf ("creating workspace dialer: %w" , wd )
213
108
}
214
-
215
- // Once we're open... let's test out the ping.
216
- pingProto := "ping"
217
- pingChannel , err := rtc .CreateDataChannel ("pinger" , & webrtc.DataChannelInit {
218
- Protocol : & pingProto ,
219
- })
109
+ nc , err := wd .DialContext (ctx , xwebrtc .NetworkTCP , fmt .Sprintf ("localhost:%d" , c .remotePort ))
220
110
if err != nil {
221
- return fmt .Errorf ("create ping channel" )
111
+ return xerrors .Errorf ("dial: %w" , err )
222
112
}
223
- pingChannel .OnOpen (func () {
224
- defer func () {
225
- _ = pingChannel .Close ()
226
- }()
227
- t1 := time .Now ()
228
- rw , _ := pingChannel .Detach ()
229
- defer func () {
230
- _ = rw .Close ()
231
- }()
232
- _ , _ = rw .Write ([]byte ("hello" ))
233
- b := make ([]byte , 64 )
234
- _ , _ = rw .Read (b )
235
- c .logger .Info (c .ctx , "your latency directly to the agent" , slog .F ("ms" , time .Since (t1 ).Milliseconds ()))
236
- })
237
113
114
+ // proxy via stdio
238
115
if c .stdio {
239
- // At this point the RTC is connected and data channel is opened...
240
- rw , err := channel .Detach ()
241
- if err != nil {
242
- return fmt .Errorf ("detach channel: %w" , err )
243
- }
244
116
go func () {
245
- _ , _ = io .Copy (rw , os .Stdin )
117
+ _ , _ = io .Copy (nc , os .Stdin )
246
118
}()
247
- _ , err = io .Copy (os .Stdout , rw )
119
+ _ , err = io .Copy (os .Stdout , nc )
248
120
if err != nil {
249
- return fmt .Errorf ("copy: %w" , err )
121
+ return xerrors .Errorf ("copy: %w" , err )
250
122
}
251
123
return nil
252
124
}
253
125
126
+ // proxy via tcp listener
254
127
listener , err := net .Listen ("tcp" , fmt .Sprintf ("localhost:%d" , c .localPort ))
255
128
if err != nil {
256
- return fmt .Errorf ("listen: %w" , err )
129
+ return xerrors .Errorf ("listen: %w" , err )
257
130
}
258
131
259
132
for {
260
- conn , err := listener .Accept ()
133
+ lc , err := listener .Accept ()
261
134
if err != nil {
262
- return fmt .Errorf ("accept: %w" , err )
135
+ return xerrors .Errorf ("accept: %w" , err )
263
136
}
264
137
go func () {
265
138
defer func () {
266
- _ = conn .Close ()
267
- }()
268
- channel , err := xwebrtc .NewProxyDataChannel (rtc , "forwarder" , "tcp" , c .remotePort )
269
- if err != nil {
270
- c .logger .Warn (context .Background (), "create data channel for proxying" , slog .Error (err ))
271
- return
272
- }
273
- defer func () {
274
- _ = channel .Close ()
139
+ _ = lc .Close ()
275
140
}()
276
- err = xwebrtc .WaitForDataChannelOpen (context .Background (), channel )
277
- if err != nil {
278
- c .logger .Warn (context .Background (), "wait for data channel open" , slog .Error (err ))
279
- return
280
- }
281
- rw , err := channel .Detach ()
282
- if err != nil {
283
- c .logger .Warn (context .Background (), "detach channel" , slog .Error (err ))
284
- return
285
- }
286
141
287
142
go func () {
288
- _ , _ = io .Copy (conn , rw )
143
+ _ , _ = io .Copy (lc , nc )
289
144
}()
290
- _ , _ = io .Copy (rw , conn )
145
+ _ , _ = io .Copy (nc , lc )
291
146
}()
292
147
}
293
148
}
0 commit comments