Skip to content

Commit 96f25c3

Browse files
committed
feat(cli/exp): add app testing to scaletest workspace-traffic
1 parent 5bfbf9f commit 96f25c3

File tree

5 files changed

+269
-22
lines changed

5 files changed

+269
-22
lines changed

cli/exp_scaletest.go

+39-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/prometheus/client_golang/prometheus"
2222
"github.com/prometheus/client_golang/prometheus/promhttp"
2323
"go.opentelemetry.io/otel/trace"
24+
"golang.org/x/exp/slices"
2425
"golang.org/x/xerrors"
2526

2627
"cdr.dev/slog"
@@ -859,6 +860,7 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
859860
tickInterval time.Duration
860861
bytesPerTick int64
861862
ssh bool
863+
app string
862864
template string
863865

864866
client = &codersdk.Client{}
@@ -911,6 +913,11 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
911913
}
912914
}
913915

916+
appHost, err := client.AppHost(ctx)
917+
if err != nil {
918+
return xerrors.Errorf("get app host: %w", err)
919+
}
920+
914921
workspaces, err := getScaletestWorkspaces(inv.Context(), client, template)
915922
if err != nil {
916923
return err
@@ -949,6 +956,8 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
949956
agentName string
950957
name = "workspace-traffic"
951958
id = strconv.Itoa(idx)
959+
apps []codersdk.WorkspaceApp
960+
appConfig workspacetraffic.AppConfig
952961
)
953962

954963
for _, res := range ws.LatestBuild.Resources {
@@ -957,13 +966,34 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
957966
}
958967
agentID = res.Agents[0].ID
959968
agentName = res.Agents[0].Name
969+
apps = res.Agents[0].Apps
960970
}
961971

962972
if agentID == uuid.Nil {
963973
_, _ = fmt.Fprintf(inv.Stderr, "WARN: skipping workspace %s: no agent\n", ws.Name)
964974
continue
965975
}
966976

977+
if app != "" {
978+
i := slices.IndexFunc(apps, func(a codersdk.WorkspaceApp) bool { return a.Slug == app })
979+
if i == -1 {
980+
return xerrors.Errorf("app %q not found in workspace %q", app, ws.Name)
981+
}
982+
983+
appConfig = workspacetraffic.AppConfig{
984+
Name: apps[i].Slug,
985+
}
986+
if apps[i].Subdomain {
987+
if appHost.Host == "" {
988+
return xerrors.Errorf("app %q is a subdomain app but no app host is configured", app)
989+
}
990+
991+
appConfig.URL = fmt.Sprintf("%s://%s", client.URL.Scheme, strings.Replace(appHost.Host, "*", apps[i].SubdomainName, 1))
992+
} else {
993+
appConfig.URL = fmt.Sprintf("%s/@%s/%s.%s/apps/%s", client.URL.String(), ws.OwnerName, ws.Name, agentName, apps[i].Slug)
994+
}
995+
}
996+
967997
// Setup our workspace agent connection.
968998
config := workspacetraffic.Config{
969999
AgentID: agentID,
@@ -974,6 +1004,7 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
9741004
WriteMetrics: metrics.WriteMetrics(ws.OwnerName, ws.Name, agentName),
9751005
SSH: ssh,
9761006
Echo: ssh,
1007+
App: appConfig,
9771008
}
9781009

9791010
if err := config.Validate(); err != nil {
@@ -1046,9 +1077,16 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
10461077
Flag: "ssh",
10471078
Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_SSH",
10481079
Default: "",
1049-
Description: "Send traffic over SSH.",
1080+
Description: "Send traffic over SSH, cannot be used with --app.",
10501081
Value: clibase.BoolOf(&ssh),
10511082
},
1083+
{
1084+
Flag: "app",
1085+
Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_APP",
1086+
Default: "",
1087+
Description: "Send WebSocket traffic to a workspace app (proxied via coderd), cannot be used with --ssh.",
1088+
Value: clibase.StringOf(&app),
1089+
},
10521090
}
10531091

10541092
tracingFlags.attach(&cmd.Options)

scaletest/workspacetraffic/config.go

+11
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ type Config struct {
3131
// to true will double the amount of data read from the agent for
3232
// PTYs (e.g. reconnecting pty or SSH connections that request PTY).
3333
Echo bool `json:"echo"`
34+
35+
App AppConfig `json:"app"`
3436
}
3537

3638
func (c Config) Validate() error {
@@ -50,5 +52,14 @@ func (c Config) Validate() error {
5052
return xerrors.Errorf("validate tick_interval: must be greater than zero")
5153
}
5254

55+
if c.SSH && c.App.Name != "" {
56+
return xerrors.Errorf("validate ssh: must be false when app is used")
57+
}
58+
5359
return nil
5460
}
61+
62+
type AppConfig struct {
63+
Name string `json:"name"`
64+
URL string `json:"url"`
65+
}

scaletest/workspacetraffic/conn.go

+96-3
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,13 @@ import (
55
"encoding/json"
66
"errors"
77
"io"
8+
"net"
9+
"net/http"
810
"sync"
911
"time"
1012

13+
"nhooyr.io/websocket"
14+
1115
"github.com/coder/coder/v2/codersdk"
1216

1317
"github.com/google/uuid"
@@ -27,9 +31,9 @@ const (
2731
//
2832
// failed to write frame: WebSocket closed: received close frame: status = StatusMessageTooBig and reason = "read limited at 32769 bytes"
2933
//
30-
// Since we can't control fragmentation/buffer sizes, we keep it simple and
31-
// match the conservative payload size used by agent/reconnectingpty (1024).
32-
rptyJSONMaxDataSize = 1024
34+
// Since we can't control fragmentation/buffer sizes, we use a conservative
35+
// value. Derived from 1024 * 9 * 3 = <28KB.
36+
rptyJSONMaxDataSize = 1024 * 9
3337
)
3438

3539
func connectRPTY(ctx context.Context, client *codersdk.Client, agentID, reconnect uuid.UUID, cmd string) (*countReadWriteCloser, error) {
@@ -260,3 +264,92 @@ func (w *wrappedSSHConn) Read(p []byte) (n int, err error) {
260264
func (w *wrappedSSHConn) Write(p []byte) (n int, err error) {
261265
return w.stdin.Write(p)
262266
}
267+
268+
func appClientConn(ctx context.Context, client *codersdk.Client, url string) (*countReadWriteCloser, error) {
269+
headers := http.Header{}
270+
tokenHeader := codersdk.SessionTokenHeader
271+
if client.SessionTokenHeader != "" {
272+
tokenHeader = client.SessionTokenHeader
273+
}
274+
headers.Set(tokenHeader, client.SessionToken())
275+
276+
//nolint:bodyclose // The websocket conn manages the body.
277+
conn, _, err := websocket.Dial(ctx, url, &websocket.DialOptions{
278+
HTTPClient: client.HTTPClient,
279+
HTTPHeader: headers,
280+
})
281+
if err != nil {
282+
return nil, xerrors.Errorf("websocket dial: %w", err)
283+
}
284+
285+
netConn := websocketNetConn(conn, websocket.MessageBinary)
286+
287+
// Wrap the conn in a countReadWriteCloser so we can monitor bytes sent/rcvd.
288+
crw := &countReadWriteCloser{rwc: netConn}
289+
return crw, nil
290+
}
291+
292+
// wsNetConn wraps net.Conn created by websocket.NetConn(). Cancel func
293+
// is called if a read or write error is encountered.
294+
type wsNetConn struct {
295+
mu sync.Mutex
296+
closed bool
297+
closeErr error
298+
net.Conn
299+
}
300+
301+
func (c *wsNetConn) Read(b []byte) (n int, err error) {
302+
c.mu.Lock()
303+
if c.closed {
304+
c.mu.Unlock()
305+
return 0, io.EOF
306+
}
307+
c.mu.Unlock()
308+
309+
n, err = c.Conn.Read(b)
310+
if err != nil {
311+
_ = c.Close()
312+
}
313+
return n, err
314+
}
315+
316+
func (c *wsNetConn) Write(b []byte) (n int, err error) {
317+
c.mu.Lock()
318+
if c.closed {
319+
c.mu.Unlock()
320+
return 0, io.EOF
321+
}
322+
c.mu.Unlock()
323+
324+
for len(b) > 0 {
325+
bb := b
326+
if len(bb) > rptyJSONMaxDataSize {
327+
bb = b[:rptyJSONMaxDataSize]
328+
}
329+
b = b[len(bb):]
330+
nn, err := c.Conn.Write(bb)
331+
n += nn
332+
if err != nil {
333+
_ = c.Close()
334+
return n, err
335+
}
336+
}
337+
return n, nil
338+
}
339+
340+
func (c *wsNetConn) Close() error {
341+
c.mu.Lock()
342+
if c.closed {
343+
c.mu.Unlock()
344+
return c.closeErr
345+
}
346+
c.closed = true
347+
c.closeErr = c.Conn.Close()
348+
c.mu.Unlock()
349+
return c.closeErr
350+
}
351+
352+
func websocketNetConn(conn *websocket.Conn, msgType websocket.MessageType) net.Conn {
353+
nc := websocket.NetConn(context.Background(), conn, msgType)
354+
return &wsNetConn{Conn: nc}
355+
}

scaletest/workspacetraffic/run.go

+22-13
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,16 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
9191
command := fmt.Sprintf("dd if=/dev/stdin of=%s bs=%d status=none", output, bytesPerTick)
9292

9393
var conn *countReadWriteCloser
94-
if r.cfg.SSH {
94+
switch {
95+
case r.cfg.App.Name != "":
96+
logger.Info(ctx, "sending traffic to workspace app", slog.F("app", r.cfg.App.Name))
97+
conn, err = appClientConn(ctx, r.client, r.cfg.App.URL)
98+
if err != nil {
99+
logger.Error(ctx, "connect to workspace app", slog.Error(err))
100+
return xerrors.Errorf("connect to workspace app: %w", err)
101+
}
102+
103+
case r.cfg.SSH:
95104
logger.Info(ctx, "connecting to workspace agent", slog.F("method", "ssh"))
96105
// If echo is enabled, disable PTY to avoid double echo and
97106
// reduce CPU usage.
@@ -101,7 +110,8 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
101110
logger.Error(ctx, "connect to workspace agent via ssh", slog.Error(err))
102111
return xerrors.Errorf("connect to workspace via ssh: %w", err)
103112
}
104-
} else {
113+
114+
default:
105115
logger.Info(ctx, "connecting to workspace agent", slog.F("method", "reconnectingpty"))
106116
conn, err = connectRPTY(ctx, r.client, agentID, reconnect, command)
107117
if err != nil {
@@ -114,8 +124,8 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
114124
closeConn := func() error {
115125
closeOnce.Do(func() {
116126
closeErr = conn.Close()
117-
if err != nil {
118-
logger.Error(ctx, "close agent connection", slog.Error(err))
127+
if closeErr != nil {
128+
logger.Error(ctx, "close agent connection", slog.Error(closeErr))
119129
}
120130
})
121131
return closeErr
@@ -142,7 +152,6 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
142152

143153
// Read until connection is closed.
144154
go func() {
145-
rch := rch // Shadowed for reassignment.
146155
logger.Debug(ctx, "reading from agent")
147156
rch <- drain(conn)
148157
logger.Debug(ctx, "done reading from agent")
@@ -151,7 +160,6 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
151160

152161
// Write random data to the conn every tick.
153162
go func() {
154-
wch := wch // Shadowed for reassignment.
155163
logger.Debug(ctx, "writing to agent")
156164
wch <- writeRandomData(conn, bytesPerTick, tick.C)
157165
logger.Debug(ctx, "done writing to agent")
@@ -160,16 +168,17 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
160168

161169
var waitCloseTimeoutCh <-chan struct{}
162170
deadlineCtxCh := deadlineCtx.Done()
171+
wchRef, rchRef := wch, rch
163172
for {
164-
if wch == nil && rch == nil {
173+
if wchRef == nil && rchRef == nil {
165174
return nil
166175
}
167176

168177
select {
169178
case <-waitCloseTimeoutCh:
170179
logger.Warn(ctx, "timed out waiting for read/write to complete",
171-
slog.F("write_done", wch == nil),
172-
slog.F("read_done", rch == nil),
180+
slog.F("write_done", wchRef == nil),
181+
slog.F("read_done", rchRef == nil),
173182
)
174183
return xerrors.Errorf("timed out waiting for read/write to complete: %w", ctx.Err())
175184
case <-deadlineCtxCh:
@@ -181,16 +190,16 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
181190
waitCtx, cancel := context.WithTimeout(context.Background(), waitCloseTimeout)
182191
defer cancel() //nolint:revive // Only called once.
183192
waitCloseTimeoutCh = waitCtx.Done()
184-
case err = <-wch:
193+
case err = <-wchRef:
185194
if err != nil {
186195
return xerrors.Errorf("write to agent: %w", err)
187196
}
188-
wch = nil
189-
case err = <-rch:
197+
wchRef = nil
198+
case err = <-rchRef:
190199
if err != nil {
191200
return xerrors.Errorf("read from agent: %w", err)
192201
}
193-
rch = nil
202+
rchRef = nil
194203
}
195204
}
196205
}

0 commit comments

Comments
 (0)