@@ -28,6 +28,7 @@ import (
28
28
"github.com/coder/coder/scaletest/harness"
29
29
"github.com/coder/coder/scaletest/reconnectingpty"
30
30
"github.com/coder/coder/scaletest/workspacebuild"
31
+ "github.com/coder/coder/scaletest/workspacetraffic"
31
32
)
32
33
33
34
const scaletestTracerName = "coder_scaletest"
@@ -42,6 +43,7 @@ func (r *RootCmd) scaletest() *clibase.Cmd {
42
43
Children : []* clibase.Cmd {
43
44
r .scaletestCleanup (),
44
45
r .scaletestCreateWorkspaces (),
46
+ r .scaletestWorkspaceTraffic (),
45
47
},
46
48
}
47
49
@@ -107,7 +109,10 @@ func (s *scaletestTracingFlags) provider(ctx context.Context) (trace.TracerProvi
107
109
return tracerProvider , func (ctx context.Context ) error {
108
110
var err error
109
111
closeTracingOnce .Do (func () {
110
- err = closeTracing (ctx )
112
+ // Allow time to upload traces even if ctx is canceled
113
+ traceCtx , traceCancel := context .WithTimeout (context .Background (), 10 * time .Second )
114
+ defer traceCancel ()
115
+ err = closeTracing (traceCtx )
111
116
})
112
117
113
118
return err
@@ -384,33 +389,9 @@ func (r *RootCmd) scaletestCleanup() *clibase.Cmd {
384
389
}
385
390
386
391
cliui .Infof (inv .Stdout , "Fetching scaletest workspaces..." )
387
- var (
388
- pageNumber = 0
389
- limit = 100
390
- workspaces []codersdk.Workspace
391
- )
392
- for {
393
- page , err := client .Workspaces (ctx , codersdk.WorkspaceFilter {
394
- Name : "scaletest-" ,
395
- Offset : pageNumber * limit ,
396
- Limit : limit ,
397
- })
398
- if err != nil {
399
- return xerrors .Errorf ("fetch scaletest workspaces page %d: %w" , pageNumber , err )
400
- }
401
-
402
- pageNumber ++
403
- if len (page .Workspaces ) == 0 {
404
- break
405
- }
406
-
407
- pageWorkspaces := make ([]codersdk.Workspace , 0 , len (page .Workspaces ))
408
- for _ , w := range page .Workspaces {
409
- if isScaleTestWorkspace (w ) {
410
- pageWorkspaces = append (pageWorkspaces , w )
411
- }
412
- }
413
- workspaces = append (workspaces , pageWorkspaces ... )
392
+ workspaces , err := getScaletestWorkspaces (ctx , client )
393
+ if err != nil {
394
+ return err
414
395
}
415
396
416
397
cliui .Errorf (inv .Stderr , "Found %d scaletest workspaces\n " , len (workspaces ))
@@ -441,33 +422,9 @@ func (r *RootCmd) scaletestCleanup() *clibase.Cmd {
441
422
}
442
423
443
424
cliui .Infof (inv .Stdout , "Fetching scaletest users..." )
444
- pageNumber = 0
445
- limit = 100
446
- var users []codersdk.User
447
- for {
448
- page , err := client .Users (ctx , codersdk.UsersRequest {
449
- Search : "scaletest-" ,
450
- Pagination : codersdk.Pagination {
451
- Offset : pageNumber * limit ,
452
- Limit : limit ,
453
- },
454
- })
455
- if err != nil {
456
- return xerrors .Errorf ("fetch scaletest users page %d: %w" , pageNumber , err )
457
- }
458
-
459
- pageNumber ++
460
- if len (page .Users ) == 0 {
461
- break
462
- }
463
-
464
- pageUsers := make ([]codersdk.User , 0 , len (page .Users ))
465
- for _ , u := range page .Users {
466
- if isScaleTestUser (u ) {
467
- pageUsers = append (pageUsers , u )
468
- }
469
- }
470
- users = append (users , pageUsers ... )
425
+ users , err := getScaletestUsers (ctx , client )
426
+ if err != nil {
427
+ return err
471
428
}
472
429
473
430
cliui .Errorf (inv .Stderr , "Found %d scaletest users\n " , len (users ))
@@ -683,10 +640,11 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd {
683
640
}
684
641
defer func () {
685
642
// Allow time for traces to flush even if command context is
686
- // canceled.
687
- ctx , cancel := context .WithTimeout (context .Background (), 10 * time .Second )
688
- defer cancel ()
689
- _ = closeTracing (ctx )
643
+ // canceled. This is a no-op if tracing is not enabled.
644
+ _ , _ = fmt .Fprintln (inv .Stderr , "\n Uploading traces..." )
645
+ if err := closeTracing (ctx ); err != nil {
646
+ _ , _ = fmt .Fprintf (inv .Stderr , "\n Error uploading traces: %+v\n " , err )
647
+ }
690
648
}()
691
649
tracer := tracerProvider .Tracer (scaletestTracerName )
692
650
@@ -800,17 +758,6 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd {
800
758
return xerrors .Errorf ("cleanup tests: %w" , err )
801
759
}
802
760
803
- // Upload traces.
804
- if tracingEnabled {
805
- _ , _ = fmt .Fprintln (inv .Stderr , "\n Uploading traces..." )
806
- ctx , cancel := context .WithTimeout (ctx , 1 * time .Minute )
807
- defer cancel ()
808
- err := closeTracing (ctx )
809
- if err != nil {
810
- _ , _ = fmt .Fprintf (inv .Stderr , "\n Error uploading traces: %+v\n " , err )
811
- }
812
- }
813
-
814
761
if res .TotalFail > 0 {
815
762
return xerrors .New ("load test failed, see above for more details" )
816
763
}
@@ -947,6 +894,156 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd {
947
894
return cmd
948
895
}
949
896
897
+ func (r * RootCmd ) scaletestWorkspaceTraffic () * clibase.Cmd {
898
+ var (
899
+ tickInterval time.Duration
900
+ bytesPerTick int64
901
+ client = & codersdk.Client {}
902
+ tracingFlags = & scaletestTracingFlags {}
903
+ strategy = & scaletestStrategyFlags {}
904
+ cleanupStrategy = & scaletestStrategyFlags {cleanup : true }
905
+ output = & scaletestOutputFlags {}
906
+ )
907
+
908
+ cmd := & clibase.Cmd {
909
+ Use : "workspace-traffic" ,
910
+ Short : "Generate traffic to scaletest workspaces through coderd" ,
911
+ Middleware : clibase .Chain (
912
+ r .InitClient (client ),
913
+ ),
914
+ Handler : func (inv * clibase.Invocation ) error {
915
+ ctx := inv .Context ()
916
+
917
+ // Bypass rate limiting
918
+ client .HTTPClient = & http.Client {
919
+ Transport : & headerTransport {
920
+ transport : http .DefaultTransport ,
921
+ header : map [string ][]string {
922
+ codersdk .BypassRatelimitHeader : {"true" },
923
+ },
924
+ },
925
+ }
926
+
927
+ workspaces , err := getScaletestWorkspaces (inv .Context (), client )
928
+ if err != nil {
929
+ return err
930
+ }
931
+
932
+ if len (workspaces ) == 0 {
933
+ return xerrors .Errorf ("no scaletest workspaces exist" )
934
+ }
935
+
936
+ tracerProvider , closeTracing , tracingEnabled , err := tracingFlags .provider (ctx )
937
+ if err != nil {
938
+ return xerrors .Errorf ("create tracer provider: %w" , err )
939
+ }
940
+ defer func () {
941
+ // Allow time for traces to flush even if command context is
942
+ // canceled. This is a no-op if tracing is not enabled.
943
+ _ , _ = fmt .Fprintln (inv .Stderr , "\n Uploading traces..." )
944
+ if err := closeTracing (ctx ); err != nil {
945
+ _ , _ = fmt .Fprintf (inv .Stderr , "\n Error uploading traces: %+v\n " , err )
946
+ }
947
+ }()
948
+ tracer := tracerProvider .Tracer (scaletestTracerName )
949
+
950
+ outputs , err := output .parse ()
951
+ if err != nil {
952
+ return xerrors .Errorf ("could not parse --output flags" )
953
+ }
954
+
955
+ th := harness .NewTestHarness (strategy .toStrategy (), cleanupStrategy .toStrategy ())
956
+ for idx , ws := range workspaces {
957
+ var (
958
+ agentID uuid.UUID
959
+ name = "workspace-traffic"
960
+ id = strconv .Itoa (idx )
961
+ )
962
+
963
+ for _ , res := range ws .LatestBuild .Resources {
964
+ if len (res .Agents ) == 0 {
965
+ continue
966
+ }
967
+ agentID = res .Agents [0 ].ID
968
+ }
969
+
970
+ if agentID == uuid .Nil {
971
+ _ , _ = fmt .Fprintf (inv .Stderr , "WARN: skipping workspace %s: no agent\n " , ws .Name )
972
+ continue
973
+ }
974
+
975
+ // Setup our workspace agent connection.
976
+ config := workspacetraffic.Config {
977
+ AgentID : agentID ,
978
+ BytesPerTick : bytesPerTick ,
979
+ Duration : strategy .timeout ,
980
+ TickInterval : tickInterval ,
981
+ }
982
+
983
+ if err := config .Validate (); err != nil {
984
+ return xerrors .Errorf ("validate config: %w" , err )
985
+ }
986
+ var runner harness.Runnable = workspacetraffic .NewRunner (client , config )
987
+ if tracingEnabled {
988
+ runner = & runnableTraceWrapper {
989
+ tracer : tracer ,
990
+ spanName : fmt .Sprintf ("%s/%s" , name , id ),
991
+ runner : runner ,
992
+ }
993
+ }
994
+
995
+ th .AddRun (name , id , runner )
996
+ }
997
+
998
+ _ , _ = fmt .Fprintln (inv .Stderr , "Running load test..." )
999
+ testCtx , testCancel := strategy .toContext (ctx )
1000
+ defer testCancel ()
1001
+ err = th .Run (testCtx )
1002
+ if err != nil {
1003
+ return xerrors .Errorf ("run test harness (harness failure, not a test failure): %w" , err )
1004
+ }
1005
+
1006
+ res := th .Results ()
1007
+ for _ , o := range outputs {
1008
+ err = o .write (res , inv .Stdout )
1009
+ if err != nil {
1010
+ return xerrors .Errorf ("write output %q to %q: %w" , o .format , o .path , err )
1011
+ }
1012
+ }
1013
+
1014
+ if res .TotalFail > 0 {
1015
+ return xerrors .New ("load test failed, see above for more details" )
1016
+ }
1017
+
1018
+ return nil
1019
+ },
1020
+ }
1021
+
1022
+ cmd .Options = []clibase.Option {
1023
+ {
1024
+ Flag : "bytes-per-tick" ,
1025
+ Env : "CODER_SCALETEST_WORKSPACE_TRAFFIC_BYTES_PER_TICK" ,
1026
+ Default : "1024" ,
1027
+ Description : "How much traffic to generate per tick." ,
1028
+ Value : clibase .Int64Of (& bytesPerTick ),
1029
+ },
1030
+ {
1031
+ Flag : "tick-interval" ,
1032
+ Env : "CODER_SCALETEST_WORKSPACE_TRAFFIC_TICK_INTERVAL" ,
1033
+ Default : "100ms" ,
1034
+ Description : "How often to send traffic." ,
1035
+ Value : clibase .DurationOf (& tickInterval ),
1036
+ },
1037
+ }
1038
+
1039
+ tracingFlags .attach (& cmd .Options )
1040
+ strategy .attach (& cmd .Options )
1041
+ cleanupStrategy .attach (& cmd .Options )
1042
+ output .attach (& cmd .Options )
1043
+
1044
+ return cmd
1045
+ }
1046
+
950
1047
type runnableTraceWrapper struct {
951
1048
tracer trace.Tracer
952
1049
spanName string
@@ -1023,3 +1120,72 @@ func isScaleTestWorkspace(workspace codersdk.Workspace) bool {
1023
1120
return strings .HasPrefix (workspace .OwnerName , "scaletest-" ) ||
1024
1121
strings .HasPrefix (workspace .Name , "scaletest-" )
1025
1122
}
1123
+
1124
+ func getScaletestWorkspaces (ctx context.Context , client * codersdk.Client ) ([]codersdk.Workspace , error ) {
1125
+ var (
1126
+ pageNumber = 0
1127
+ limit = 100
1128
+ workspaces []codersdk.Workspace
1129
+ )
1130
+
1131
+ for {
1132
+ page , err := client .Workspaces (ctx , codersdk.WorkspaceFilter {
1133
+ Name : "scaletest-" ,
1134
+ Offset : pageNumber * limit ,
1135
+ Limit : limit ,
1136
+ })
1137
+ if err != nil {
1138
+ return nil , xerrors .Errorf ("fetch scaletest workspaces page %d: %w" , pageNumber , err )
1139
+ }
1140
+
1141
+ pageNumber ++
1142
+ if len (page .Workspaces ) == 0 {
1143
+ break
1144
+ }
1145
+
1146
+ pageWorkspaces := make ([]codersdk.Workspace , 0 , len (page .Workspaces ))
1147
+ for _ , w := range page .Workspaces {
1148
+ if isScaleTestWorkspace (w ) {
1149
+ pageWorkspaces = append (pageWorkspaces , w )
1150
+ }
1151
+ }
1152
+ workspaces = append (workspaces , pageWorkspaces ... )
1153
+ }
1154
+ return workspaces , nil
1155
+ }
1156
+
1157
+ func getScaletestUsers (ctx context.Context , client * codersdk.Client ) ([]codersdk.User , error ) {
1158
+ var (
1159
+ pageNumber = 0
1160
+ limit = 100
1161
+ users []codersdk.User
1162
+ )
1163
+
1164
+ for {
1165
+ page , err := client .Users (ctx , codersdk.UsersRequest {
1166
+ Search : "scaletest-" ,
1167
+ Pagination : codersdk.Pagination {
1168
+ Offset : pageNumber * limit ,
1169
+ Limit : limit ,
1170
+ },
1171
+ })
1172
+ if err != nil {
1173
+ return nil , xerrors .Errorf ("fetch scaletest users page %d: %w" , pageNumber , err )
1174
+ }
1175
+
1176
+ pageNumber ++
1177
+ if len (page .Users ) == 0 {
1178
+ break
1179
+ }
1180
+
1181
+ pageUsers := make ([]codersdk.User , 0 , len (page .Users ))
1182
+ for _ , u := range page .Users {
1183
+ if isScaleTestUser (u ) {
1184
+ pageUsers = append (pageUsers , u )
1185
+ }
1186
+ }
1187
+ users = append (users , pageUsers ... )
1188
+ }
1189
+
1190
+ return users , nil
1191
+ }
0 commit comments