Commit 73e6bbf

feat(cli/exp): add app testing to scaletest workspace-traffic (#11633)
Parent: 1f63a11

5 files changed (+313 -34 lines)

cli/exp_scaletest.go (+55 -11)

```diff
@@ -21,6 +21,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"go.opentelemetry.io/otel/trace"
+	"golang.org/x/exp/slices"
 	"golang.org/x/xerrors"
 
 	"cdr.dev/slog"
@@ -859,6 +860,7 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
 		tickInterval time.Duration
 		bytesPerTick int64
 		ssh          bool
+		app          string
 		template     string
 
 		client = &codersdk.Client{}
@@ -911,6 +913,11 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
 				}
 			}
 
+			appHost, err := client.AppHost(ctx)
+			if err != nil {
+				return xerrors.Errorf("get app host: %w", err)
+			}
+
 			workspaces, err := getScaletestWorkspaces(inv.Context(), client, template)
 			if err != nil {
 				return err
@@ -945,35 +952,39 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
 			th := harness.NewTestHarness(strategy.toStrategy(), cleanupStrategy.toStrategy())
 			for idx, ws := range workspaces {
 				var (
-					agentID   uuid.UUID
-					agentName string
-					name      = "workspace-traffic"
-					id        = strconv.Itoa(idx)
+					agent codersdk.WorkspaceAgent
+					name  = "workspace-traffic"
+					id    = strconv.Itoa(idx)
 				)
 
 				for _, res := range ws.LatestBuild.Resources {
 					if len(res.Agents) == 0 {
 						continue
 					}
-					agentID = res.Agents[0].ID
-					agentName = res.Agents[0].Name
+					agent = res.Agents[0]
 				}
 
-				if agentID == uuid.Nil {
+				if agent.ID == uuid.Nil {
 					_, _ = fmt.Fprintf(inv.Stderr, "WARN: skipping workspace %s: no agent\n", ws.Name)
 					continue
 				}
 
+				appConfig, err := createWorkspaceAppConfig(client, appHost.Host, app, ws, agent)
+				if err != nil {
+					return xerrors.Errorf("configure workspace app: %w", err)
+				}
+
 				// Setup our workspace agent connection.
 				config := workspacetraffic.Config{
-					AgentID:      agentID,
+					AgentID:      agent.ID,
 					BytesPerTick: bytesPerTick,
 					Duration:     strategy.timeout,
 					TickInterval: tickInterval,
-					ReadMetrics:  metrics.ReadMetrics(ws.OwnerName, ws.Name, agentName),
-					WriteMetrics: metrics.WriteMetrics(ws.OwnerName, ws.Name, agentName),
+					ReadMetrics:  metrics.ReadMetrics(ws.OwnerName, ws.Name, agent.Name),
+					WriteMetrics: metrics.WriteMetrics(ws.OwnerName, ws.Name, agent.Name),
 					SSH:          ssh,
 					Echo:         ssh,
+					App:          appConfig,
 				}
 
 				if err := config.Validate(); err != nil {
@@ -1046,9 +1057,16 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
 			Flag:        "ssh",
 			Env:         "CODER_SCALETEST_WORKSPACE_TRAFFIC_SSH",
 			Default:     "",
-			Description: "Send traffic over SSH.",
+			Description: "Send traffic over SSH, cannot be used with --app.",
 			Value:       clibase.BoolOf(&ssh),
 		},
+		{
+			Flag:        "app",
+			Env:         "CODER_SCALETEST_WORKSPACE_TRAFFIC_APP",
+			Default:     "",
+			Description: "Send WebSocket traffic to a workspace app (proxied via coderd), cannot be used with --ssh.",
+			Value:       clibase.StringOf(&app),
+		},
 	}
 
 	tracingFlags.attach(&cmd.Options)
@@ -1411,3 +1429,29 @@ func parseTemplate(ctx context.Context, client *codersdk.Client, organizationIDs
 	return tpl, nil
 }
+
+func createWorkspaceAppConfig(client *codersdk.Client, appHost, app string, workspace codersdk.Workspace, agent codersdk.WorkspaceAgent) (workspacetraffic.AppConfig, error) {
+	if app == "" {
+		return workspacetraffic.AppConfig{}, nil
+	}
+
+	i := slices.IndexFunc(agent.Apps, func(a codersdk.WorkspaceApp) bool { return a.Slug == app })
+	if i == -1 {
+		return workspacetraffic.AppConfig{}, xerrors.Errorf("app %q not found in workspace %q", app, workspace.Name)
+	}
+
+	c := workspacetraffic.AppConfig{
+		Name: agent.Apps[i].Slug,
+	}
+	if agent.Apps[i].Subdomain {
+		if appHost == "" {
+			return workspacetraffic.AppConfig{}, xerrors.Errorf("app %q is a subdomain app but no app host is configured", app)
+		}
+
+		c.URL = fmt.Sprintf("%s://%s", client.URL.Scheme, strings.Replace(appHost, "*", agent.Apps[i].SubdomainName, 1))
+	} else {
+		c.URL = fmt.Sprintf("%s/@%s/%s.%s/apps/%s", client.URL.String(), workspace.OwnerName, workspace.Name, agent.Name, agent.Apps[i].Slug)
+	}
+
+	return c, nil
+}
```
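For context, the new --app flag (env CODER_SCALETEST_WORKSPACE_TRAFFIC_APP) names an app slug on the workspace agent, and createWorkspaceAppConfig turns it into either a wildcard-subdomain URL or a path-based URL proxied through coderd. The sketch below only illustrates those two URL shapes; the deployment URL, app host, owner, workspace, agent, and subdomain values are hypothetical and not taken from the commit.

```go
// Illustrative sketch (not part of the commit) of the two URL shapes built by
// createWorkspaceAppConfig. All concrete values below are hypothetical.
package main

import (
	"fmt"
	"net/url"
	"strings"
)

func main() {
	base, _ := url.Parse("https://coder.example.com") // hypothetical deployment URL
	appHost := "*.apps.example.com"                   // hypothetical wildcard app host

	// Subdomain app: the wildcard in the app host is replaced with the app's
	// reported subdomain name, mirroring the strings.Replace call in the helper.
	subdomain := "code-server--dev--main--alice" // hypothetical SubdomainName value
	fmt.Printf("%s://%s\n", base.Scheme, strings.Replace(appHost, "*", subdomain, 1))
	// https://code-server--dev--main--alice.apps.example.com

	// Path-based app: proxied by coderd under the owner/workspace.agent path.
	fmt.Printf("%s/@%s/%s.%s/apps/%s\n", base.String(), "alice", "main", "dev", "code-server")
	// https://coder.example.com/@alice/main.dev/apps/code-server
}
```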

scaletest/workspacetraffic/config.go (+11)

```diff
@@ -31,6 +31,8 @@ type Config struct {
 	// to true will double the amount of data read from the agent for
 	// PTYs (e.g. reconnecting pty or SSH connections that request PTY).
 	Echo bool `json:"echo"`
+
+	App AppConfig `json:"app"`
 }
 
 func (c Config) Validate() error {
@@ -50,5 +52,14 @@ func (c Config) Validate() error {
 		return xerrors.Errorf("validate tick_interval: must be greater than zero")
 	}
 
+	if c.SSH && c.App.Name != "" {
+		return xerrors.Errorf("validate ssh: must be false when app is used")
+	}
+
 	return nil
 }
+
+type AppConfig struct {
+	Name string `json:"name"`
+	URL  string `json:"url"`
+}
```
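The new App field and the added Validate check make SSH traffic and app traffic mutually exclusive. Below is a minimal, standalone sketch of that rule; the types are redeclared locally for illustration and are not the workspacetraffic package's exported ones.

```go
// Standalone sketch of the mutual-exclusion rule added to Config.Validate.
// The types here are local stand-ins, not the workspacetraffic package types.
package main

import (
	"fmt"

	"golang.org/x/xerrors"
)

type appConfig struct {
	Name string
	URL  string
}

type config struct {
	SSH bool
	App appConfig
}

func (c config) validate() error {
	// Traffic goes either over SSH or to a workspace app, never both.
	if c.SSH && c.App.Name != "" {
		return xerrors.Errorf("validate ssh: must be false when app is used")
	}
	return nil
}

func main() {
	fmt.Println(config{SSH: true, App: appConfig{Name: "code-server"}}.validate())
	// validate ssh: must be false when app is used
	fmt.Println(config{App: appConfig{Name: "code-server"}}.validate())
	// <nil>
}
```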

scaletest/workspacetraffic/conn.go (+119)

```diff
@@ -5,9 +5,13 @@ import (
 	"encoding/json"
 	"errors"
 	"io"
+	"net"
+	"net/http"
 	"sync"
 	"time"
 
+	"nhooyr.io/websocket"
+
 	"github.com/coder/coder/v2/codersdk"
 
 	"github.com/google/uuid"
@@ -260,3 +264,118 @@ func (w *wrappedSSHConn) Read(p []byte) (n int, err error) {
 func (w *wrappedSSHConn) Write(p []byte) (n int, err error) {
 	return w.stdin.Write(p)
 }
+
+func appClientConn(ctx context.Context, client *codersdk.Client, url string) (*countReadWriteCloser, error) {
+	headers := http.Header{}
+	tokenHeader := codersdk.SessionTokenHeader
+	if client.SessionTokenHeader != "" {
+		tokenHeader = client.SessionTokenHeader
+	}
+	headers.Set(tokenHeader, client.SessionToken())
+
+	//nolint:bodyclose // The websocket conn manages the body.
+	conn, _, err := websocket.Dial(ctx, url, &websocket.DialOptions{
+		HTTPClient: client.HTTPClient,
+		HTTPHeader: headers,
+	})
+	if err != nil {
+		return nil, xerrors.Errorf("websocket dial: %w", err)
+	}
+
+	netConn := websocketNetConn(conn, websocket.MessageBinary)
+
+	// Wrap the conn in a countReadWriteCloser so we can monitor bytes sent/rcvd.
+	crw := &countReadWriteCloser{rwc: netConn}
+	return crw, nil
+}
+
+// wsNetConn wraps net.Conn created by websocket.NetConn(). Cancel func
+// is called if a read or write error is encountered.
+type wsNetConn struct {
+	net.Conn
+
+	writeMu sync.Mutex
+	readMu  sync.Mutex
+
+	cancel  context.CancelFunc
+	closeMu sync.Mutex
+	closed  bool
+}
+
+func (c *wsNetConn) Read(b []byte) (n int, err error) {
+	c.readMu.Lock()
+	defer c.readMu.Unlock()
+	if c.isClosed() {
+		return 0, io.EOF
+	}
+	n, err = c.Conn.Read(b)
+	if err != nil {
+		if c.isClosed() {
+			return n, io.EOF
+		}
+		return n, err
+	}
+	return n, nil
+}
+
+func (c *wsNetConn) Write(b []byte) (n int, err error) {
+	c.writeMu.Lock()
+	defer c.writeMu.Unlock()
+	if c.isClosed() {
+		return 0, io.EOF
+	}
+
+	for len(b) > 0 {
+		bb := b
+		if len(bb) > rptyJSONMaxDataSize {
+			bb = b[:rptyJSONMaxDataSize]
+		}
+		b = b[len(bb):]
+		nn, err := c.Conn.Write(bb)
+		n += nn
+		if err != nil {
+			if c.isClosed() {
+				return n, io.EOF
+			}
+			return n, err
+		}
+	}
+	return n, nil
+}
+
+func (c *wsNetConn) isClosed() bool {
+	c.closeMu.Lock()
+	defer c.closeMu.Unlock()
+	return c.closed
+}
+
+func (c *wsNetConn) Close() error {
+	c.closeMu.Lock()
+	closed := c.closed
+	c.closed = true
+	c.closeMu.Unlock()
+
+	if closed {
+		return nil
+	}
+
+	// Cancel before acquiring locks to speed up teardown.
+	c.cancel()
+
+	c.readMu.Lock()
+	defer c.readMu.Unlock()
+	c.writeMu.Lock()
+	defer c.writeMu.Unlock()
+
+	_ = c.Conn.Close()
+	return nil
+}
+
+func websocketNetConn(conn *websocket.Conn, msgType websocket.MessageType) net.Conn {
+	// Since `websocket.NetConn` binds to a context for the lifetime of the
+	// connection, we need to create a new context that can be canceled when
+	// the connection is closed.
+	ctx, cancel := context.WithCancel(context.Background())
+	nc := websocket.NetConn(ctx, conn, msgType)
+	return &wsNetConn{cancel: cancel, Conn: nc}
+}
```
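wsNetConn.Write splits each payload into pieces no larger than rptyJSONMaxDataSize before handing them to the underlying WebSocket-backed conn, presumably to keep individual messages bounded to the same size the reconnecting-PTY path tolerates. A standalone sketch of that chunked-write pattern follows; the 64 KiB limit stands in for the package constant and is an assumption, not its real value.

```go
// Standalone sketch of the chunked-write pattern used by wsNetConn.Write.
// maxChunk is an illustrative limit standing in for rptyJSONMaxDataSize.
package main

import (
	"bytes"
	"fmt"
	"io"
)

const maxChunk = 64 * 1024 // hypothetical; not the package's real constant

func chunkedWrite(w io.Writer, b []byte) (n int, err error) {
	for len(b) > 0 {
		bb := b
		if len(bb) > maxChunk {
			bb = b[:maxChunk] // cap each individual write
		}
		b = b[len(bb):]
		nn, err := w.Write(bb)
		n += nn
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

func main() {
	var buf bytes.Buffer
	// 200 KiB is written as four chunks: 64 + 64 + 64 + 8 KiB.
	n, err := chunkedWrite(&buf, make([]byte, 200*1024))
	fmt.Println(n, err) // 204800 <nil>
}
```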

scaletest/workspacetraffic/run.go (+22 -13)

```diff
@@ -91,7 +91,16 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
 	command := fmt.Sprintf("dd if=/dev/stdin of=%s bs=%d status=none", output, bytesPerTick)
 
 	var conn *countReadWriteCloser
-	if r.cfg.SSH {
+	switch {
+	case r.cfg.App.Name != "":
+		logger.Info(ctx, "sending traffic to workspace app", slog.F("app", r.cfg.App.Name))
+		conn, err = appClientConn(ctx, r.client, r.cfg.App.URL)
+		if err != nil {
+			logger.Error(ctx, "connect to workspace app", slog.Error(err))
+			return xerrors.Errorf("connect to workspace app: %w", err)
+		}
+
+	case r.cfg.SSH:
 		logger.Info(ctx, "connecting to workspace agent", slog.F("method", "ssh"))
 		// If echo is enabled, disable PTY to avoid double echo and
 		// reduce CPU usage.
@@ -101,7 +110,8 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
 			logger.Error(ctx, "connect to workspace agent via ssh", slog.Error(err))
 			return xerrors.Errorf("connect to workspace via ssh: %w", err)
 		}
-	} else {
+
+	default:
 		logger.Info(ctx, "connecting to workspace agent", slog.F("method", "reconnectingpty"))
 		conn, err = connectRPTY(ctx, r.client, agentID, reconnect, command)
 		if err != nil {
@@ -114,8 +124,8 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
 	closeConn := func() error {
 		closeOnce.Do(func() {
 			closeErr = conn.Close()
-			if err != nil {
-				logger.Error(ctx, "close agent connection", slog.Error(err))
+			if closeErr != nil {
+				logger.Error(ctx, "close agent connection", slog.Error(closeErr))
 			}
 		})
 		return closeErr
@@ -142,7 +152,6 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
 
 	// Read until connection is closed.
 	go func() {
-		rch := rch // Shadowed for reassignment.
 		logger.Debug(ctx, "reading from agent")
 		rch <- drain(conn)
 		logger.Debug(ctx, "done reading from agent")
@@ -151,7 +160,6 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
 
 	// Write random data to the conn every tick.
 	go func() {
-		wch := wch // Shadowed for reassignment.
 		logger.Debug(ctx, "writing to agent")
 		wch <- writeRandomData(conn, bytesPerTick, tick.C)
 		logger.Debug(ctx, "done writing to agent")
@@ -160,16 +168,17 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
 
 	var waitCloseTimeoutCh <-chan struct{}
 	deadlineCtxCh := deadlineCtx.Done()
+	wchRef, rchRef := wch, rch
 	for {
-		if wch == nil && rch == nil {
+		if wchRef == nil && rchRef == nil {
 			return nil
 		}
 
 		select {
 		case <-waitCloseTimeoutCh:
 			logger.Warn(ctx, "timed out waiting for read/write to complete",
-				slog.F("write_done", wch == nil),
-				slog.F("read_done", rch == nil),
+				slog.F("write_done", wchRef == nil),
+				slog.F("read_done", rchRef == nil),
 			)
 			return xerrors.Errorf("timed out waiting for read/write to complete: %w", ctx.Err())
 		case <-deadlineCtxCh:
@@ -181,16 +190,16 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
 			waitCtx, cancel := context.WithTimeout(context.Background(), waitCloseTimeout)
 			defer cancel() //nolint:revive // Only called once.
 			waitCloseTimeoutCh = waitCtx.Done()
-		case err = <-wch:
+		case err = <-wchRef:
 			if err != nil {
 				return xerrors.Errorf("write to agent: %w", err)
 			}
-			wch = nil
-		case err = <-rch:
+			wchRef = nil
+		case err = <-rchRef:
 			if err != nil {
 				return xerrors.Errorf("read from agent: %w", err)
 			}
-			rch = nil
+			rchRef = nil
 		}
 	}
 }
```
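The run loop now copies wch and rch into wchRef and rchRef and nils each one as its goroutine reports, instead of shadowing the channels inside the goroutines. This leans on the standard Go idiom that a receive from a nil channel blocks forever, so a finished case simply drops out of the select. A minimal, self-contained sketch of that idiom (the channel names and sleeps are illustrative):

```go
// Minimal sketch of the nil-channel select idiom used by the run loop:
// once a channel variable is set to nil, its case can never fire again,
// so the loop ends only after both goroutines have reported a result.
package main

import (
	"fmt"
	"time"
)

func main() {
	wch := make(chan error, 1)
	rch := make(chan error, 1)

	go func() { time.Sleep(10 * time.Millisecond); wch <- nil }() // "writer"
	go func() { time.Sleep(20 * time.Millisecond); rch <- nil }() // "reader"

	wchRef, rchRef := wch, rch
	for wchRef != nil || rchRef != nil {
		select {
		case err := <-wchRef:
			fmt.Println("write done:", err)
			wchRef = nil // retire this case; a nil channel blocks forever
		case err := <-rchRef:
			fmt.Println("read done:", err)
			rchRef = nil
		}
	}
	fmt.Println("both directions finished")
}
```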
