Skip to content

feat(cli): add trafficgen command for load testing #7307

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
220edbf
feat(cli): add trafficgen command for load testing
johnstcn Apr 18, 2023
737b475
skip test for now
johnstcn Apr 27, 2023
9b26587
make fmt
johnstcn Apr 27, 2023
c56d84e
lint
johnstcn Apr 27, 2023
e548892
swap order of waiting for read and write
johnstcn Apr 27, 2023
31ef743
close connection, add output formatting
johnstcn May 2, 2023
fafca95
do what the comment says
johnstcn May 2, 2023
65c6d88
move back under scaletest cmd
johnstcn May 2, 2023
0bfa9f6
integrate with scaletest harness
johnstcn May 2, 2023
da935a2
drain connection async
johnstcn May 3, 2023
5daa526
fix cancellation
johnstcn May 3, 2023
4f165be
handle deadline exceeded in drain
johnstcn May 3, 2023
31fa8be
address PR comments
johnstcn May 3, 2023
0817204
fixup! address PR comments
johnstcn May 3, 2023
a6d7870
ACTUALLY limit traffic instead of just blasting the firehose
johnstcn May 3, 2023
935dcbd
log config
johnstcn May 3, 2023
e2efeff
lint
johnstcn May 3, 2023
b105e67
chore(cli): scaletest: move logic for flushing traces into tracing pr…
johnstcn May 4, 2023
731b4db
remove unnecessary context-based I/O
johnstcn May 4, 2023
9dc28a2
refactor bytes per second to bytes per tick and tick interval
johnstcn May 4, 2023
7b98b35
rename trafficgen -> workspace-traffic
johnstcn May 4, 2023
b9c845f
make gen
johnstcn May 4, 2023
2574a00
use strategy.timeout instead of duration
johnstcn May 4, 2023
516ffa1
rm ctx from countReadWriter
johnstcn May 4, 2023
655d95a
fixup
johnstcn May 5, 2023
ca8b212
Merge remote-tracking branch 'origin/main' into cj/scaletest-trafficgen
johnstcn May 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat(cli): add trafficgen command for load testing
  • Loading branch information
johnstcn committed May 3, 2023
commit 220edbfeebe582eb903791160b6e0b549386c032
1 change: 1 addition & 0 deletions cli/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func (r *RootCmd) Core() []*clibase.Cmd {
// Hidden
r.workspaceAgent(),
r.scaletest(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scaletest is no longer hidden, even if your trafficgen subcommand is

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Un-hidden!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I meant that coder scaletest is not hidden, so this shouldn't be in the // Hidden section of the list.

r.trafficGen(),
r.gitssh(),
r.vscodeSSH(),
}
Expand Down
2 changes: 1 addition & 1 deletion cli/scaletest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

func TestScaleTest(t *testing.T) {
t.Skipf("This test is flakey. See https://github.com/coder/coder/issues/4942")
// t.Skipf("This test is flakey. See https://github.com/coder/coder/issues/4942")
t.Parallel()

// This test does a create-workspaces scale test with --no-cleanup, checks
Expand Down
216 changes: 216 additions & 0 deletions cli/trafficgen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package cli

import (
"context"
"encoding/json"
"fmt"
"github.com/coder/coder/cli/clibase"
"github.com/coder/coder/codersdk"
"github.com/coder/coder/cryptorand"
"github.com/google/uuid"
"golang.org/x/xerrors"
"io"
"sync/atomic"
"time"
)

func (r *RootCmd) trafficGen() *clibase.Cmd {
var (
duration time.Duration
bps int64
client = new(codersdk.Client)
)

cmd := &clibase.Cmd{
Use: "trafficgen",
Hidden: true,
Short: "Generate traffic to a Coder workspace",
Middleware: clibase.Chain(
clibase.RequireRangeArgs(1, 2),
r.InitClient(client),
),
Handler: func(inv *clibase.Invocation) error {
var agentName string
ws, err := namedWorkspace(inv.Context(), client, inv.Args[0])
if err != nil {
return err
}

var agentID uuid.UUID
for _, res := range ws.LatestBuild.Resources {
if len(res.Agents) == 0 {
continue
}
if agentName != "" && agentName != res.Agents[0].Name {
continue
}
agentID = res.Agents[0].ID
}

if agentID == uuid.Nil {
return xerrors.Errorf("no agent found for workspace %s", ws.Name)
}

reconnect := uuid.New()
conn, err := client.WorkspaceAgentReconnectingPTY(inv.Context(), codersdk.WorkspaceAgentReconnectingPTYOpts{
AgentID: agentID,
Reconnect: reconnect,
Height: 65535,
Width: 65535,
Command: "/bin/sh",
})
if err != nil {
return xerrors.Errorf("connect to workspace: %w", err)
}

defer func() {
_ = conn.Close()
}()
start := time.Now()
ctx, cancel := context.WithDeadline(inv.Context(), start.Add(duration))
defer cancel()
crw := countReadWriter{ReadWriter: conn}
// First, write a comment to the pty so we don't execute anything.
data, err := json.Marshal(codersdk.ReconnectingPTYRequest{
Data: "#",
})
if err != nil {
return xerrors.Errorf("write comment to pty: %w", err)
}
_, err = crw.Write(data)
// Now we begin writing random data to the pty.
writeSize := int(bps / 10)
rch := make(chan error)
wch := make(chan error)
go func() {
rch <- readForever(ctx, &crw)
close(rch)
}()
go func() {
wch <- writeRandomData(ctx, &crw, writeSize, 100*time.Millisecond)
close(wch)
}()

if rErr := <-rch; rErr != nil {
return xerrors.Errorf("read from pty: %w", rErr)
}
if wErr := <-wch; wErr != nil {
return xerrors.Errorf("write to pty: %w", wErr)
}

_, _ = fmt.Fprintf(inv.Stdout, "Test results:\n")
_, _ = fmt.Fprintf(inv.Stdout, "Took: %.2fs\n", time.Since(start).Seconds())
_, _ = fmt.Fprintf(inv.Stdout, "Sent: %d bytes\n", crw.BytesWritten())
_, _ = fmt.Fprintf(inv.Stdout, "Rcvd: %d bytes\n", crw.BytesRead())
return nil
},
}

cmd.Options = []clibase.Option{
{
Flag: "duration",
Env: "CODER_TRAFFICGEN_DURATION",
Default: "10s",
Description: "How long to generate traffic for.",
Value: clibase.DurationOf(&duration),
},
{
Flag: "bps",
Env: "CODER_TRAFFICGEN_BPS",
Default: "1024",
Description: "How much traffic to generate in bytes per second.",
Value: clibase.Int64Of(&bps),
},
}

return cmd
}

func readForever(ctx context.Context, src io.Reader) error {
buf := make([]byte, 1024)
for {
select {
case <-ctx.Done():
return nil
default:
_, err := src.Read(buf)
if err != nil && err != io.EOF {
return err
}
}
}
}

func writeRandomData(ctx context.Context, dst io.Writer, size int, period time.Duration) error {
tick := time.NewTicker(period)
defer tick.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-tick.C:
randStr, err := cryptorand.String(size)
if err != nil {
return err
}
data, err := json.Marshal(codersdk.ReconnectingPTYRequest{
Data: randStr,
})
if err != nil {
return err
}
err = copyContext(ctx, dst, data)
if err != nil {
return err
}
}
}
}

func copyContext(ctx context.Context, dst io.Writer, src []byte) error {
for idx := range src {
select {
case <-ctx.Done():
return nil
default:
_, err := dst.Write(src[idx : idx+1])
if err != nil {
if err == io.EOF {
return nil
}
return err
}
}
}
return nil
}

type countReadWriter struct {
io.ReadWriter
bytesRead atomic.Int64
bytesWritten atomic.Int64
}

func (w *countReadWriter) Read(p []byte) (int, error) {
n, err := w.ReadWriter.Read(p)
if err == nil {
w.bytesRead.Add(int64(n))
}
return n, err
}

func (w *countReadWriter) Write(p []byte) (int, error) {
n, err := w.ReadWriter.Write(p)
if err == nil {
w.bytesWritten.Add(int64(n))
}
return n, err
}

func (w *countReadWriter) BytesRead() int64 {
return w.bytesRead.Load()
}

func (w *countReadWriter) BytesWritten() int64 {
return w.bytesWritten.Load()
}
90 changes: 90 additions & 0 deletions cli/trafficgen_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package cli_test

import (
"bytes"
"context"
"github.com/coder/coder/agent"
"github.com/coder/coder/cli/clitest"
"github.com/coder/coder/coderd/coderdtest"
"github.com/coder/coder/codersdk/agentsdk"
"github.com/coder/coder/provisioner/echo"
"github.com/coder/coder/provisionersdk/proto"
"github.com/coder/coder/testutil"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"strings"
"testing"
)

// This test pretends to stand up a workspace and run a no-op traffic generation test.
// It's not a real test, but it's useful for debugging.
// We do not perform any cleanup.
func TestTrafficGen(t *testing.T) {
t.Parallel()

ctx, cancelFunc := context.WithTimeout(context.Background(), testutil.WaitMedium)
defer cancelFunc()

client := coderdtest.New(t, &coderdtest.Options{IncludeProvisionerDaemon: true})
user := coderdtest.CreateFirstUser(t, client)

authToken := uuid.NewString()
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
Parse: echo.ParseComplete,
ProvisionPlan: echo.ProvisionComplete,
ProvisionApply: []*proto.Provision_Response{{
Type: &proto.Provision_Response_Complete{
Complete: &proto.Provision_Complete{
Resources: []*proto.Resource{{
Name: "example",
Type: "aws_instance",
Agents: []*proto.Agent{{
Id: uuid.NewString(),
Name: "agent",
Auth: &proto.Agent_Token{
Token: authToken,
},
Apps: []*proto.App{},
}},
}},
},
},
}},
})
template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID)
coderdtest.AwaitTemplateVersionJob(t, client, version.ID)

ws := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID)
coderdtest.AwaitWorkspaceBuildJob(t, client, ws.LatestBuild.ID)

agentClient := agentsdk.New(client.URL)
agentClient.SetSessionToken(authToken)
agentCloser := agent.New(agent.Options{
Client: agentClient,
})
t.Cleanup(func() {
_ = agentCloser.Close()
})

coderdtest.AwaitWorkspaceAgents(t, client, ws.ID)

inv, root := clitest.New(t, "trafficgen", ws.Name,
"--duration", "1s",
"--bps", "100",
)
clitest.SetupConfig(t, client, root)
var stdout, stderr bytes.Buffer
inv.Stdout = &stdout
inv.Stderr = &stderr
err := inv.WithContext(ctx).Run()
require.NoError(t, err)
stdoutStr := stdout.String()
stderrStr := stderr.String()
require.Empty(t, stderrStr)
lines := strings.Split(strings.TrimSpace(stdoutStr), "\n")
require.Len(t, lines, 4)
require.Equal(t, "Test results:", lines[0])
require.Regexp(t, `Took:\s+\d+\.\d+s`, lines[1])
require.Regexp(t, `Sent:\s+\d+ bytes`, lines[2])
require.Regexp(t, `Rcvd:\s+\d+ bytes`, lines[3])
}
6 changes: 3 additions & 3 deletions codersdk/workspaceagentconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ type WorkspaceAgentReconnectingPTYInit struct {
// to pipe data to a PTY.
// @typescript-ignore ReconnectingPTYRequest
type ReconnectingPTYRequest struct {
Data string `json:"data"`
Height uint16 `json:"height"`
Width uint16 `json:"width"`
Data string `json:"data,omitempty"`
Height uint16 `json:"height,omitempty"`
Width uint16 `json:"width,omitempty"`
Comment on lines +168 to +170
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

review: Added omitempty here to reduce the size of the payload.

}

// ReconnectingPTY spawns a new reconnecting terminal session.
Expand Down