Skip to content

feat(cli/exp): add app testing to scaletest workspace-traffic #11633

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion cli/exp_scaletest.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/slices"
"golang.org/x/xerrors"

"cdr.dev/slog"
Expand Down Expand Up @@ -859,6 +860,7 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
tickInterval time.Duration
bytesPerTick int64
ssh bool
app string
template string

client = &codersdk.Client{}
Expand Down Expand Up @@ -911,6 +913,11 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
}
}

appHost, err := client.AppHost(ctx)
if err != nil {
return xerrors.Errorf("get app host: %w", err)
}

workspaces, err := getScaletestWorkspaces(inv.Context(), client, template)
if err != nil {
return err
Expand Down Expand Up @@ -949,6 +956,8 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
agentName string
name = "workspace-traffic"
id = strconv.Itoa(idx)
apps []codersdk.WorkspaceApp
appConfig workspacetraffic.AppConfig
)

for _, res := range ws.LatestBuild.Resources {
Expand All @@ -957,13 +966,34 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
}
agentID = res.Agents[0].ID
agentName = res.Agents[0].Name
apps = res.Agents[0].Apps
}

if agentID == uuid.Nil {
_, _ = fmt.Fprintf(inv.Stderr, "WARN: skipping workspace %s: no agent\n", ws.Name)
continue
}

if app != "" {
i := slices.IndexFunc(apps, func(a codersdk.WorkspaceApp) bool { return a.Slug == app })
if i == -1 {
return xerrors.Errorf("app %q not found in workspace %q", app, ws.Name)
}

appConfig = workspacetraffic.AppConfig{
Name: apps[i].Slug,
}
if apps[i].Subdomain {
if appHost.Host == "" {
return xerrors.Errorf("app %q is a subdomain app but no app host is configured", app)
}

appConfig.URL = fmt.Sprintf("%s://%s", client.URL.Scheme, strings.Replace(appHost.Host, "*", apps[i].SubdomainName, 1))
} else {
appConfig.URL = fmt.Sprintf("%s/@%s/%s.%s/apps/%s", client.URL.String(), ws.OwnerName, ws.Name, agentName, apps[i].Slug)
}
}

// Setup our workspace agent connection.
config := workspacetraffic.Config{
AgentID: agentID,
Expand All @@ -974,6 +1004,7 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
WriteMetrics: metrics.WriteMetrics(ws.OwnerName, ws.Name, agentName),
SSH: ssh,
Echo: ssh,
App: appConfig,
}

if err := config.Validate(); err != nil {
Expand Down Expand Up @@ -1046,9 +1077,16 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
Flag: "ssh",
Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_SSH",
Default: "",
Description: "Send traffic over SSH.",
Description: "Send traffic over SSH, cannot be used with --app.",
Value: clibase.BoolOf(&ssh),
},
{
Flag: "app",
Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_APP",
Default: "",
Description: "Send WebSocket traffic to a workspace app (proxied via coderd), cannot be used with --ssh.",
Value: clibase.StringOf(&app),
},
}

tracingFlags.attach(&cmd.Options)
Expand Down
11 changes: 11 additions & 0 deletions scaletest/workspacetraffic/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type Config struct {
// to true will double the amount of data read from the agent for
// PTYs (e.g. reconnecting pty or SSH connections that request PTY).
Echo bool `json:"echo"`

App AppConfig `json:"app"`
}

func (c Config) Validate() error {
Expand All @@ -50,5 +52,14 @@ func (c Config) Validate() error {
return xerrors.Errorf("validate tick_interval: must be greater than zero")
}

if c.SSH && c.App.Name != "" {
return xerrors.Errorf("validate ssh: must be false when app is used")
}

return nil
}

// AppConfig describes the workspace app that traffic should be sent to.
type AppConfig struct {
	// Name is the slug of the workspace app.
	Name string `json:"name"`
	// URL is the full URL of the app endpoint the traffic generator dials.
	URL string `json:"url"`
}
121 changes: 118 additions & 3 deletions scaletest/workspacetraffic/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ import (
"encoding/json"
"errors"
"io"
"net"
"net/http"
"sync"
"time"

"nhooyr.io/websocket"

"github.com/coder/coder/v2/codersdk"

"github.com/google/uuid"
Expand All @@ -27,9 +31,9 @@ const (
//
// failed to write frame: WebSocket closed: received close frame: status = StatusMessageTooBig and reason = "read limited at 32769 bytes"
//
// Since we can't control fragmentation/buffer sizes, we keep it simple and
// match the conservative payload size used by agent/reconnectingpty (1024).
rptyJSONMaxDataSize = 1024
// Since we can't control fragmentation/buffer sizes, we use a conservative
// value. Derived from 1024 * 9 * 3 = <28KB.
rptyJSONMaxDataSize = 1024 * 9
)

func connectRPTY(ctx context.Context, client *codersdk.Client, agentID, reconnect uuid.UUID, cmd string) (*countReadWriteCloser, error) {
Expand Down Expand Up @@ -260,3 +264,114 @@ func (w *wrappedSSHConn) Read(p []byte) (n int, err error) {
// Write forwards p to the SSH session's stdin pipe.
func (w *wrappedSSHConn) Write(p []byte) (n int, err error) {
	return w.stdin.Write(p)
}

// appClientConn dials the workspace app WebSocket at url, authenticating
// with the client's session token, and returns the connection wrapped in
// a countReadWriteCloser so bytes sent/received can be monitored.
func appClientConn(ctx context.Context, client *codersdk.Client, url string) (*countReadWriteCloser, error) {
	// Honor a custom session token header name if one is configured on
	// the client; otherwise fall back to the default.
	hdrName := codersdk.SessionTokenHeader
	if h := client.SessionTokenHeader; h != "" {
		hdrName = h
	}
	reqHeaders := http.Header{}
	reqHeaders.Set(hdrName, client.SessionToken())

	//nolint:bodyclose // The websocket conn manages the body.
	wsConn, _, err := websocket.Dial(ctx, url, &websocket.DialOptions{
		HTTPClient: client.HTTPClient,
		HTTPHeader: reqHeaders,
	})
	if err != nil {
		return nil, xerrors.Errorf("websocket dial: %w", err)
	}

	// Adapt the websocket to a net.Conn and wrap it so we can count
	// bytes sent/received.
	return &countReadWriteCloser{rwc: websocketNetConn(wsConn, websocket.MessageBinary)}, nil
}

// wsNetConn wraps the net.Conn created by websocket.NetConn(). The cancel
// func is called if a read or write error is encountered, which unblocks
// any in-flight operation on the underlying connection.
type wsNetConn struct {
	net.Conn

	// writeMu and readMu serialize Write and Read respectively so that
	// Close can safely tear down the conn once both are idle.
	writeMu sync.Mutex
	readMu  sync.Mutex

	// cancel aborts the context backing the underlying websocket NetConn.
	cancel context.CancelFunc
	// closeMu guards closed; closed flips to true exactly once in Close.
	closeMu sync.Mutex
	closed  bool
}

// Read reads from the underlying conn, translating errors observed after
// the conn has been closed into io.EOF. Reads are serialized by readMu.
func (c *wsNetConn) Read(b []byte) (n int, err error) {
	c.readMu.Lock()
	defer c.readMu.Unlock()

	if c.isClosed() {
		return 0, io.EOF
	}

	n, err = c.Conn.Read(b)
	switch {
	case err == nil:
		return n, nil
	case c.isClosed():
		// The error was caused by (or raced with) Close; report EOF.
		return n, io.EOF
	default:
		return n, err
	}
}

// Write writes b to the underlying conn in chunks of at most
// rptyJSONMaxDataSize bytes so that a frame never exceeds the peer's read
// limit. Errors observed after close are reported as io.EOF. Writes are
// serialized by writeMu.
func (c *wsNetConn) Write(b []byte) (n int, err error) {
	c.writeMu.Lock()
	defer c.writeMu.Unlock()

	if c.isClosed() {
		return 0, io.EOF
	}

	for remaining := b; len(remaining) > 0; {
		chunk := remaining
		if len(chunk) > rptyJSONMaxDataSize {
			chunk = remaining[:rptyJSONMaxDataSize]
		}
		remaining = remaining[len(chunk):]

		nn, werr := c.Conn.Write(chunk)
		n += nn
		if werr != nil {
			if c.isClosed() {
				return n, io.EOF
			}
			return n, werr
		}
	}
	return n, nil
}

// isClosed reports whether Close has been called on this conn.
func (c *wsNetConn) isClosed() bool {
	c.closeMu.Lock()
	v := c.closed
	c.closeMu.Unlock()
	return v
}

// Close tears the connection down exactly once: it cancels the context to
// unblock any in-flight Read/Write, waits for both to release their
// mutexes, then closes the underlying conn. Subsequent calls are no-ops.
func (c *wsNetConn) Close() error {
	// Flip the closed flag, remembering whether another caller beat us.
	c.closeMu.Lock()
	alreadyClosed := c.closed
	c.closed = true
	c.closeMu.Unlock()
	if alreadyClosed {
		return nil
	}

	// Cancel first: this unblocks any Read/Write pending on the underlying
	// conn so the mutex acquisitions below cannot deadlock.
	c.cancel()

	c.readMu.Lock()
	defer c.readMu.Unlock()
	c.writeMu.Lock()
	defer c.writeMu.Unlock()

	_ = c.Conn.Close()
	return nil
}

// websocketNetConn adapts a websocket.Conn to a net.Conn carrying messages
// of the given type. The returned wsNetConn's cancel func aborts the
// context backing the adapter when a read/write error occurs.
func websocketNetConn(conn *websocket.Conn, msgType websocket.MessageType) net.Conn {
	ctx, stop := context.WithCancel(context.Background())
	return &wsNetConn{
		Conn:   websocket.NetConn(ctx, conn, msgType),
		cancel: stop,
	}
}
35 changes: 22 additions & 13 deletions scaletest/workspacetraffic/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,16 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
command := fmt.Sprintf("dd if=/dev/stdin of=%s bs=%d status=none", output, bytesPerTick)

var conn *countReadWriteCloser
if r.cfg.SSH {
switch {
case r.cfg.App.Name != "":
logger.Info(ctx, "sending traffic to workspace app", slog.F("app", r.cfg.App.Name))
conn, err = appClientConn(ctx, r.client, r.cfg.App.URL)
if err != nil {
logger.Error(ctx, "connect to workspace app", slog.Error(err))
return xerrors.Errorf("connect to workspace app: %w", err)
}

case r.cfg.SSH:
logger.Info(ctx, "connecting to workspace agent", slog.F("method", "ssh"))
// If echo is enabled, disable PTY to avoid double echo and
// reduce CPU usage.
Expand All @@ -101,7 +110,8 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
logger.Error(ctx, "connect to workspace agent via ssh", slog.Error(err))
return xerrors.Errorf("connect to workspace via ssh: %w", err)
}
} else {

default:
logger.Info(ctx, "connecting to workspace agent", slog.F("method", "reconnectingpty"))
conn, err = connectRPTY(ctx, r.client, agentID, reconnect, command)
if err != nil {
Expand All @@ -114,8 +124,8 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
closeConn := func() error {
closeOnce.Do(func() {
closeErr = conn.Close()
if err != nil {
logger.Error(ctx, "close agent connection", slog.Error(err))
if closeErr != nil {
logger.Error(ctx, "close agent connection", slog.Error(closeErr))
}
})
return closeErr
Expand All @@ -142,7 +152,6 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)

// Read until connection is closed.
go func() {
rch := rch // Shadowed for reassignment.
logger.Debug(ctx, "reading from agent")
rch <- drain(conn)
logger.Debug(ctx, "done reading from agent")
Expand All @@ -151,7 +160,6 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)

// Write random data to the conn every tick.
go func() {
wch := wch // Shadowed for reassignment.
logger.Debug(ctx, "writing to agent")
wch <- writeRandomData(conn, bytesPerTick, tick.C)
logger.Debug(ctx, "done writing to agent")
Expand All @@ -160,16 +168,17 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)

var waitCloseTimeoutCh <-chan struct{}
deadlineCtxCh := deadlineCtx.Done()
wchRef, rchRef := wch, rch
for {
if wch == nil && rch == nil {
if wchRef == nil && rchRef == nil {
return nil
}

select {
case <-waitCloseTimeoutCh:
logger.Warn(ctx, "timed out waiting for read/write to complete",
slog.F("write_done", wch == nil),
slog.F("read_done", rch == nil),
slog.F("write_done", wchRef == nil),
slog.F("read_done", rchRef == nil),
)
return xerrors.Errorf("timed out waiting for read/write to complete: %w", ctx.Err())
case <-deadlineCtxCh:
Expand All @@ -181,16 +190,16 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
waitCtx, cancel := context.WithTimeout(context.Background(), waitCloseTimeout)
defer cancel() //nolint:revive // Only called once.
waitCloseTimeoutCh = waitCtx.Done()
case err = <-wch:
case err = <-wchRef:
if err != nil {
return xerrors.Errorf("write to agent: %w", err)
}
wch = nil
case err = <-rch:
wchRef = nil
case err = <-rchRef:
if err != nil {
return xerrors.Errorf("read from agent: %w", err)
}
rch = nil
rchRef = nil
}
}
}
Expand Down
Loading