Skip to content

Commit 08fb9a6

Browse files
authored
feat(cli): add trafficgen command for load testing (#7307)
This PR adds a scaletest workspace-traffic command for load testing. This opens a ReconnectingPTY connection to each scaletest workspace (via coderd) and concurrently writes and reads random data to/from the PTY. Payloads are of the form #${RANDOM_ALPHANUMERIC_STRING}, which essentially drops garbage comments in the remote shell, and should not result in any commands being executed.
1 parent a172e07 commit 08fb9a6

11 files changed

+787
-79
lines changed

cli/root.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,13 @@ func (r *RootCmd) Core() []*clibase.Cmd {
8787

8888
// Workspace Commands
8989
r.configSSH(),
90-
r.rename(),
91-
r.ping(),
9290
r.create(),
9391
r.deleteWorkspace(),
9492
r.list(),
93+
r.parameters(),
94+
r.ping(),
95+
r.rename(),
96+
r.scaletest(),
9597
r.schedules(),
9698
r.show(),
9799
r.speedtest(),
@@ -100,13 +102,11 @@ func (r *RootCmd) Core() []*clibase.Cmd {
100102
r.stop(),
101103
r.update(),
102104
r.restart(),
103-
r.parameters(),
104105

105106
// Hidden
106-
r.workspaceAgent(),
107-
r.scaletest(),
108107
r.gitssh(),
109108
r.vscodeSSH(),
109+
r.workspaceAgent(),
110110
}
111111
}
112112

cli/scaletest.go

+236-70
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/coder/coder/scaletest/harness"
2929
"github.com/coder/coder/scaletest/reconnectingpty"
3030
"github.com/coder/coder/scaletest/workspacebuild"
31+
"github.com/coder/coder/scaletest/workspacetraffic"
3132
)
3233

3334
const scaletestTracerName = "coder_scaletest"
@@ -42,6 +43,7 @@ func (r *RootCmd) scaletest() *clibase.Cmd {
4243
Children: []*clibase.Cmd{
4344
r.scaletestCleanup(),
4445
r.scaletestCreateWorkspaces(),
46+
r.scaletestWorkspaceTraffic(),
4547
},
4648
}
4749

@@ -107,7 +109,10 @@ func (s *scaletestTracingFlags) provider(ctx context.Context) (trace.TracerProvi
107109
return tracerProvider, func(ctx context.Context) error {
108110
var err error
109111
closeTracingOnce.Do(func() {
110-
err = closeTracing(ctx)
112+
// Allow time to upload traces even if ctx is canceled
113+
traceCtx, traceCancel := context.WithTimeout(context.Background(), 10*time.Second)
114+
defer traceCancel()
115+
err = closeTracing(traceCtx)
111116
})
112117

113118
return err
@@ -384,33 +389,9 @@ func (r *RootCmd) scaletestCleanup() *clibase.Cmd {
384389
}
385390

386391
cliui.Infof(inv.Stdout, "Fetching scaletest workspaces...")
387-
var (
388-
pageNumber = 0
389-
limit = 100
390-
workspaces []codersdk.Workspace
391-
)
392-
for {
393-
page, err := client.Workspaces(ctx, codersdk.WorkspaceFilter{
394-
Name: "scaletest-",
395-
Offset: pageNumber * limit,
396-
Limit: limit,
397-
})
398-
if err != nil {
399-
return xerrors.Errorf("fetch scaletest workspaces page %d: %w", pageNumber, err)
400-
}
401-
402-
pageNumber++
403-
if len(page.Workspaces) == 0 {
404-
break
405-
}
406-
407-
pageWorkspaces := make([]codersdk.Workspace, 0, len(page.Workspaces))
408-
for _, w := range page.Workspaces {
409-
if isScaleTestWorkspace(w) {
410-
pageWorkspaces = append(pageWorkspaces, w)
411-
}
412-
}
413-
workspaces = append(workspaces, pageWorkspaces...)
392+
workspaces, err := getScaletestWorkspaces(ctx, client)
393+
if err != nil {
394+
return err
414395
}
415396

416397
cliui.Errorf(inv.Stderr, "Found %d scaletest workspaces\n", len(workspaces))
@@ -441,33 +422,9 @@ func (r *RootCmd) scaletestCleanup() *clibase.Cmd {
441422
}
442423

443424
cliui.Infof(inv.Stdout, "Fetching scaletest users...")
444-
pageNumber = 0
445-
limit = 100
446-
var users []codersdk.User
447-
for {
448-
page, err := client.Users(ctx, codersdk.UsersRequest{
449-
Search: "scaletest-",
450-
Pagination: codersdk.Pagination{
451-
Offset: pageNumber * limit,
452-
Limit: limit,
453-
},
454-
})
455-
if err != nil {
456-
return xerrors.Errorf("fetch scaletest users page %d: %w", pageNumber, err)
457-
}
458-
459-
pageNumber++
460-
if len(page.Users) == 0 {
461-
break
462-
}
463-
464-
pageUsers := make([]codersdk.User, 0, len(page.Users))
465-
for _, u := range page.Users {
466-
if isScaleTestUser(u) {
467-
pageUsers = append(pageUsers, u)
468-
}
469-
}
470-
users = append(users, pageUsers...)
425+
users, err := getScaletestUsers(ctx, client)
426+
if err != nil {
427+
return err
471428
}
472429

473430
cliui.Errorf(inv.Stderr, "Found %d scaletest users\n", len(users))
@@ -683,10 +640,11 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd {
683640
}
684641
defer func() {
685642
// Allow time for traces to flush even if command context is
686-
// canceled.
687-
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
688-
defer cancel()
689-
_ = closeTracing(ctx)
643+
// canceled. This is a no-op if tracing is not enabled.
644+
_, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...")
645+
if err := closeTracing(ctx); err != nil {
646+
_, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err)
647+
}
690648
}()
691649
tracer := tracerProvider.Tracer(scaletestTracerName)
692650

@@ -800,17 +758,6 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd {
800758
return xerrors.Errorf("cleanup tests: %w", err)
801759
}
802760

803-
// Upload traces.
804-
if tracingEnabled {
805-
_, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...")
806-
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
807-
defer cancel()
808-
err := closeTracing(ctx)
809-
if err != nil {
810-
_, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err)
811-
}
812-
}
813-
814761
if res.TotalFail > 0 {
815762
return xerrors.New("load test failed, see above for more details")
816763
}
@@ -947,6 +894,156 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd {
947894
return cmd
948895
}
949896

897+
func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
898+
var (
899+
tickInterval time.Duration
900+
bytesPerTick int64
901+
client = &codersdk.Client{}
902+
tracingFlags = &scaletestTracingFlags{}
903+
strategy = &scaletestStrategyFlags{}
904+
cleanupStrategy = &scaletestStrategyFlags{cleanup: true}
905+
output = &scaletestOutputFlags{}
906+
)
907+
908+
cmd := &clibase.Cmd{
909+
Use: "workspace-traffic",
910+
Short: "Generate traffic to scaletest workspaces through coderd",
911+
Middleware: clibase.Chain(
912+
r.InitClient(client),
913+
),
914+
Handler: func(inv *clibase.Invocation) error {
915+
ctx := inv.Context()
916+
917+
// Bypass rate limiting
918+
client.HTTPClient = &http.Client{
919+
Transport: &headerTransport{
920+
transport: http.DefaultTransport,
921+
header: map[string][]string{
922+
codersdk.BypassRatelimitHeader: {"true"},
923+
},
924+
},
925+
}
926+
927+
workspaces, err := getScaletestWorkspaces(inv.Context(), client)
928+
if err != nil {
929+
return err
930+
}
931+
932+
if len(workspaces) == 0 {
933+
return xerrors.Errorf("no scaletest workspaces exist")
934+
}
935+
936+
tracerProvider, closeTracing, tracingEnabled, err := tracingFlags.provider(ctx)
937+
if err != nil {
938+
return xerrors.Errorf("create tracer provider: %w", err)
939+
}
940+
defer func() {
941+
// Allow time for traces to flush even if command context is
942+
// canceled. This is a no-op if tracing is not enabled.
943+
_, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...")
944+
if err := closeTracing(ctx); err != nil {
945+
_, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err)
946+
}
947+
}()
948+
tracer := tracerProvider.Tracer(scaletestTracerName)
949+
950+
outputs, err := output.parse()
951+
if err != nil {
952+
return xerrors.Errorf("could not parse --output flags")
953+
}
954+
955+
th := harness.NewTestHarness(strategy.toStrategy(), cleanupStrategy.toStrategy())
956+
for idx, ws := range workspaces {
957+
var (
958+
agentID uuid.UUID
959+
name = "workspace-traffic"
960+
id = strconv.Itoa(idx)
961+
)
962+
963+
for _, res := range ws.LatestBuild.Resources {
964+
if len(res.Agents) == 0 {
965+
continue
966+
}
967+
agentID = res.Agents[0].ID
968+
}
969+
970+
if agentID == uuid.Nil {
971+
_, _ = fmt.Fprintf(inv.Stderr, "WARN: skipping workspace %s: no agent\n", ws.Name)
972+
continue
973+
}
974+
975+
// Setup our workspace agent connection.
976+
config := workspacetraffic.Config{
977+
AgentID: agentID,
978+
BytesPerTick: bytesPerTick,
979+
Duration: strategy.timeout,
980+
TickInterval: tickInterval,
981+
}
982+
983+
if err := config.Validate(); err != nil {
984+
return xerrors.Errorf("validate config: %w", err)
985+
}
986+
var runner harness.Runnable = workspacetraffic.NewRunner(client, config)
987+
if tracingEnabled {
988+
runner = &runnableTraceWrapper{
989+
tracer: tracer,
990+
spanName: fmt.Sprintf("%s/%s", name, id),
991+
runner: runner,
992+
}
993+
}
994+
995+
th.AddRun(name, id, runner)
996+
}
997+
998+
_, _ = fmt.Fprintln(inv.Stderr, "Running load test...")
999+
testCtx, testCancel := strategy.toContext(ctx)
1000+
defer testCancel()
1001+
err = th.Run(testCtx)
1002+
if err != nil {
1003+
return xerrors.Errorf("run test harness (harness failure, not a test failure): %w", err)
1004+
}
1005+
1006+
res := th.Results()
1007+
for _, o := range outputs {
1008+
err = o.write(res, inv.Stdout)
1009+
if err != nil {
1010+
return xerrors.Errorf("write output %q to %q: %w", o.format, o.path, err)
1011+
}
1012+
}
1013+
1014+
if res.TotalFail > 0 {
1015+
return xerrors.New("load test failed, see above for more details")
1016+
}
1017+
1018+
return nil
1019+
},
1020+
}
1021+
1022+
cmd.Options = []clibase.Option{
1023+
{
1024+
Flag: "bytes-per-tick",
1025+
Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_BYTES_PER_TICK",
1026+
Default: "1024",
1027+
Description: "How much traffic to generate per tick.",
1028+
Value: clibase.Int64Of(&bytesPerTick),
1029+
},
1030+
{
1031+
Flag: "tick-interval",
1032+
Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_TICK_INTERVAL",
1033+
Default: "100ms",
1034+
Description: "How often to send traffic.",
1035+
Value: clibase.DurationOf(&tickInterval),
1036+
},
1037+
}
1038+
1039+
tracingFlags.attach(&cmd.Options)
1040+
strategy.attach(&cmd.Options)
1041+
cleanupStrategy.attach(&cmd.Options)
1042+
output.attach(&cmd.Options)
1043+
1044+
return cmd
1045+
}
1046+
9501047
type runnableTraceWrapper struct {
9511048
tracer trace.Tracer
9521049
spanName string
@@ -1023,3 +1120,72 @@ func isScaleTestWorkspace(workspace codersdk.Workspace) bool {
10231120
return strings.HasPrefix(workspace.OwnerName, "scaletest-") ||
10241121
strings.HasPrefix(workspace.Name, "scaletest-")
10251122
}
1123+
1124+
func getScaletestWorkspaces(ctx context.Context, client *codersdk.Client) ([]codersdk.Workspace, error) {
1125+
var (
1126+
pageNumber = 0
1127+
limit = 100
1128+
workspaces []codersdk.Workspace
1129+
)
1130+
1131+
for {
1132+
page, err := client.Workspaces(ctx, codersdk.WorkspaceFilter{
1133+
Name: "scaletest-",
1134+
Offset: pageNumber * limit,
1135+
Limit: limit,
1136+
})
1137+
if err != nil {
1138+
return nil, xerrors.Errorf("fetch scaletest workspaces page %d: %w", pageNumber, err)
1139+
}
1140+
1141+
pageNumber++
1142+
if len(page.Workspaces) == 0 {
1143+
break
1144+
}
1145+
1146+
pageWorkspaces := make([]codersdk.Workspace, 0, len(page.Workspaces))
1147+
for _, w := range page.Workspaces {
1148+
if isScaleTestWorkspace(w) {
1149+
pageWorkspaces = append(pageWorkspaces, w)
1150+
}
1151+
}
1152+
workspaces = append(workspaces, pageWorkspaces...)
1153+
}
1154+
return workspaces, nil
1155+
}
1156+
1157+
func getScaletestUsers(ctx context.Context, client *codersdk.Client) ([]codersdk.User, error) {
1158+
var (
1159+
pageNumber = 0
1160+
limit = 100
1161+
users []codersdk.User
1162+
)
1163+
1164+
for {
1165+
page, err := client.Users(ctx, codersdk.UsersRequest{
1166+
Search: "scaletest-",
1167+
Pagination: codersdk.Pagination{
1168+
Offset: pageNumber * limit,
1169+
Limit: limit,
1170+
},
1171+
})
1172+
if err != nil {
1173+
return nil, xerrors.Errorf("fetch scaletest users page %d: %w", pageNumber, err)
1174+
}
1175+
1176+
pageNumber++
1177+
if len(page.Users) == 0 {
1178+
break
1179+
}
1180+
1181+
pageUsers := make([]codersdk.User, 0, len(page.Users))
1182+
for _, u := range page.Users {
1183+
if isScaleTestUser(u) {
1184+
pageUsers = append(pageUsers, u)
1185+
}
1186+
}
1187+
users = append(users, pageUsers...)
1188+
}
1189+
1190+
return users, nil
1191+
}

0 commit comments

Comments
 (0)