Skip to content

feat(cli/exp): add app testing to scaletest workspace-traffic #11633

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 55 additions & 11 deletions cli/exp_scaletest.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/slices"
"golang.org/x/xerrors"

"cdr.dev/slog"
Expand Down Expand Up @@ -859,6 +860,7 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
tickInterval time.Duration
bytesPerTick int64
ssh bool
app string
template string

client = &codersdk.Client{}
Expand Down Expand Up @@ -911,6 +913,11 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
}
}

appHost, err := client.AppHost(ctx)
if err != nil {
return xerrors.Errorf("get app host: %w", err)
}

workspaces, err := getScaletestWorkspaces(inv.Context(), client, template)
if err != nil {
return err
Expand Down Expand Up @@ -945,35 +952,39 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
th := harness.NewTestHarness(strategy.toStrategy(), cleanupStrategy.toStrategy())
for idx, ws := range workspaces {
var (
agentID uuid.UUID
agentName string
name = "workspace-traffic"
id = strconv.Itoa(idx)
agent codersdk.WorkspaceAgent
name = "workspace-traffic"
id = strconv.Itoa(idx)
)

for _, res := range ws.LatestBuild.Resources {
if len(res.Agents) == 0 {
continue
}
agentID = res.Agents[0].ID
agentName = res.Agents[0].Name
agent = res.Agents[0]
}

if agentID == uuid.Nil {
if agent.ID == uuid.Nil {
_, _ = fmt.Fprintf(inv.Stderr, "WARN: skipping workspace %s: no agent\n", ws.Name)
continue
}

appConfig, err := createWorkspaceAppConfig(client, appHost.Host, app, ws, agent)
if err != nil {
return xerrors.Errorf("configure workspace app: %w", err)
}

// Setup our workspace agent connection.
config := workspacetraffic.Config{
AgentID: agentID,
AgentID: agent.ID,
BytesPerTick: bytesPerTick,
Duration: strategy.timeout,
TickInterval: tickInterval,
ReadMetrics: metrics.ReadMetrics(ws.OwnerName, ws.Name, agentName),
WriteMetrics: metrics.WriteMetrics(ws.OwnerName, ws.Name, agentName),
ReadMetrics: metrics.ReadMetrics(ws.OwnerName, ws.Name, agent.Name),
WriteMetrics: metrics.WriteMetrics(ws.OwnerName, ws.Name, agent.Name),
SSH: ssh,
Echo: ssh,
App: appConfig,
}

if err := config.Validate(); err != nil {
Expand Down Expand Up @@ -1046,9 +1057,16 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
Flag: "ssh",
Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_SSH",
Default: "",
Description: "Send traffic over SSH.",
Description: "Send traffic over SSH, cannot be used with --app.",
Value: clibase.BoolOf(&ssh),
},
{
Flag: "app",
Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_APP",
Default: "",
Description: "Send WebSocket traffic to a workspace app (proxied via coderd), cannot be used with --ssh.",
Value: clibase.StringOf(&app),
},
}

tracingFlags.attach(&cmd.Options)
Expand Down Expand Up @@ -1411,3 +1429,29 @@ func parseTemplate(ctx context.Context, client *codersdk.Client, organizationIDs

return tpl, nil
}

// createWorkspaceAppConfig resolves the workspace app with slug `app` on the
// given agent and builds the traffic generator's AppConfig for it. An empty
// slug yields a zero AppConfig (app traffic disabled). Subdomain apps require
// a configured wildcard app host; path-based apps are addressed through
// coderd under the workspace owner's namespace.
func createWorkspaceAppConfig(client *codersdk.Client, appHost, app string, workspace codersdk.Workspace, agent codersdk.WorkspaceAgent) (workspacetraffic.AppConfig, error) {
	if app == "" {
		// No app requested; caller will fall back to SSH or reconnecting pty.
		return workspacetraffic.AppConfig{}, nil
	}

	idx := slices.IndexFunc(agent.Apps, func(a codersdk.WorkspaceApp) bool {
		return a.Slug == app
	})
	if idx < 0 {
		return workspacetraffic.AppConfig{}, xerrors.Errorf("app %q not found in workspace %q", app, workspace.Name)
	}
	target := agent.Apps[idx]

	cfg := workspacetraffic.AppConfig{
		Name: target.Slug,
	}
	switch {
	case !target.Subdomain:
		// Path-based app, proxied via coderd:
		// <coderd>/@<owner>/<workspace>.<agent>/apps/<slug>
		cfg.URL = fmt.Sprintf("%s/@%s/%s.%s/apps/%s", client.URL.String(), workspace.OwnerName, workspace.Name, agent.Name, target.Slug)
	case appHost == "":
		return workspacetraffic.AppConfig{}, xerrors.Errorf("app %q is a subdomain app but no app host is configured", app)
	default:
		// Subdomain app: substitute the app's hostname into the wildcard host.
		cfg.URL = fmt.Sprintf("%s://%s", client.URL.Scheme, strings.Replace(appHost, "*", target.SubdomainName, 1))
	}

	return cfg, nil
}
11 changes: 11 additions & 0 deletions scaletest/workspacetraffic/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type Config struct {
// to true will double the amount of data read from the agent for
// PTYs (e.g. reconnecting pty or SSH connections that request PTY).
Echo bool `json:"echo"`

App AppConfig `json:"app"`
}

func (c Config) Validate() error {
Expand All @@ -50,5 +52,14 @@ func (c Config) Validate() error {
return xerrors.Errorf("validate tick_interval: must be greater than zero")
}

if c.SSH && c.App.Name != "" {
return xerrors.Errorf("validate ssh: must be false when app is used")
}

return nil
}

// AppConfig identifies the workspace app that traffic should be sent to.
// A zero value (empty Name) means no app traffic is configured.
type AppConfig struct {
	// Name is the app's slug; a non-empty Name enables app traffic
	// (and Validate rejects combining it with SSH).
	Name string `json:"name"`
	// URL is the full URL the traffic generator dials for this app.
	URL string `json:"url"`
}
119 changes: 119 additions & 0 deletions scaletest/workspacetraffic/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ import (
"encoding/json"
"errors"
"io"
"net"
"net/http"
"sync"
"time"

"nhooyr.io/websocket"

"github.com/coder/coder/v2/codersdk"

"github.com/google/uuid"
Expand Down Expand Up @@ -260,3 +264,118 @@ func (w *wrappedSSHConn) Read(p []byte) (n int, err error) {
func (w *wrappedSSHConn) Write(p []byte) (n int, err error) {
return w.stdin.Write(p)
}

// appClientConn dials the given workspace app URL over a WebSocket,
// authenticating with the client's session token, and returns the
// connection wrapped so bytes read/written can be counted.
func appClientConn(ctx context.Context, client *codersdk.Client, url string) (*countReadWriteCloser, error) {
	// Honor a custom session token header when the client configures one.
	hdrName := client.SessionTokenHeader
	if hdrName == "" {
		hdrName = codersdk.SessionTokenHeader
	}
	hdr := http.Header{}
	hdr.Set(hdrName, client.SessionToken())

	//nolint:bodyclose // The websocket conn manages the body.
	wsConn, _, err := websocket.Dial(ctx, url, &websocket.DialOptions{
		HTTPClient: client.HTTPClient,
		HTTPHeader: hdr,
	})
	if err != nil {
		return nil, xerrors.Errorf("websocket dial: %w", err)
	}

	// Adapt to net.Conn, then wrap so reads/writes feed the byte counters.
	return &countReadWriteCloser{rwc: websocketNetConn(wsConn, websocket.MessageBinary)}, nil
}

// wsNetConn wraps the net.Conn created by websocket.NetConn(). The stored
// cancel func cancels the context backing the websocket conn; it is invoked
// from Close so blocked reads/writes unwind promptly during teardown.
type wsNetConn struct {
	net.Conn

	// writeMu/readMu serialize Write and Read respectively, and let Close
	// wait for in-flight operations to drain before closing the conn.
	writeMu sync.Mutex
	readMu sync.Mutex

	cancel context.CancelFunc
	// closeMu guards closed, which records that Close has been called.
	closeMu sync.Mutex
	closed bool
}

// Read proxies to the underlying conn while holding the read lock so that
// Close can wait for an in-flight read to finish. Errors observed after
// Close are normalized to io.EOF.
func (c *wsNetConn) Read(b []byte) (int, error) {
	c.readMu.Lock()
	defer c.readMu.Unlock()

	if c.isClosed() {
		return 0, io.EOF
	}
	n, readErr := c.Conn.Read(b)
	if readErr != nil && c.isClosed() {
		// The failure was induced by our own Close; report a clean EOF.
		readErr = io.EOF
	}
	return n, readErr
}

// Write proxies to the underlying conn under the write lock, splitting the
// payload into chunks of at most rptyJSONMaxDataSize bytes so each frame
// stays within the reconnecting pty payload limit. Errors observed after
// Close are normalized to io.EOF.
func (c *wsNetConn) Write(b []byte) (int, error) {
	c.writeMu.Lock()
	defer c.writeMu.Unlock()

	if c.isClosed() {
		return 0, io.EOF
	}

	written := 0
	for off := 0; off < len(b); {
		end := off + rptyJSONMaxDataSize
		if end > len(b) {
			end = len(b)
		}
		nn, err := c.Conn.Write(b[off:end])
		written += nn
		if err != nil {
			if c.isClosed() {
				return written, io.EOF
			}
			return written, err
		}
		off = end
	}
	return written, nil
}

// isClosed reports whether Close has been called, reading closed under
// closeMu.
func (c *wsNetConn) isClosed() bool {
	c.closeMu.Lock()
	closed := c.closed
	c.closeMu.Unlock()
	return closed
}

// Close is idempotent: the first call cancels the websocket's context,
// waits for in-flight Read/Write calls to drain, then closes the
// underlying conn; subsequent calls are no-ops. Always returns nil.
func (c *wsNetConn) Close() error {
	c.closeMu.Lock()
	alreadyClosed := c.closed
	c.closed = true
	c.closeMu.Unlock()

	if alreadyClosed {
		return nil
	}

	// Cancel before taking the read/write locks so blocked operations
	// abort quickly instead of delaying teardown.
	c.cancel()

	c.readMu.Lock()
	c.writeMu.Lock()
	_ = c.Conn.Close()
	c.writeMu.Unlock()
	c.readMu.Unlock()

	return nil
}

// websocketNetConn adapts a websocket.Conn to a net.Conn. websocket.NetConn
// binds the connection's lifetime to a context, so a fresh cancelable
// context is created here and its cancel func stored on the wrapper, letting
// Close tear the connection down independently of any caller context.
func websocketNetConn(conn *websocket.Conn, msgType websocket.MessageType) net.Conn {
	ctx, cancel := context.WithCancel(context.Background())
	return &wsNetConn{
		Conn:   websocket.NetConn(ctx, conn, msgType),
		cancel: cancel,
	}
}
35 changes: 22 additions & 13 deletions scaletest/workspacetraffic/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,16 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
command := fmt.Sprintf("dd if=/dev/stdin of=%s bs=%d status=none", output, bytesPerTick)

var conn *countReadWriteCloser
if r.cfg.SSH {
switch {
case r.cfg.App.Name != "":
logger.Info(ctx, "sending traffic to workspace app", slog.F("app", r.cfg.App.Name))
conn, err = appClientConn(ctx, r.client, r.cfg.App.URL)
if err != nil {
logger.Error(ctx, "connect to workspace app", slog.Error(err))
return xerrors.Errorf("connect to workspace app: %w", err)
}

case r.cfg.SSH:
logger.Info(ctx, "connecting to workspace agent", slog.F("method", "ssh"))
// If echo is enabled, disable PTY to avoid double echo and
// reduce CPU usage.
Expand All @@ -101,7 +110,8 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
logger.Error(ctx, "connect to workspace agent via ssh", slog.Error(err))
return xerrors.Errorf("connect to workspace via ssh: %w", err)
}
} else {

default:
logger.Info(ctx, "connecting to workspace agent", slog.F("method", "reconnectingpty"))
conn, err = connectRPTY(ctx, r.client, agentID, reconnect, command)
if err != nil {
Expand All @@ -114,8 +124,8 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
closeConn := func() error {
closeOnce.Do(func() {
closeErr = conn.Close()
if err != nil {
logger.Error(ctx, "close agent connection", slog.Error(err))
if closeErr != nil {
logger.Error(ctx, "close agent connection", slog.Error(closeErr))
}
})
return closeErr
Expand All @@ -142,7 +152,6 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)

// Read until connection is closed.
go func() {
rch := rch // Shadowed for reassignment.
logger.Debug(ctx, "reading from agent")
rch <- drain(conn)
logger.Debug(ctx, "done reading from agent")
Expand All @@ -151,7 +160,6 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)

// Write random data to the conn every tick.
go func() {
wch := wch // Shadowed for reassignment.
logger.Debug(ctx, "writing to agent")
wch <- writeRandomData(conn, bytesPerTick, tick.C)
logger.Debug(ctx, "done writing to agent")
Expand All @@ -160,16 +168,17 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)

var waitCloseTimeoutCh <-chan struct{}
deadlineCtxCh := deadlineCtx.Done()
wchRef, rchRef := wch, rch
for {
if wch == nil && rch == nil {
if wchRef == nil && rchRef == nil {
return nil
}

select {
case <-waitCloseTimeoutCh:
logger.Warn(ctx, "timed out waiting for read/write to complete",
slog.F("write_done", wch == nil),
slog.F("read_done", rch == nil),
slog.F("write_done", wchRef == nil),
slog.F("read_done", rchRef == nil),
)
return xerrors.Errorf("timed out waiting for read/write to complete: %w", ctx.Err())
case <-deadlineCtxCh:
Expand All @@ -181,16 +190,16 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
waitCtx, cancel := context.WithTimeout(context.Background(), waitCloseTimeout)
defer cancel() //nolint:revive // Only called once.
waitCloseTimeoutCh = waitCtx.Done()
case err = <-wch:
case err = <-wchRef:
if err != nil {
return xerrors.Errorf("write to agent: %w", err)
}
wch = nil
case err = <-rch:
wchRef = nil
case err = <-rchRef:
if err != nil {
return xerrors.Errorf("read from agent: %w", err)
}
rch = nil
rchRef = nil
}
}
}
Expand Down
Loading