Skip to content

feat(cli): add trafficgen command for load testing #7307

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
220edbf
feat(cli): add trafficgen command for load testing
johnstcn Apr 18, 2023
737b475
skip test for now
johnstcn Apr 27, 2023
9b26587
make fmt
johnstcn Apr 27, 2023
c56d84e
lint
johnstcn Apr 27, 2023
e548892
swap order of waiting for read and write
johnstcn Apr 27, 2023
31ef743
close connection, add output formatting
johnstcn May 2, 2023
fafca95
do what the comment says
johnstcn May 2, 2023
65c6d88
move back under scaletest cmd
johnstcn May 2, 2023
0bfa9f6
integrate with scaletest harness
johnstcn May 2, 2023
da935a2
drain connection async
johnstcn May 3, 2023
5daa526
fix cancellation
johnstcn May 3, 2023
4f165be
handle deadline exceeded in drain
johnstcn May 3, 2023
31fa8be
address PR comments
johnstcn May 3, 2023
0817204
fixup! address PR comments
johnstcn May 3, 2023
a6d7870
ACTUALLY limit traffic instead of just blasting the firehose
johnstcn May 3, 2023
935dcbd
log config
johnstcn May 3, 2023
e2efeff
lint
johnstcn May 3, 2023
b105e67
chore(cli): scaletest: move logic for flushing traces into tracing pr…
johnstcn May 4, 2023
731b4db
remove unnecessary context-based I/O
johnstcn May 4, 2023
9dc28a2
refactor bytes per second to bytes per tick and tick interval
johnstcn May 4, 2023
7b98b35
rename trafficgen -> workspace-traffic
johnstcn May 4, 2023
b9c845f
make gen
johnstcn May 4, 2023
2574a00
use strategy.timeout instead of duration
johnstcn May 4, 2023
516ffa1
rm ctx from countReadWriter
johnstcn May 4, 2023
655d95a
fixup
johnstcn May 5, 2023
ca8b212
Merge remote-tracking branch 'origin/main' into cj/scaletest-trafficgen
johnstcn May 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions cli/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,13 @@ func (r *RootCmd) Core() []*clibase.Cmd {

// Workspace Commands
r.configSSH(),
r.rename(),
r.ping(),
r.create(),
r.deleteWorkspace(),
r.list(),
r.parameters(),
r.ping(),
r.rename(),
r.scaletest(),
r.schedules(),
r.show(),
r.speedtest(),
Expand All @@ -100,13 +102,11 @@ func (r *RootCmd) Core() []*clibase.Cmd {
r.stop(),
r.update(),
r.restart(),
r.parameters(),

// Hidden
r.workspaceAgent(),
r.scaletest(),
r.gitssh(),
r.vscodeSSH(),
r.workspaceAgent(),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

review: I'm assuming all of this wants to be sorted alphabetically.

}
}

Expand Down
306 changes: 236 additions & 70 deletions cli/scaletest.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/coder/coder/scaletest/harness"
"github.com/coder/coder/scaletest/reconnectingpty"
"github.com/coder/coder/scaletest/workspacebuild"
"github.com/coder/coder/scaletest/workspacetraffic"
)

const scaletestTracerName = "coder_scaletest"
Expand All @@ -42,6 +43,7 @@ func (r *RootCmd) scaletest() *clibase.Cmd {
Children: []*clibase.Cmd{
r.scaletestCleanup(),
r.scaletestCreateWorkspaces(),
r.scaletestWorkspaceTraffic(),
},
}

Expand Down Expand Up @@ -107,7 +109,10 @@ func (s *scaletestTracingFlags) provider(ctx context.Context) (trace.TracerProvi
return tracerProvider, func(ctx context.Context) error {
var err error
closeTracingOnce.Do(func() {
err = closeTracing(ctx)
// Allow time to upload traces even if ctx is canceled
traceCtx, traceCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer traceCancel()
err = closeTracing(traceCtx)
})

return err
Expand Down Expand Up @@ -384,33 +389,9 @@ func (r *RootCmd) scaletestCleanup() *clibase.Cmd {
}

cliui.Infof(inv.Stdout, "Fetching scaletest workspaces...")
var (
pageNumber = 0
limit = 100
workspaces []codersdk.Workspace
)
for {
page, err := client.Workspaces(ctx, codersdk.WorkspaceFilter{
Name: "scaletest-",
Offset: pageNumber * limit,
Limit: limit,
})
if err != nil {
return xerrors.Errorf("fetch scaletest workspaces page %d: %w", pageNumber, err)
}

pageNumber++
if len(page.Workspaces) == 0 {
break
}

pageWorkspaces := make([]codersdk.Workspace, 0, len(page.Workspaces))
for _, w := range page.Workspaces {
if isScaleTestWorkspace(w) {
pageWorkspaces = append(pageWorkspaces, w)
}
}
workspaces = append(workspaces, pageWorkspaces...)
Comment on lines -387 to -413
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

review: extracted to function

workspaces, err := getScaletestWorkspaces(ctx, client)
if err != nil {
return err
}

cliui.Errorf(inv.Stderr, "Found %d scaletest workspaces\n", len(workspaces))
Expand Down Expand Up @@ -441,33 +422,9 @@ func (r *RootCmd) scaletestCleanup() *clibase.Cmd {
}

cliui.Infof(inv.Stdout, "Fetching scaletest users...")
pageNumber = 0
limit = 100
var users []codersdk.User
for {
page, err := client.Users(ctx, codersdk.UsersRequest{
Search: "scaletest-",
Pagination: codersdk.Pagination{
Offset: pageNumber * limit,
Limit: limit,
},
})
if err != nil {
return xerrors.Errorf("fetch scaletest users page %d: %w", pageNumber, err)
}

pageNumber++
if len(page.Users) == 0 {
break
}

pageUsers := make([]codersdk.User, 0, len(page.Users))
for _, u := range page.Users {
if isScaleTestUser(u) {
pageUsers = append(pageUsers, u)
}
}
users = append(users, pageUsers...)
Comment on lines -444 to -470
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

review: extracted to function

users, err := getScaletestUsers(ctx, client)
if err != nil {
return err
}

cliui.Errorf(inv.Stderr, "Found %d scaletest users\n", len(users))
Expand Down Expand Up @@ -683,10 +640,11 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd {
}
defer func() {
// Allow time for traces to flush even if command context is
// canceled.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_ = closeTracing(ctx)
// canceled. This is a no-op if tracing is not enabled.
_, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...")
if err := closeTracing(ctx); err != nil {
_, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err)
}
}()
tracer := tracerProvider.Tracer(scaletestTracerName)

Expand Down Expand Up @@ -800,17 +758,6 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd {
return xerrors.Errorf("cleanup tests: %w", err)
}

// Upload traces.
if tracingEnabled {
_, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...")
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
err := closeTracing(ctx)
if err != nil {
_, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err)
}
}

if res.TotalFail > 0 {
return xerrors.New("load test failed, see above for more details")
}
Expand Down Expand Up @@ -947,6 +894,156 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd {
return cmd
}

func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
var (
tickInterval time.Duration
bytesPerTick int64
client = &codersdk.Client{}
tracingFlags = &scaletestTracingFlags{}
strategy = &scaletestStrategyFlags{}
cleanupStrategy = &scaletestStrategyFlags{cleanup: true}
output = &scaletestOutputFlags{}
)

cmd := &clibase.Cmd{
Use: "workspace-traffic",
Short: "Generate traffic to scaletest workspaces through coderd",
Middleware: clibase.Chain(
r.InitClient(client),
),
Handler: func(inv *clibase.Invocation) error {
ctx := inv.Context()

// Bypass rate limiting
client.HTTPClient = &http.Client{
Transport: &headerTransport{
transport: http.DefaultTransport,
header: map[string][]string{
codersdk.BypassRatelimitHeader: {"true"},
},
},
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Since header transport isn't implementing close idler, client.HTTPClient.CloseIdleConnections() won't work. This might be a good addition to avoid leaks in tests:

(Or alternatively, implementing type closeIdler interface { CloseIdleConnections() } on headerTransport and doing the client variant.)

defer http.DefaultTransport.(*http.Transport).CloseIdleConnections()


workspaces, err := getScaletestWorkspaces(inv.Context(), client)
if err != nil {
return err
}

if len(workspaces) == 0 {
return xerrors.Errorf("no scaletest workspaces exist")
}

tracerProvider, closeTracing, tracingEnabled, err := tracingFlags.provider(ctx)
if err != nil {
return xerrors.Errorf("create tracer provider: %w", err)
}
defer func() {
// Allow time for traces to flush even if command context is
// canceled. This is a no-op if tracing is not enabled.
_, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...")
if err := closeTracing(ctx); err != nil {
_, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err)
}
}()
tracer := tracerProvider.Tracer(scaletestTracerName)

outputs, err := output.parse()
if err != nil {
return xerrors.Errorf("could not parse --output flags")
}

th := harness.NewTestHarness(strategy.toStrategy(), cleanupStrategy.toStrategy())
for idx, ws := range workspaces {
var (
agentID uuid.UUID
name = "workspace-traffic"
id = strconv.Itoa(idx)
)

for _, res := range ws.LatestBuild.Resources {
if len(res.Agents) == 0 {
continue
}
agentID = res.Agents[0].ID
}

if agentID == uuid.Nil {
_, _ = fmt.Fprintf(inv.Stderr, "WARN: skipping workspace %s: no agent\n", ws.Name)
continue
}

// Setup our workspace agent connection.
config := workspacetraffic.Config{
AgentID: agentID,
BytesPerTick: bytesPerTick,
Duration: strategy.timeout,
TickInterval: tickInterval,
}

if err := config.Validate(); err != nil {
return xerrors.Errorf("validate config: %w", err)
}
var runner harness.Runnable = workspacetraffic.NewRunner(client, config)
if tracingEnabled {
runner = &runnableTraceWrapper{
tracer: tracer,
spanName: fmt.Sprintf("%s/%s", name, id),
runner: runner,
}
}

th.AddRun(name, id, runner)
}

_, _ = fmt.Fprintln(inv.Stderr, "Running load test...")
testCtx, testCancel := strategy.toContext(ctx)
defer testCancel()
err = th.Run(testCtx)
if err != nil {
return xerrors.Errorf("run test harness (harness failure, not a test failure): %w", err)
}

res := th.Results()
for _, o := range outputs {
err = o.write(res, inv.Stdout)
if err != nil {
return xerrors.Errorf("write output %q to %q: %w", o.format, o.path, err)
}
}

if res.TotalFail > 0 {
return xerrors.New("load test failed, see above for more details")
}

return nil
},
}

cmd.Options = []clibase.Option{
{
Flag: "bytes-per-tick",
Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_BYTES_PER_TICK",
Default: "1024",
Description: "How much traffic to generate per tick.",
Value: clibase.Int64Of(&bytesPerTick),
},
{
Flag: "tick-interval",
Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_TICK_INTERVAL",
Default: "100ms",
Description: "How often to send traffic.",
Value: clibase.DurationOf(&tickInterval),
},
}

tracingFlags.attach(&cmd.Options)
strategy.attach(&cmd.Options)
cleanupStrategy.attach(&cmd.Options)
output.attach(&cmd.Options)

return cmd
}

type runnableTraceWrapper struct {
tracer trace.Tracer
spanName string
Expand Down Expand Up @@ -1023,3 +1120,72 @@ func isScaleTestWorkspace(workspace codersdk.Workspace) bool {
return strings.HasPrefix(workspace.OwnerName, "scaletest-") ||
strings.HasPrefix(workspace.Name, "scaletest-")
}

// getScaletestWorkspaces pages through client.Workspaces with the
// "scaletest-" name filter, 100 entries per request, and collects every
// workspace accepted by isScaleTestWorkspace. Paging stops at the first empty
// page. A request error is returned wrapped with the page number that failed.
func getScaletestWorkspaces(ctx context.Context, client *codersdk.Client) ([]codersdk.Workspace, error) {
	const pageSize = 100

	var found []codersdk.Workspace
	for pageNumber := 0; ; pageNumber++ {
		resp, err := client.Workspaces(ctx, codersdk.WorkspaceFilter{
			Name:   "scaletest-",
			Offset: pageNumber * pageSize,
			Limit:  pageSize,
		})
		if err != nil {
			return nil, xerrors.Errorf("fetch scaletest workspaces page %d: %w", pageNumber, err)
		}

		// An empty page means everything has been read.
		if len(resp.Workspaces) == 0 {
			return found, nil
		}

		for _, candidate := range resp.Workspaces {
			if !isScaleTestWorkspace(candidate) {
				continue
			}
			found = append(found, candidate)
		}
	}
}

// getScaletestUsers pages through client.Users with the "scaletest-" search
// term, 100 entries per request, and collects every user accepted by
// isScaleTestUser. Paging stops at the first empty page. A request error is
// returned wrapped with the page number that failed.
func getScaletestUsers(ctx context.Context, client *codersdk.Client) ([]codersdk.User, error) {
	const pageSize = 100

	var found []codersdk.User
	for pageNumber := 0; ; pageNumber++ {
		req := codersdk.UsersRequest{
			Search: "scaletest-",
			Pagination: codersdk.Pagination{
				Offset: pageNumber * pageSize,
				Limit:  pageSize,
			},
		}
		resp, err := client.Users(ctx, req)
		if err != nil {
			return nil, xerrors.Errorf("fetch scaletest users page %d: %w", pageNumber, err)
		}

		// An empty page means everything has been read.
		if len(resp.Users) == 0 {
			return found, nil
		}

		for _, candidate := range resp.Users {
			if !isScaleTestUser(candidate) {
				continue
			}
			found = append(found, candidate)
		}
	}
}
Loading