-
Notifications
You must be signed in to change notification settings - Fork 889
feat(cli): add trafficgen command for load testing #7307
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
220edbf
737b475
9b26587
c56d84e
e548892
31ef743
fafca95
65c6d88
0bfa9f6
da935a2
5daa526
4f165be
31fa8be
0817204
a6d7870
935dcbd
e2efeff
b105e67
731b4db
9dc28a2
7b98b35
b9c845f
2574a00
516ffa1
655d95a
ca8b212
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,7 @@ import ( | |
"github.com/coder/coder/scaletest/harness" | ||
"github.com/coder/coder/scaletest/reconnectingpty" | ||
"github.com/coder/coder/scaletest/workspacebuild" | ||
"github.com/coder/coder/scaletest/workspacetraffic" | ||
) | ||
|
||
const scaletestTracerName = "coder_scaletest" | ||
|
@@ -42,6 +43,7 @@ func (r *RootCmd) scaletest() *clibase.Cmd { | |
Children: []*clibase.Cmd{ | ||
r.scaletestCleanup(), | ||
r.scaletestCreateWorkspaces(), | ||
r.scaletestWorkspaceTraffic(), | ||
}, | ||
} | ||
|
||
|
@@ -107,7 +109,10 @@ func (s *scaletestTracingFlags) provider(ctx context.Context) (trace.TracerProvi | |
return tracerProvider, func(ctx context.Context) error { | ||
var err error | ||
closeTracingOnce.Do(func() { | ||
err = closeTracing(ctx) | ||
// Allow time to upload traces even if ctx is canceled | ||
traceCtx, traceCancel := context.WithTimeout(context.Background(), 10*time.Second) | ||
defer traceCancel() | ||
err = closeTracing(traceCtx) | ||
}) | ||
|
||
return err | ||
|
@@ -384,33 +389,9 @@ func (r *RootCmd) scaletestCleanup() *clibase.Cmd { | |
} | ||
|
||
cliui.Infof(inv.Stdout, "Fetching scaletest workspaces...") | ||
var ( | ||
pageNumber = 0 | ||
limit = 100 | ||
workspaces []codersdk.Workspace | ||
) | ||
for { | ||
page, err := client.Workspaces(ctx, codersdk.WorkspaceFilter{ | ||
Name: "scaletest-", | ||
Offset: pageNumber * limit, | ||
Limit: limit, | ||
}) | ||
if err != nil { | ||
return xerrors.Errorf("fetch scaletest workspaces page %d: %w", pageNumber, err) | ||
} | ||
|
||
pageNumber++ | ||
if len(page.Workspaces) == 0 { | ||
break | ||
} | ||
|
||
pageWorkspaces := make([]codersdk.Workspace, 0, len(page.Workspaces)) | ||
for _, w := range page.Workspaces { | ||
if isScaleTestWorkspace(w) { | ||
pageWorkspaces = append(pageWorkspaces, w) | ||
} | ||
} | ||
workspaces = append(workspaces, pageWorkspaces...) | ||
Comment on lines
-387
to
-413
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. review: extracted to function |
||
workspaces, err := getScaletestWorkspaces(ctx, client) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
cliui.Errorf(inv.Stderr, "Found %d scaletest workspaces\n", len(workspaces)) | ||
|
@@ -441,33 +422,9 @@ func (r *RootCmd) scaletestCleanup() *clibase.Cmd { | |
} | ||
|
||
cliui.Infof(inv.Stdout, "Fetching scaletest users...") | ||
pageNumber = 0 | ||
limit = 100 | ||
var users []codersdk.User | ||
for { | ||
page, err := client.Users(ctx, codersdk.UsersRequest{ | ||
Search: "scaletest-", | ||
Pagination: codersdk.Pagination{ | ||
Offset: pageNumber * limit, | ||
Limit: limit, | ||
}, | ||
}) | ||
if err != nil { | ||
return xerrors.Errorf("fetch scaletest users page %d: %w", pageNumber, err) | ||
} | ||
|
||
pageNumber++ | ||
if len(page.Users) == 0 { | ||
break | ||
} | ||
|
||
pageUsers := make([]codersdk.User, 0, len(page.Users)) | ||
for _, u := range page.Users { | ||
if isScaleTestUser(u) { | ||
pageUsers = append(pageUsers, u) | ||
} | ||
} | ||
users = append(users, pageUsers...) | ||
Comment on lines
-444
to
-470
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. review: extracted to function |
||
users, err := getScaletestUsers(ctx, client) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
cliui.Errorf(inv.Stderr, "Found %d scaletest users\n", len(users)) | ||
|
@@ -683,10 +640,11 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd { | |
} | ||
defer func() { | ||
// Allow time for traces to flush even if command context is | ||
// canceled. | ||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) | ||
defer cancel() | ||
_ = closeTracing(ctx) | ||
// canceled. This is a no-op if tracing is not enabled. | ||
_, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...") | ||
if err := closeTracing(ctx); err != nil { | ||
_, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err) | ||
} | ||
}() | ||
tracer := tracerProvider.Tracer(scaletestTracerName) | ||
|
||
|
@@ -800,17 +758,6 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd { | |
return xerrors.Errorf("cleanup tests: %w", err) | ||
} | ||
|
||
// Upload traces. | ||
if tracingEnabled { | ||
_, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...") | ||
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute) | ||
defer cancel() | ||
err := closeTracing(ctx) | ||
if err != nil { | ||
_, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err) | ||
} | ||
} | ||
|
||
if res.TotalFail > 0 { | ||
return xerrors.New("load test failed, see above for more details") | ||
} | ||
|
@@ -947,6 +894,156 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd { | |
return cmd | ||
} | ||
|
||
func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd { | ||
var ( | ||
tickInterval time.Duration | ||
bytesPerTick int64 | ||
client = &codersdk.Client{} | ||
tracingFlags = &scaletestTracingFlags{} | ||
strategy = &scaletestStrategyFlags{} | ||
cleanupStrategy = &scaletestStrategyFlags{cleanup: true} | ||
output = &scaletestOutputFlags{} | ||
johnstcn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) | ||
|
||
cmd := &clibase.Cmd{ | ||
Use: "workspace-traffic", | ||
Short: "Generate traffic to scaletest workspaces through coderd", | ||
Middleware: clibase.Chain( | ||
r.InitClient(client), | ||
), | ||
Handler: func(inv *clibase.Invocation) error { | ||
ctx := inv.Context() | ||
|
||
// Bypass rate limiting | ||
client.HTTPClient = &http.Client{ | ||
Transport: &headerTransport{ | ||
transport: http.DefaultTransport, | ||
header: map[string][]string{ | ||
codersdk.BypassRatelimitHeader: {"true"}, | ||
}, | ||
}, | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggestion: Since header transport isn't implementing close idler, (Or alternatively, implementing defer http.DefaultTransport.(*http.Transport).CloseIdleConnections() |
||
|
||
workspaces, err := getScaletestWorkspaces(inv.Context(), client) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if len(workspaces) == 0 { | ||
return xerrors.Errorf("no scaletest workspaces exist") | ||
} | ||
|
||
tracerProvider, closeTracing, tracingEnabled, err := tracingFlags.provider(ctx) | ||
if err != nil { | ||
return xerrors.Errorf("create tracer provider: %w", err) | ||
} | ||
defer func() { | ||
// Allow time for traces to flush even if command context is | ||
// canceled. This is a no-op if tracing is not enabled. | ||
_, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...") | ||
if err := closeTracing(ctx); err != nil { | ||
_, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err) | ||
} | ||
}() | ||
tracer := tracerProvider.Tracer(scaletestTracerName) | ||
|
||
outputs, err := output.parse() | ||
if err != nil { | ||
return xerrors.Errorf("could not parse --output flags") | ||
} | ||
|
||
th := harness.NewTestHarness(strategy.toStrategy(), cleanupStrategy.toStrategy()) | ||
for idx, ws := range workspaces { | ||
var ( | ||
agentID uuid.UUID | ||
name = "workspace-traffic" | ||
id = strconv.Itoa(idx) | ||
) | ||
|
||
for _, res := range ws.LatestBuild.Resources { | ||
if len(res.Agents) == 0 { | ||
continue | ||
} | ||
agentID = res.Agents[0].ID | ||
} | ||
|
||
if agentID == uuid.Nil { | ||
_, _ = fmt.Fprintf(inv.Stderr, "WARN: skipping workspace %s: no agent\n", ws.Name) | ||
continue | ||
} | ||
|
||
// Setup our workspace agent connection. | ||
config := workspacetraffic.Config{ | ||
AgentID: agentID, | ||
BytesPerTick: bytesPerTick, | ||
Duration: strategy.timeout, | ||
TickInterval: tickInterval, | ||
} | ||
|
||
if err := config.Validate(); err != nil { | ||
return xerrors.Errorf("validate config: %w", err) | ||
} | ||
var runner harness.Runnable = workspacetraffic.NewRunner(client, config) | ||
if tracingEnabled { | ||
runner = &runnableTraceWrapper{ | ||
tracer: tracer, | ||
spanName: fmt.Sprintf("%s/%s", name, id), | ||
runner: runner, | ||
} | ||
} | ||
|
||
th.AddRun(name, id, runner) | ||
} | ||
|
||
_, _ = fmt.Fprintln(inv.Stderr, "Running load test...") | ||
testCtx, testCancel := strategy.toContext(ctx) | ||
defer testCancel() | ||
err = th.Run(testCtx) | ||
if err != nil { | ||
return xerrors.Errorf("run test harness (harness failure, not a test failure): %w", err) | ||
} | ||
|
||
res := th.Results() | ||
for _, o := range outputs { | ||
err = o.write(res, inv.Stdout) | ||
if err != nil { | ||
return xerrors.Errorf("write output %q to %q: %w", o.format, o.path, err) | ||
} | ||
} | ||
|
||
if res.TotalFail > 0 { | ||
return xerrors.New("load test failed, see above for more details") | ||
} | ||
|
||
return nil | ||
}, | ||
} | ||
|
||
cmd.Options = []clibase.Option{ | ||
{ | ||
Flag: "bytes-per-tick", | ||
Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_BYTES_PER_TICK", | ||
Default: "1024", | ||
Description: "How much traffic to generate per tick.", | ||
Value: clibase.Int64Of(&bytesPerTick), | ||
}, | ||
{ | ||
Flag: "tick-interval", | ||
Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_TICK_INTERVAL", | ||
Default: "100ms", | ||
Description: "How often to send traffic.", | ||
Value: clibase.DurationOf(&tickInterval), | ||
}, | ||
} | ||
|
||
tracingFlags.attach(&cmd.Options) | ||
strategy.attach(&cmd.Options) | ||
cleanupStrategy.attach(&cmd.Options) | ||
output.attach(&cmd.Options) | ||
|
||
return cmd | ||
} | ||
|
||
type runnableTraceWrapper struct { | ||
tracer trace.Tracer | ||
spanName string | ||
|
@@ -1023,3 +1120,72 @@ func isScaleTestWorkspace(workspace codersdk.Workspace) bool { | |
return strings.HasPrefix(workspace.OwnerName, "scaletest-") || | ||
strings.HasPrefix(workspace.Name, "scaletest-") | ||
} | ||
|
||
func getScaletestWorkspaces(ctx context.Context, client *codersdk.Client) ([]codersdk.Workspace, error) { | ||
var ( | ||
pageNumber = 0 | ||
limit = 100 | ||
workspaces []codersdk.Workspace | ||
) | ||
|
||
for { | ||
page, err := client.Workspaces(ctx, codersdk.WorkspaceFilter{ | ||
Name: "scaletest-", | ||
Offset: pageNumber * limit, | ||
Limit: limit, | ||
}) | ||
if err != nil { | ||
return nil, xerrors.Errorf("fetch scaletest workspaces page %d: %w", pageNumber, err) | ||
} | ||
|
||
pageNumber++ | ||
if len(page.Workspaces) == 0 { | ||
break | ||
} | ||
|
||
pageWorkspaces := make([]codersdk.Workspace, 0, len(page.Workspaces)) | ||
for _, w := range page.Workspaces { | ||
if isScaleTestWorkspace(w) { | ||
pageWorkspaces = append(pageWorkspaces, w) | ||
} | ||
} | ||
workspaces = append(workspaces, pageWorkspaces...) | ||
} | ||
return workspaces, nil | ||
} | ||
|
||
func getScaletestUsers(ctx context.Context, client *codersdk.Client) ([]codersdk.User, error) { | ||
var ( | ||
pageNumber = 0 | ||
limit = 100 | ||
users []codersdk.User | ||
) | ||
|
||
for { | ||
page, err := client.Users(ctx, codersdk.UsersRequest{ | ||
Search: "scaletest-", | ||
Pagination: codersdk.Pagination{ | ||
Offset: pageNumber * limit, | ||
Limit: limit, | ||
}, | ||
}) | ||
if err != nil { | ||
return nil, xerrors.Errorf("fetch scaletest users page %d: %w", pageNumber, err) | ||
} | ||
|
||
pageNumber++ | ||
if len(page.Users) == 0 { | ||
break | ||
} | ||
|
||
pageUsers := make([]codersdk.User, 0, len(page.Users)) | ||
for _, u := range page.Users { | ||
if isScaleTestUser(u) { | ||
pageUsers = append(pageUsers, u) | ||
} | ||
} | ||
users = append(users, pageUsers...) | ||
} | ||
|
||
return users, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
review: I'm assuming all of this wants to be sorted alphabetically.