From 220edbfeebe582eb903791160b6e0b549386c032 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Tue, 18 Apr 2023 16:42:05 +0100 Subject: [PATCH 01/25] feat(cli): add trafficgen command for load testing --- cli/root.go | 1 + cli/scaletest_test.go | 2 +- cli/trafficgen.go | 216 +++++++++++++++++++++++++++++++++ cli/trafficgen_test.go | 90 ++++++++++++++ codersdk/workspaceagentconn.go | 6 +- 5 files changed, 311 insertions(+), 4 deletions(-) create mode 100644 cli/trafficgen.go create mode 100644 cli/trafficgen_test.go diff --git a/cli/root.go b/cli/root.go index aa61a4eb571fe..30b89461e7446 100644 --- a/cli/root.go +++ b/cli/root.go @@ -105,6 +105,7 @@ func (r *RootCmd) Core() []*clibase.Cmd { // Hidden r.workspaceAgent(), r.scaletest(), + r.trafficGen(), r.gitssh(), r.vscodeSSH(), } diff --git a/cli/scaletest_test.go b/cli/scaletest_test.go index 3636b8ef40dc4..49b6ac467b986 100644 --- a/cli/scaletest_test.go +++ b/cli/scaletest_test.go @@ -19,7 +19,7 @@ import ( ) func TestScaleTest(t *testing.T) { - t.Skipf("This test is flakey. See https://github.com/coder/coder/issues/4942") + // t.Skipf("This test is flakey. 
See https://github.com/coder/coder/issues/4942") t.Parallel() // This test does a create-workspaces scale test with --no-cleanup, checks diff --git a/cli/trafficgen.go b/cli/trafficgen.go new file mode 100644 index 0000000000000..aaa0b7b31de17 --- /dev/null +++ b/cli/trafficgen.go @@ -0,0 +1,216 @@ +package cli + +import ( + "context" + "encoding/json" + "fmt" + "github.com/coder/coder/cli/clibase" + "github.com/coder/coder/codersdk" + "github.com/coder/coder/cryptorand" + "github.com/google/uuid" + "golang.org/x/xerrors" + "io" + "sync/atomic" + "time" +) + +func (r *RootCmd) trafficGen() *clibase.Cmd { + var ( + duration time.Duration + bps int64 + client = new(codersdk.Client) + ) + + cmd := &clibase.Cmd{ + Use: "trafficgen", + Hidden: true, + Short: "Generate traffic to a Coder workspace", + Middleware: clibase.Chain( + clibase.RequireRangeArgs(1, 2), + r.InitClient(client), + ), + Handler: func(inv *clibase.Invocation) error { + var agentName string + ws, err := namedWorkspace(inv.Context(), client, inv.Args[0]) + if err != nil { + return err + } + + var agentID uuid.UUID + for _, res := range ws.LatestBuild.Resources { + if len(res.Agents) == 0 { + continue + } + if agentName != "" && agentName != res.Agents[0].Name { + continue + } + agentID = res.Agents[0].ID + } + + if agentID == uuid.Nil { + return xerrors.Errorf("no agent found for workspace %s", ws.Name) + } + + reconnect := uuid.New() + conn, err := client.WorkspaceAgentReconnectingPTY(inv.Context(), codersdk.WorkspaceAgentReconnectingPTYOpts{ + AgentID: agentID, + Reconnect: reconnect, + Height: 65535, + Width: 65535, + Command: "/bin/sh", + }) + if err != nil { + return xerrors.Errorf("connect to workspace: %w", err) + } + + defer func() { + _ = conn.Close() + }() + start := time.Now() + ctx, cancel := context.WithDeadline(inv.Context(), start.Add(duration)) + defer cancel() + crw := countReadWriter{ReadWriter: conn} + // First, write a comment to the pty so we don't execute anything. 
+ data, err := json.Marshal(codersdk.ReconnectingPTYRequest{ + Data: "#", + }) + if err != nil { + return xerrors.Errorf("write comment to pty: %w", err) + } + _, err = crw.Write(data) + // Now we begin writing random data to the pty. + writeSize := int(bps / 10) + rch := make(chan error) + wch := make(chan error) + go func() { + rch <- readForever(ctx, &crw) + close(rch) + }() + go func() { + wch <- writeRandomData(ctx, &crw, writeSize, 100*time.Millisecond) + close(wch) + }() + + if rErr := <-rch; rErr != nil { + return xerrors.Errorf("read from pty: %w", rErr) + } + if wErr := <-wch; wErr != nil { + return xerrors.Errorf("write to pty: %w", wErr) + } + + _, _ = fmt.Fprintf(inv.Stdout, "Test results:\n") + _, _ = fmt.Fprintf(inv.Stdout, "Took: %.2fs\n", time.Since(start).Seconds()) + _, _ = fmt.Fprintf(inv.Stdout, "Sent: %d bytes\n", crw.BytesWritten()) + _, _ = fmt.Fprintf(inv.Stdout, "Rcvd: %d bytes\n", crw.BytesRead()) + return nil + }, + } + + cmd.Options = []clibase.Option{ + { + Flag: "duration", + Env: "CODER_TRAFFICGEN_DURATION", + Default: "10s", + Description: "How long to generate traffic for.", + Value: clibase.DurationOf(&duration), + }, + { + Flag: "bps", + Env: "CODER_TRAFFICGEN_BPS", + Default: "1024", + Description: "How much traffic to generate in bytes per second.", + Value: clibase.Int64Of(&bps), + }, + } + + return cmd +} + +func readForever(ctx context.Context, src io.Reader) error { + buf := make([]byte, 1024) + for { + select { + case <-ctx.Done(): + return nil + default: + _, err := src.Read(buf) + if err != nil && err != io.EOF { + return err + } + } + } +} + +func writeRandomData(ctx context.Context, dst io.Writer, size int, period time.Duration) error { + tick := time.NewTicker(period) + defer tick.Stop() + for { + select { + case <-ctx.Done(): + return nil + case <-tick.C: + randStr, err := cryptorand.String(size) + if err != nil { + return err + } + data, err := json.Marshal(codersdk.ReconnectingPTYRequest{ + Data: randStr, + }) + if 
err != nil { + return err + } + err = copyContext(ctx, dst, data) + if err != nil { + return err + } + } + } +} + +func copyContext(ctx context.Context, dst io.Writer, src []byte) error { + for idx := range src { + select { + case <-ctx.Done(): + return nil + default: + _, err := dst.Write(src[idx : idx+1]) + if err != nil { + if err == io.EOF { + return nil + } + return err + } + } + } + return nil +} + +type countReadWriter struct { + io.ReadWriter + bytesRead atomic.Int64 + bytesWritten atomic.Int64 +} + +func (w *countReadWriter) Read(p []byte) (int, error) { + n, err := w.ReadWriter.Read(p) + if err == nil { + w.bytesRead.Add(int64(n)) + } + return n, err +} + +func (w *countReadWriter) Write(p []byte) (int, error) { + n, err := w.ReadWriter.Write(p) + if err == nil { + w.bytesWritten.Add(int64(n)) + } + return n, err +} + +func (w *countReadWriter) BytesRead() int64 { + return w.bytesRead.Load() +} + +func (w *countReadWriter) BytesWritten() int64 { + return w.bytesWritten.Load() +} diff --git a/cli/trafficgen_test.go b/cli/trafficgen_test.go new file mode 100644 index 0000000000000..1b337d7c05d41 --- /dev/null +++ b/cli/trafficgen_test.go @@ -0,0 +1,90 @@ +package cli_test + +import ( + "bytes" + "context" + "github.com/coder/coder/agent" + "github.com/coder/coder/cli/clitest" + "github.com/coder/coder/coderd/coderdtest" + "github.com/coder/coder/codersdk/agentsdk" + "github.com/coder/coder/provisioner/echo" + "github.com/coder/coder/provisionersdk/proto" + "github.com/coder/coder/testutil" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "strings" + "testing" +) + +// This test pretends to stand up a workspace and run a no-op traffic generation test. +// It's not a real test, but it's useful for debugging. +// We do not perform any cleanup. 
+func TestTrafficGen(t *testing.T) { + t.Parallel() + + ctx, cancelFunc := context.WithTimeout(context.Background(), testutil.WaitMedium) + defer cancelFunc() + + client := coderdtest.New(t, &coderdtest.Options{IncludeProvisionerDaemon: true}) + user := coderdtest.CreateFirstUser(t, client) + + authToken := uuid.NewString() + version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{ + Parse: echo.ParseComplete, + ProvisionPlan: echo.ProvisionComplete, + ProvisionApply: []*proto.Provision_Response{{ + Type: &proto.Provision_Response_Complete{ + Complete: &proto.Provision_Complete{ + Resources: []*proto.Resource{{ + Name: "example", + Type: "aws_instance", + Agents: []*proto.Agent{{ + Id: uuid.NewString(), + Name: "agent", + Auth: &proto.Agent_Token{ + Token: authToken, + }, + Apps: []*proto.App{}, + }}, + }}, + }, + }, + }}, + }) + template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID) + coderdtest.AwaitTemplateVersionJob(t, client, version.ID) + + ws := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID) + coderdtest.AwaitWorkspaceBuildJob(t, client, ws.LatestBuild.ID) + + agentClient := agentsdk.New(client.URL) + agentClient.SetSessionToken(authToken) + agentCloser := agent.New(agent.Options{ + Client: agentClient, + }) + t.Cleanup(func() { + _ = agentCloser.Close() + }) + + coderdtest.AwaitWorkspaceAgents(t, client, ws.ID) + + inv, root := clitest.New(t, "trafficgen", ws.Name, + "--duration", "1s", + "--bps", "100", + ) + clitest.SetupConfig(t, client, root) + var stdout, stderr bytes.Buffer + inv.Stdout = &stdout + inv.Stderr = &stderr + err := inv.WithContext(ctx).Run() + require.NoError(t, err) + stdoutStr := stdout.String() + stderrStr := stderr.String() + require.Empty(t, stderrStr) + lines := strings.Split(strings.TrimSpace(stdoutStr), "\n") + require.Len(t, lines, 4) + require.Equal(t, "Test results:", lines[0]) + require.Regexp(t, `Took:\s+\d+\.\d+s`, lines[1]) + 
require.Regexp(t, `Sent:\s+\d+ bytes`, lines[2]) + require.Regexp(t, `Rcvd:\s+\d+ bytes`, lines[3]) +} diff --git a/codersdk/workspaceagentconn.go b/codersdk/workspaceagentconn.go index 0095ac0e13426..64bd4fe2f8bfa 100644 --- a/codersdk/workspaceagentconn.go +++ b/codersdk/workspaceagentconn.go @@ -165,9 +165,9 @@ type WorkspaceAgentReconnectingPTYInit struct { // to pipe data to a PTY. // @typescript-ignore ReconnectingPTYRequest type ReconnectingPTYRequest struct { - Data string `json:"data"` - Height uint16 `json:"height"` - Width uint16 `json:"width"` + Data string `json:"data,omitempty"` + Height uint16 `json:"height,omitempty"` + Width uint16 `json:"width,omitempty"` } // ReconnectingPTY spawns a new reconnecting terminal session. From 737b4758ae325b8065086072b065ff9ee010e609 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Thu, 27 Apr 2023 15:17:46 +0100 Subject: [PATCH 02/25] skip test for now --- cli/trafficgen_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cli/trafficgen_test.go b/cli/trafficgen_test.go index 1b337d7c05d41..0771408c37a6c 100644 --- a/cli/trafficgen_test.go +++ b/cli/trafficgen_test.go @@ -21,6 +21,7 @@ import ( // We do not perform any cleanup. 
func TestTrafficGen(t *testing.T) { t.Parallel() + t.Skip("TODO: this hangs in a unit test but works in the real world.") ctx, cancelFunc := context.WithTimeout(context.Background(), testutil.WaitMedium) defer cancelFunc() From 9b2658739882c8938fa3094b45d05147ef0103e5 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Thu, 27 Apr 2023 15:29:56 +0100 Subject: [PATCH 03/25] make fmt --- cli/trafficgen.go | 7 ++++--- cli/trafficgen_test.go | 5 +++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/cli/trafficgen.go b/cli/trafficgen.go index aaa0b7b31de17..01cc2d2d74ce8 100644 --- a/cli/trafficgen.go +++ b/cli/trafficgen.go @@ -4,14 +4,15 @@ import ( "context" "encoding/json" "fmt" + "io" + "sync/atomic" + "time" + "github.com/coder/coder/cli/clibase" "github.com/coder/coder/codersdk" "github.com/coder/coder/cryptorand" "github.com/google/uuid" "golang.org/x/xerrors" - "io" - "sync/atomic" - "time" ) func (r *RootCmd) trafficGen() *clibase.Cmd { diff --git a/cli/trafficgen_test.go b/cli/trafficgen_test.go index 0771408c37a6c..228008bbf55ae 100644 --- a/cli/trafficgen_test.go +++ b/cli/trafficgen_test.go @@ -3,6 +3,9 @@ package cli_test import ( "bytes" "context" + "strings" + "testing" + "github.com/coder/coder/agent" "github.com/coder/coder/cli/clitest" "github.com/coder/coder/coderd/coderdtest" @@ -12,8 +15,6 @@ import ( "github.com/coder/coder/testutil" "github.com/google/uuid" "github.com/stretchr/testify/require" - "strings" - "testing" ) // This test pretends to stand up a workspace and run a no-op traffic generation test. 
From c56d84e3f15200a297f61e077aba5a71ca21cf7b Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Thu, 27 Apr 2023 15:35:11 +0100 Subject: [PATCH 04/25] lint --- cli/trafficgen.go | 12 ++++++++---- cli/trafficgen_test.go | 5 +++-- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/cli/trafficgen.go b/cli/trafficgen.go index 01cc2d2d74ce8..67090038ef204 100644 --- a/cli/trafficgen.go +++ b/cli/trafficgen.go @@ -8,11 +8,12 @@ import ( "sync/atomic" "time" + "github.com/google/uuid" + "golang.org/x/xerrors" + "github.com/coder/coder/cli/clibase" "github.com/coder/coder/codersdk" "github.com/coder/coder/cryptorand" - "github.com/google/uuid" - "golang.org/x/xerrors" ) func (r *RootCmd) trafficGen() *clibase.Cmd { @@ -76,9 +77,12 @@ func (r *RootCmd) trafficGen() *clibase.Cmd { Data: "#", }) if err != nil { - return xerrors.Errorf("write comment to pty: %w", err) + return xerrors.Errorf("serialize request: %w", err) } _, err = crw.Write(data) + if err != nil { + return xerrors.Errorf("write comment to pty: %w", err) + } // Now we begin writing random data to the pty. 
writeSize := int(bps / 10) rch := make(chan error) @@ -176,7 +180,7 @@ func copyContext(ctx context.Context, dst io.Writer, src []byte) error { default: _, err := dst.Write(src[idx : idx+1]) if err != nil { - if err == io.EOF { + if xerrors.Is(err, io.EOF) { return nil } return err diff --git a/cli/trafficgen_test.go b/cli/trafficgen_test.go index 228008bbf55ae..c04e39d8726c3 100644 --- a/cli/trafficgen_test.go +++ b/cli/trafficgen_test.go @@ -6,6 +6,9 @@ import ( "strings" "testing" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "github.com/coder/coder/agent" "github.com/coder/coder/cli/clitest" "github.com/coder/coder/coderd/coderdtest" @@ -13,8 +16,6 @@ import ( "github.com/coder/coder/provisioner/echo" "github.com/coder/coder/provisionersdk/proto" "github.com/coder/coder/testutil" - "github.com/google/uuid" - "github.com/stretchr/testify/require" ) // This test pretends to stand up a workspace and run a no-op traffic generation test. From e5488922db334554a8a0f3b47c8122ee3f9de23c Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Thu, 27 Apr 2023 16:48:20 +0100 Subject: [PATCH 05/25] swap order of waiting for read and write --- cli/trafficgen.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cli/trafficgen.go b/cli/trafficgen.go index 67090038ef204..10817b6a850b6 100644 --- a/cli/trafficgen.go +++ b/cli/trafficgen.go @@ -96,12 +96,12 @@ func (r *RootCmd) trafficGen() *clibase.Cmd { close(wch) }() - if rErr := <-rch; rErr != nil { - return xerrors.Errorf("read from pty: %w", rErr) - } if wErr := <-wch; wErr != nil { return xerrors.Errorf("write to pty: %w", wErr) } + if rErr := <-rch; rErr != nil { + return xerrors.Errorf("read from pty: %w", rErr) + } _, _ = fmt.Fprintf(inv.Stdout, "Test results:\n") _, _ = fmt.Fprintf(inv.Stdout, "Took: %.2fs\n", time.Since(start).Seconds()) From 31ef7435fe6b7e466865a39344e64d9f311d1fa8 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Tue, 2 May 2023 13:29:18 +0100 Subject: 
[PATCH 06/25] close connection, add output formatting --- cli/trafficgen.go | 145 ++++++++++++++++++++++++++++------------- cli/trafficgen_test.go | 23 +++---- 2 files changed, 112 insertions(+), 56 deletions(-) diff --git a/cli/trafficgen.go b/cli/trafficgen.go index 10817b6a850b6..5eb4db3ac9160 100644 --- a/cli/trafficgen.go +++ b/cli/trafficgen.go @@ -12,15 +12,32 @@ import ( "golang.org/x/xerrors" "github.com/coder/coder/cli/clibase" + "github.com/coder/coder/cli/cliui" "github.com/coder/coder/codersdk" "github.com/coder/coder/cryptorand" ) +type trafficGenOutput struct { + DurationSeconds float64 `json:"duration_s"` + SentBytes int64 `json:"sent_bytes"` + RcvdBytes int64 `json:"rcvd_bytes"` +} + +func (o trafficGenOutput) String() string { + return fmt.Sprintf("Duration: %.2fs\n", o.DurationSeconds) + + fmt.Sprintf("Sent: %dB\n", o.SentBytes) + + fmt.Sprintf("Rcvd: %dB", o.RcvdBytes) +} + func (r *RootCmd) trafficGen() *clibase.Cmd { var ( - duration time.Duration - bps int64 - client = new(codersdk.Client) + duration time.Duration + formatter = cliui.NewOutputFormatter( + cliui.TextFormat(), + cliui.JSONFormat(), + ) + bps int64 + client = new(codersdk.Client) ) cmd := &clibase.Cmd{ @@ -32,7 +49,10 @@ func (r *RootCmd) trafficGen() *clibase.Cmd { r.InitClient(client), ), Handler: func(inv *clibase.Invocation) error { - var agentName string + var ( + agentName string + tickInterval = 100 * time.Millisecond + ) ws, err := namedWorkspace(inv.Context(), client, inv.Args[0]) if err != nil { return err @@ -53,6 +73,7 @@ func (r *RootCmd) trafficGen() *clibase.Cmd { return xerrors.Errorf("no agent found for workspace %s", ws.Name) } + // Setup our workspace agent connection. 
reconnect := uuid.New() conn, err := client.WorkspaceAgentReconnectingPTY(inv.Context(), codersdk.WorkspaceAgentReconnectingPTYOpts{ AgentID: agentID, @@ -68,34 +89,38 @@ func (r *RootCmd) trafficGen() *clibase.Cmd { defer func() { _ = conn.Close() }() + + // Wrap the conn in a countReadWriter so we can monitor bytes sent/rcvd. + crw := countReadWriter{ReadWriter: conn} + + // Set a deadline for stopping the text. start := time.Now() - ctx, cancel := context.WithDeadline(inv.Context(), start.Add(duration)) + deadlineCtx, cancel := context.WithDeadline(inv.Context(), start.Add(duration)) defer cancel() - crw := countReadWriter{ReadWriter: conn} - // First, write a comment to the pty so we don't execute anything. - data, err := json.Marshal(codersdk.ReconnectingPTYRequest{ - Data: "#", - }) - if err != nil { - return xerrors.Errorf("serialize request: %w", err) - } - _, err = crw.Write(data) - if err != nil { - return xerrors.Errorf("write comment to pty: %w", err) - } + + // Create a ticker for sending data to the PTY. + tick := time.NewTicker(tickInterval) + defer tick.Stop() + // Now we begin writing random data to the pty. writeSize := int(bps / 10) rch := make(chan error) wch := make(chan error) + + // Read forever in the background. go func() { - rch <- readForever(ctx, &crw) + rch <- readContext(deadlineCtx, &crw, writeSize*2) + conn.Close() close(rch) }() + + // Write random data to the PTY every tick. go func() { - wch <- writeRandomData(ctx, &crw, writeSize, 100*time.Millisecond) + wch <- writeRandomData(deadlineCtx, &crw, writeSize, tick.C) close(wch) }() + // Wait for both our reads and writes to be finished. 
if wErr := <-wch; wErr != nil { return xerrors.Errorf("write to pty: %w", wErr) } @@ -103,11 +128,21 @@ func (r *RootCmd) trafficGen() *clibase.Cmd { return xerrors.Errorf("read from pty: %w", rErr) } - _, _ = fmt.Fprintf(inv.Stdout, "Test results:\n") - _, _ = fmt.Fprintf(inv.Stdout, "Took: %.2fs\n", time.Since(start).Seconds()) - _, _ = fmt.Fprintf(inv.Stdout, "Sent: %d bytes\n", crw.BytesWritten()) - _, _ = fmt.Fprintf(inv.Stdout, "Rcvd: %d bytes\n", crw.BytesRead()) - return nil + duration := time.Since(start) + + results := trafficGenOutput{ + DurationSeconds: duration.Seconds(), + SentBytes: crw.BytesWritten(), + RcvdBytes: crw.BytesRead(), + } + + out, err := formatter.Format(inv.Context(), results) + if err != nil { + return err + } + + _, err = fmt.Fprintln(inv.Stdout, out) + return err }, } @@ -128,66 +163,78 @@ func (r *RootCmd) trafficGen() *clibase.Cmd { }, } + formatter.AttachOptions(&cmd.Options) return cmd } -func readForever(ctx context.Context, src io.Reader) error { - buf := make([]byte, 1024) +func readContext(ctx context.Context, src io.Reader, bufSize int) error { + buf := make([]byte, bufSize) for { select { case <-ctx.Done(): return nil default: + if ctx.Err() != nil { + return nil + } _, err := src.Read(buf) - if err != nil && err != io.EOF { + if err != nil { + if xerrors.Is(err, io.EOF) { + return nil + } return err } } } } -func writeRandomData(ctx context.Context, dst io.Writer, size int, period time.Duration) error { - tick := time.NewTicker(period) - defer tick.Stop() +func writeRandomData(ctx context.Context, dst io.Writer, size int, tick <-chan time.Time) error { for { select { case <-ctx.Done(): return nil - case <-tick.C: - randStr, err := cryptorand.String(size) - if err != nil { - return err - } + case <-tick: + payload := "#" + mustRandStr(size-1) data, err := json.Marshal(codersdk.ReconnectingPTYRequest{ - Data: randStr, + Data: payload, }) if err != nil { return err } - err = copyContext(ctx, dst, data) - if err != nil { + if 
_, err := copyContext(ctx, dst, data); err != nil { return err } } } } -func copyContext(ctx context.Context, dst io.Writer, src []byte) error { - for idx := range src { +// copyContext copies from src to dst until ctx is canceled. +func copyContext(ctx context.Context, dst io.Writer, src []byte) (int, error) { + var count int + for { select { case <-ctx.Done(): - return nil + return count, nil default: - _, err := dst.Write(src[idx : idx+1]) + if ctx.Err() != nil { + return count, nil + } + n, err := dst.Write(src) if err != nil { if xerrors.Is(err, io.EOF) { - return nil + // On an EOF, assume that all of src was consumed. + return len(src), nil } - return err + return count, err + } + count += n + if n == len(src) { + return count, nil } + // Not all of src was consumed. Update src and retry. + src = src[n:] } } - return nil } type countReadWriter struct { @@ -219,3 +266,11 @@ func (w *countReadWriter) BytesRead() int64 { func (w *countReadWriter) BytesWritten() int64 { return w.bytesWritten.Load() } + +func mustRandStr(len int) string { + randStr, err := cryptorand.String(len) + if err != nil { + panic(err) + } + return randStr +} diff --git a/cli/trafficgen_test.go b/cli/trafficgen_test.go index c04e39d8726c3..cff86a813d4fd 100644 --- a/cli/trafficgen_test.go +++ b/cli/trafficgen_test.go @@ -3,7 +3,7 @@ package cli_test import ( "bytes" "context" - "strings" + "encoding/json" "testing" "github.com/google/uuid" @@ -23,7 +23,6 @@ import ( // We do not perform any cleanup. 
func TestTrafficGen(t *testing.T) { t.Parallel() - t.Skip("TODO: this hangs in a unit test but works in the real world.") ctx, cancelFunc := context.WithTimeout(context.Background(), testutil.WaitMedium) defer cancelFunc() @@ -74,6 +73,7 @@ func TestTrafficGen(t *testing.T) { inv, root := clitest.New(t, "trafficgen", ws.Name, "--duration", "1s", "--bps", "100", + "-o", "json", ) clitest.SetupConfig(t, client, root) var stdout, stderr bytes.Buffer @@ -81,13 +81,14 @@ func TestTrafficGen(t *testing.T) { inv.Stderr = &stderr err := inv.WithContext(ctx).Run() require.NoError(t, err) - stdoutStr := stdout.String() - stderrStr := stderr.String() - require.Empty(t, stderrStr) - lines := strings.Split(strings.TrimSpace(stdoutStr), "\n") - require.Len(t, lines, 4) - require.Equal(t, "Test results:", lines[0]) - require.Regexp(t, `Took:\s+\d+\.\d+s`, lines[1]) - require.Regexp(t, `Sent:\s+\d+ bytes`, lines[2]) - require.Regexp(t, `Rcvd:\s+\d+ bytes`, lines[3]) + // TODO: this struct is currently unexported. Put it somewhere better. 
+ var output struct { + DurationSeconds float64 `json:"duration_s"` + SentBytes int64 `json:"sent_bytes"` + RcvdBytes int64 `json:"rcvd_bytes"` + } + require.NoError(t, json.Unmarshal(stdout.Bytes(), &output)) + require.NotZero(t, output.DurationSeconds) + require.NotZero(t, output.SentBytes) + require.NotZero(t, output.RcvdBytes) } From fafca95777a3450ec37f16792c41365d4d0c24fa Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Tue, 2 May 2023 13:29:32 +0100 Subject: [PATCH 07/25] do what the comment says --- cli/root.go | 10 +++++----- cli/scaletest_test.go | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cli/root.go b/cli/root.go index 30b89461e7446..5ce0d58366727 100644 --- a/cli/root.go +++ b/cli/root.go @@ -87,11 +87,12 @@ func (r *RootCmd) Core() []*clibase.Cmd { // Workspace Commands r.configSSH(), - r.rename(), - r.ping(), r.create(), r.deleteWorkspace(), r.list(), + r.parameters(), + r.ping(), + r.rename(), r.schedules(), r.show(), r.speedtest(), @@ -100,14 +101,13 @@ func (r *RootCmd) Core() []*clibase.Cmd { r.stop(), r.update(), r.restart(), - r.parameters(), // Hidden - r.workspaceAgent(), + r.gitssh(), r.scaletest(), r.trafficGen(), - r.gitssh(), r.vscodeSSH(), + r.workspaceAgent(), } } diff --git a/cli/scaletest_test.go b/cli/scaletest_test.go index 49b6ac467b986..3636b8ef40dc4 100644 --- a/cli/scaletest_test.go +++ b/cli/scaletest_test.go @@ -19,7 +19,7 @@ import ( ) func TestScaleTest(t *testing.T) { - // t.Skipf("This test is flakey. See https://github.com/coder/coder/issues/4942") + t.Skipf("This test is flakey. 
See https://github.com/coder/coder/issues/4942") t.Parallel() // This test does a create-workspaces scale test with --no-cleanup, checks From 65c6d881c08e9b2f001c726ec7cbe6d7b33b5039 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Tue, 2 May 2023 14:29:11 +0100 Subject: [PATCH 08/25] move back under scaletest cmd --- cli/root.go | 1 - cli/scaletest.go | 260 ++++++++++++++++++++++++++++++++++++++ cli/scaletest_test.go | 85 ++++++++++++- cli/trafficgen.go | 276 ----------------------------------------- cli/trafficgen_test.go | 94 -------------- 5 files changed, 343 insertions(+), 373 deletions(-) delete mode 100644 cli/trafficgen.go delete mode 100644 cli/trafficgen_test.go diff --git a/cli/root.go b/cli/root.go index 5ce0d58366727..0ea0da1b4b2c9 100644 --- a/cli/root.go +++ b/cli/root.go @@ -105,7 +105,6 @@ func (r *RootCmd) Core() []*clibase.Cmd { // Hidden r.gitssh(), r.scaletest(), - r.trafficGen(), r.vscodeSSH(), r.workspaceAgent(), } diff --git a/cli/scaletest.go b/cli/scaletest.go index be3eb22ac6a27..62f6435936e66 100644 --- a/cli/scaletest.go +++ b/cli/scaletest.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "syscall" "time" @@ -42,6 +43,7 @@ func (r *RootCmd) scaletest() *clibase.Cmd { Children: []*clibase.Cmd{ r.scaletestCleanup(), r.scaletestCreateWorkspaces(), + r.scaletestTrafficGen(), }, } @@ -947,6 +949,156 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd { return cmd } +type trafficGenOutput struct { + DurationSeconds float64 `json:"duration_s"` + SentBytes int64 `json:"sent_bytes"` + RcvdBytes int64 `json:"rcvd_bytes"` +} + +func (o trafficGenOutput) String() string { + return fmt.Sprintf("Duration: %.2fs\n", o.DurationSeconds) + + fmt.Sprintf("Sent: %dB\n", o.SentBytes) + + fmt.Sprintf("Rcvd: %dB", o.RcvdBytes) +} + +func (r *RootCmd) scaletestTrafficGen() *clibase.Cmd { + var ( + duration time.Duration + formatter = cliui.NewOutputFormatter( + cliui.TextFormat(), + cliui.JSONFormat(), + ) + bps int64 + 
client = new(codersdk.Client) + ) + + cmd := &clibase.Cmd{ + Use: "trafficgen", + Hidden: true, + Short: "Generate traffic to a Coder workspace", + Middleware: clibase.Chain( + clibase.RequireRangeArgs(1, 2), + r.InitClient(client), + ), + Handler: func(inv *clibase.Invocation) error { + var ( + agentName string + tickInterval = 100 * time.Millisecond + ) + ws, err := namedWorkspace(inv.Context(), client, inv.Args[0]) + if err != nil { + return err + } + + var agentID uuid.UUID + for _, res := range ws.LatestBuild.Resources { + if len(res.Agents) == 0 { + continue + } + if agentName != "" && agentName != res.Agents[0].Name { + continue + } + agentID = res.Agents[0].ID + } + + if agentID == uuid.Nil { + return xerrors.Errorf("no agent found for workspace %s", ws.Name) + } + + // Setup our workspace agent connection. + reconnect := uuid.New() + conn, err := client.WorkspaceAgentReconnectingPTY(inv.Context(), codersdk.WorkspaceAgentReconnectingPTYOpts{ + AgentID: agentID, + Reconnect: reconnect, + Height: 65535, + Width: 65535, + Command: "/bin/sh", + }) + if err != nil { + return xerrors.Errorf("connect to workspace: %w", err) + } + + defer func() { + _ = conn.Close() + }() + + // Wrap the conn in a countReadWriter so we can monitor bytes sent/rcvd. + crw := countReadWriter{ReadWriter: conn} + + // Set a deadline for stopping the text. + start := time.Now() + deadlineCtx, cancel := context.WithDeadline(inv.Context(), start.Add(duration)) + defer cancel() + + // Create a ticker for sending data to the PTY. + tick := time.NewTicker(tickInterval) + defer tick.Stop() + + // Now we begin writing random data to the pty. + writeSize := int(bps / 10) + rch := make(chan error) + wch := make(chan error) + + // Read forever in the background. + go func() { + rch <- readContext(deadlineCtx, &crw, writeSize*2) + conn.Close() + close(rch) + }() + + // Write random data to the PTY every tick. 
+ go func() { + wch <- writeRandomData(deadlineCtx, &crw, writeSize, tick.C) + close(wch) + }() + + // Wait for both our reads and writes to be finished. + if wErr := <-wch; wErr != nil { + return xerrors.Errorf("write to pty: %w", wErr) + } + if rErr := <-rch; rErr != nil { + return xerrors.Errorf("read from pty: %w", rErr) + } + + duration := time.Since(start) + + results := trafficGenOutput{ + DurationSeconds: duration.Seconds(), + SentBytes: crw.BytesWritten(), + RcvdBytes: crw.BytesRead(), + } + + out, err := formatter.Format(inv.Context(), results) + if err != nil { + return err + } + + _, err = fmt.Fprintln(inv.Stdout, out) + return err + }, + } + + cmd.Options = []clibase.Option{ + { + Flag: "duration", + Env: "CODER_SCALETEST_TRAFFICGEN_DURATION", + Default: "10s", + Description: "How long to generate traffic for.", + Value: clibase.DurationOf(&duration), + }, + { + Flag: "bps", + Env: "CODER_SCALETEST_TRAFFICGEN_BPS", + Default: "1024", + Description: "How much traffic to generate in bytes per second.", + Value: clibase.Int64Of(&bps), + }, + } + + formatter.AttachOptions(&cmd.Options) + return cmd +} + type runnableTraceWrapper struct { tracer trace.Tracer spanName string @@ -1023,3 +1175,111 @@ func isScaleTestWorkspace(workspace codersdk.Workspace) bool { return strings.HasPrefix(workspace.OwnerName, "scaletest-") || strings.HasPrefix(workspace.Name, "scaletest-") } + +func readContext(ctx context.Context, src io.Reader, bufSize int) error { + buf := make([]byte, bufSize) + for { + select { + case <-ctx.Done(): + return nil + default: + if ctx.Err() != nil { + return nil + } + _, err := src.Read(buf) + if err != nil { + if xerrors.Is(err, io.EOF) { + return nil + } + return err + } + } + } +} + +func writeRandomData(ctx context.Context, dst io.Writer, size int, tick <-chan time.Time) error { + for { + select { + case <-ctx.Done(): + return nil + case <-tick: + payload := "#" + mustRandStr(size-1) + data, err := 
json.Marshal(codersdk.ReconnectingPTYRequest{ + Data: payload, + }) + if err != nil { + return err + } + if _, err := copyContext(ctx, dst, data); err != nil { + return err + } + } + } +} + +// copyContext copies from src to dst until ctx is canceled. +func copyContext(ctx context.Context, dst io.Writer, src []byte) (int, error) { + var count int + for { + select { + case <-ctx.Done(): + return count, nil + default: + if ctx.Err() != nil { + return count, nil + } + n, err := dst.Write(src) + if err != nil { + if xerrors.Is(err, io.EOF) { + // On an EOF, assume that all of src was consumed. + return len(src), nil + } + return count, err + } + count += n + if n == len(src) { + return count, nil + } + // Not all of src was consumed. Update src and retry. + src = src[n:] + } + } +} + +type countReadWriter struct { + io.ReadWriter + bytesRead atomic.Int64 + bytesWritten atomic.Int64 +} + +func (w *countReadWriter) Read(p []byte) (int, error) { + n, err := w.ReadWriter.Read(p) + if err == nil { + w.bytesRead.Add(int64(n)) + } + return n, err +} + +func (w *countReadWriter) Write(p []byte) (int, error) { + n, err := w.ReadWriter.Write(p) + if err == nil { + w.bytesWritten.Add(int64(n)) + } + return n, err +} + +func (w *countReadWriter) BytesRead() int64 { + return w.bytesRead.Load() +} + +func (w *countReadWriter) BytesWritten() int64 { + return w.bytesWritten.Load() +} + +func mustRandStr(len int) string { + randStr, err := cryptorand.String(len) + if err != nil { + panic(err) + } + return randStr +} diff --git a/cli/scaletest_test.go b/cli/scaletest_test.go index 3636b8ef40dc4..4994b87450a3e 100644 --- a/cli/scaletest_test.go +++ b/cli/scaletest_test.go @@ -1,25 +1,31 @@ package cli_test import ( + "bytes" "context" "encoding/json" "os" "path/filepath" "testing" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/coder/coder/agent" "github.com/coder/coder/cli/clitest" 
"github.com/coder/coder/coderd/coderdtest" "github.com/coder/coder/codersdk" + "github.com/coder/coder/codersdk/agentsdk" + "github.com/coder/coder/provisioner/echo" + "github.com/coder/coder/provisionersdk/proto" "github.com/coder/coder/pty/ptytest" "github.com/coder/coder/scaletest/harness" "github.com/coder/coder/testutil" ) -func TestScaleTest(t *testing.T) { - t.Skipf("This test is flakey. See https://github.com/coder/coder/issues/4942") +func TestScaleTestCreateWorkspaces(t *testing.T) { + // t.Skipf("This test is flakey. See https://github.com/coder/coder/issues/4942") t.Parallel() // This test does a create-workspaces scale test with --no-cleanup, checks @@ -198,3 +204,78 @@ param3: 1 require.Len(t, users.Users, 1) }) } + +// This test pretends to stand up a workspace and run a no-op traffic generation test. +// It's not a real test, but it's useful for debugging. +// We do not perform any cleanup. +func TestScaleTestTrafficGen(t *testing.T) { + t.Parallel() + + ctx, cancelFunc := context.WithTimeout(context.Background(), testutil.WaitMedium) + defer cancelFunc() + + client := coderdtest.New(t, &coderdtest.Options{IncludeProvisionerDaemon: true}) + user := coderdtest.CreateFirstUser(t, client) + + authToken := uuid.NewString() + version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{ + Parse: echo.ParseComplete, + ProvisionPlan: echo.ProvisionComplete, + ProvisionApply: []*proto.Provision_Response{{ + Type: &proto.Provision_Response_Complete{ + Complete: &proto.Provision_Complete{ + Resources: []*proto.Resource{{ + Name: "example", + Type: "aws_instance", + Agents: []*proto.Agent{{ + Id: uuid.NewString(), + Name: "agent", + Auth: &proto.Agent_Token{ + Token: authToken, + }, + Apps: []*proto.App{}, + }}, + }}, + }, + }, + }}, + }) + template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID) + coderdtest.AwaitTemplateVersionJob(t, client, version.ID) + + ws := coderdtest.CreateWorkspace(t, client, 
user.OrganizationID, template.ID) + coderdtest.AwaitWorkspaceBuildJob(t, client, ws.LatestBuild.ID) + + agentClient := agentsdk.New(client.URL) + agentClient.SetSessionToken(authToken) + agentCloser := agent.New(agent.Options{ + Client: agentClient, + }) + t.Cleanup(func() { + _ = agentCloser.Close() + }) + + coderdtest.AwaitWorkspaceAgents(t, client, ws.ID) + + inv, root := clitest.New(t, "scaletest", "trafficgen", ws.Name, + "--duration", "1s", + "--bps", "100", + "-o", "json", + ) + clitest.SetupConfig(t, client, root) + var stdout, stderr bytes.Buffer + inv.Stdout = &stdout + inv.Stderr = &stderr + err := inv.WithContext(ctx).Run() + require.NoError(t, err) + // TODO: this struct is currently unexported. Put it somewhere better. + var output struct { + DurationSeconds float64 `json:"duration_s"` + SentBytes int64 `json:"sent_bytes"` + RcvdBytes int64 `json:"rcvd_bytes"` + } + require.NoError(t, json.Unmarshal(stdout.Bytes(), &output)) + require.NotZero(t, output.DurationSeconds) + require.NotZero(t, output.SentBytes) + require.NotZero(t, output.RcvdBytes) +} diff --git a/cli/trafficgen.go b/cli/trafficgen.go deleted file mode 100644 index 5eb4db3ac9160..0000000000000 --- a/cli/trafficgen.go +++ /dev/null @@ -1,276 +0,0 @@ -package cli - -import ( - "context" - "encoding/json" - "fmt" - "io" - "sync/atomic" - "time" - - "github.com/google/uuid" - "golang.org/x/xerrors" - - "github.com/coder/coder/cli/clibase" - "github.com/coder/coder/cli/cliui" - "github.com/coder/coder/codersdk" - "github.com/coder/coder/cryptorand" -) - -type trafficGenOutput struct { - DurationSeconds float64 `json:"duration_s"` - SentBytes int64 `json:"sent_bytes"` - RcvdBytes int64 `json:"rcvd_bytes"` -} - -func (o trafficGenOutput) String() string { - return fmt.Sprintf("Duration: %.2fs\n", o.DurationSeconds) + - fmt.Sprintf("Sent: %dB\n", o.SentBytes) + - fmt.Sprintf("Rcvd: %dB", o.RcvdBytes) -} - -func (r *RootCmd) trafficGen() *clibase.Cmd { - var ( - duration time.Duration - formatter 
= cliui.NewOutputFormatter( - cliui.TextFormat(), - cliui.JSONFormat(), - ) - bps int64 - client = new(codersdk.Client) - ) - - cmd := &clibase.Cmd{ - Use: "trafficgen", - Hidden: true, - Short: "Generate traffic to a Coder workspace", - Middleware: clibase.Chain( - clibase.RequireRangeArgs(1, 2), - r.InitClient(client), - ), - Handler: func(inv *clibase.Invocation) error { - var ( - agentName string - tickInterval = 100 * time.Millisecond - ) - ws, err := namedWorkspace(inv.Context(), client, inv.Args[0]) - if err != nil { - return err - } - - var agentID uuid.UUID - for _, res := range ws.LatestBuild.Resources { - if len(res.Agents) == 0 { - continue - } - if agentName != "" && agentName != res.Agents[0].Name { - continue - } - agentID = res.Agents[0].ID - } - - if agentID == uuid.Nil { - return xerrors.Errorf("no agent found for workspace %s", ws.Name) - } - - // Setup our workspace agent connection. - reconnect := uuid.New() - conn, err := client.WorkspaceAgentReconnectingPTY(inv.Context(), codersdk.WorkspaceAgentReconnectingPTYOpts{ - AgentID: agentID, - Reconnect: reconnect, - Height: 65535, - Width: 65535, - Command: "/bin/sh", - }) - if err != nil { - return xerrors.Errorf("connect to workspace: %w", err) - } - - defer func() { - _ = conn.Close() - }() - - // Wrap the conn in a countReadWriter so we can monitor bytes sent/rcvd. - crw := countReadWriter{ReadWriter: conn} - - // Set a deadline for stopping the text. - start := time.Now() - deadlineCtx, cancel := context.WithDeadline(inv.Context(), start.Add(duration)) - defer cancel() - - // Create a ticker for sending data to the PTY. - tick := time.NewTicker(tickInterval) - defer tick.Stop() - - // Now we begin writing random data to the pty. - writeSize := int(bps / 10) - rch := make(chan error) - wch := make(chan error) - - // Read forever in the background. - go func() { - rch <- readContext(deadlineCtx, &crw, writeSize*2) - conn.Close() - close(rch) - }() - - // Write random data to the PTY every tick. 
- go func() { - wch <- writeRandomData(deadlineCtx, &crw, writeSize, tick.C) - close(wch) - }() - - // Wait for both our reads and writes to be finished. - if wErr := <-wch; wErr != nil { - return xerrors.Errorf("write to pty: %w", wErr) - } - if rErr := <-rch; rErr != nil { - return xerrors.Errorf("read from pty: %w", rErr) - } - - duration := time.Since(start) - - results := trafficGenOutput{ - DurationSeconds: duration.Seconds(), - SentBytes: crw.BytesWritten(), - RcvdBytes: crw.BytesRead(), - } - - out, err := formatter.Format(inv.Context(), results) - if err != nil { - return err - } - - _, err = fmt.Fprintln(inv.Stdout, out) - return err - }, - } - - cmd.Options = []clibase.Option{ - { - Flag: "duration", - Env: "CODER_TRAFFICGEN_DURATION", - Default: "10s", - Description: "How long to generate traffic for.", - Value: clibase.DurationOf(&duration), - }, - { - Flag: "bps", - Env: "CODER_TRAFFICGEN_BPS", - Default: "1024", - Description: "How much traffic to generate in bytes per second.", - Value: clibase.Int64Of(&bps), - }, - } - - formatter.AttachOptions(&cmd.Options) - return cmd -} - -func readContext(ctx context.Context, src io.Reader, bufSize int) error { - buf := make([]byte, bufSize) - for { - select { - case <-ctx.Done(): - return nil - default: - if ctx.Err() != nil { - return nil - } - _, err := src.Read(buf) - if err != nil { - if xerrors.Is(err, io.EOF) { - return nil - } - return err - } - } - } -} - -func writeRandomData(ctx context.Context, dst io.Writer, size int, tick <-chan time.Time) error { - for { - select { - case <-ctx.Done(): - return nil - case <-tick: - payload := "#" + mustRandStr(size-1) - data, err := json.Marshal(codersdk.ReconnectingPTYRequest{ - Data: payload, - }) - if err != nil { - return err - } - if _, err := copyContext(ctx, dst, data); err != nil { - return err - } - } - } -} - -// copyContext copies from src to dst until ctx is canceled. 
-func copyContext(ctx context.Context, dst io.Writer, src []byte) (int, error) { - var count int - for { - select { - case <-ctx.Done(): - return count, nil - default: - if ctx.Err() != nil { - return count, nil - } - n, err := dst.Write(src) - if err != nil { - if xerrors.Is(err, io.EOF) { - // On an EOF, assume that all of src was consumed. - return len(src), nil - } - return count, err - } - count += n - if n == len(src) { - return count, nil - } - // Not all of src was consumed. Update src and retry. - src = src[n:] - } - } -} - -type countReadWriter struct { - io.ReadWriter - bytesRead atomic.Int64 - bytesWritten atomic.Int64 -} - -func (w *countReadWriter) Read(p []byte) (int, error) { - n, err := w.ReadWriter.Read(p) - if err == nil { - w.bytesRead.Add(int64(n)) - } - return n, err -} - -func (w *countReadWriter) Write(p []byte) (int, error) { - n, err := w.ReadWriter.Write(p) - if err == nil { - w.bytesWritten.Add(int64(n)) - } - return n, err -} - -func (w *countReadWriter) BytesRead() int64 { - return w.bytesRead.Load() -} - -func (w *countReadWriter) BytesWritten() int64 { - return w.bytesWritten.Load() -} - -func mustRandStr(len int) string { - randStr, err := cryptorand.String(len) - if err != nil { - panic(err) - } - return randStr -} diff --git a/cli/trafficgen_test.go b/cli/trafficgen_test.go deleted file mode 100644 index cff86a813d4fd..0000000000000 --- a/cli/trafficgen_test.go +++ /dev/null @@ -1,94 +0,0 @@ -package cli_test - -import ( - "bytes" - "context" - "encoding/json" - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/require" - - "github.com/coder/coder/agent" - "github.com/coder/coder/cli/clitest" - "github.com/coder/coder/coderd/coderdtest" - "github.com/coder/coder/codersdk/agentsdk" - "github.com/coder/coder/provisioner/echo" - "github.com/coder/coder/provisionersdk/proto" - "github.com/coder/coder/testutil" -) - -// This test pretends to stand up a workspace and run a no-op traffic generation test. 
-// It's not a real test, but it's useful for debugging. -// We do not perform any cleanup. -func TestTrafficGen(t *testing.T) { - t.Parallel() - - ctx, cancelFunc := context.WithTimeout(context.Background(), testutil.WaitMedium) - defer cancelFunc() - - client := coderdtest.New(t, &coderdtest.Options{IncludeProvisionerDaemon: true}) - user := coderdtest.CreateFirstUser(t, client) - - authToken := uuid.NewString() - version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{ - Parse: echo.ParseComplete, - ProvisionPlan: echo.ProvisionComplete, - ProvisionApply: []*proto.Provision_Response{{ - Type: &proto.Provision_Response_Complete{ - Complete: &proto.Provision_Complete{ - Resources: []*proto.Resource{{ - Name: "example", - Type: "aws_instance", - Agents: []*proto.Agent{{ - Id: uuid.NewString(), - Name: "agent", - Auth: &proto.Agent_Token{ - Token: authToken, - }, - Apps: []*proto.App{}, - }}, - }}, - }, - }, - }}, - }) - template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID) - coderdtest.AwaitTemplateVersionJob(t, client, version.ID) - - ws := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID) - coderdtest.AwaitWorkspaceBuildJob(t, client, ws.LatestBuild.ID) - - agentClient := agentsdk.New(client.URL) - agentClient.SetSessionToken(authToken) - agentCloser := agent.New(agent.Options{ - Client: agentClient, - }) - t.Cleanup(func() { - _ = agentCloser.Close() - }) - - coderdtest.AwaitWorkspaceAgents(t, client, ws.ID) - - inv, root := clitest.New(t, "trafficgen", ws.Name, - "--duration", "1s", - "--bps", "100", - "-o", "json", - ) - clitest.SetupConfig(t, client, root) - var stdout, stderr bytes.Buffer - inv.Stdout = &stdout - inv.Stderr = &stderr - err := inv.WithContext(ctx).Run() - require.NoError(t, err) - // TODO: this struct is currently unexported. Put it somewhere better. 
- var output struct { - DurationSeconds float64 `json:"duration_s"` - SentBytes int64 `json:"sent_bytes"` - RcvdBytes int64 `json:"rcvd_bytes"` - } - require.NoError(t, json.Unmarshal(stdout.Bytes(), &output)) - require.NotZero(t, output.DurationSeconds) - require.NotZero(t, output.SentBytes) - require.NotZero(t, output.RcvdBytes) -} From 0bfa9f6e4707789905d9844021a0f2f2b8c469ff Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Tue, 2 May 2023 17:30:00 +0100 Subject: [PATCH 09/25] integrate with scaletest harness --- cli/scaletest.go | 401 +++++++++++++-------------------- cli/scaletest_test.go | 18 +- scaletest/trafficgen/config.go | 41 ++++ scaletest/trafficgen/run.go | 232 +++++++++++++++++++ 4 files changed, 440 insertions(+), 252 deletions(-) create mode 100644 scaletest/trafficgen/config.go create mode 100644 scaletest/trafficgen/run.go diff --git a/cli/scaletest.go b/cli/scaletest.go index 62f6435936e66..66085559fe207 100644 --- a/cli/scaletest.go +++ b/cli/scaletest.go @@ -10,7 +10,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "syscall" "time" @@ -28,6 +27,7 @@ import ( "github.com/coder/coder/scaletest/createworkspaces" "github.com/coder/coder/scaletest/harness" "github.com/coder/coder/scaletest/reconnectingpty" + "github.com/coder/coder/scaletest/trafficgen" "github.com/coder/coder/scaletest/workspacebuild" ) @@ -386,33 +386,9 @@ func (r *RootCmd) scaletestCleanup() *clibase.Cmd { } cliui.Infof(inv.Stdout, "Fetching scaletest workspaces...") - var ( - pageNumber = 0 - limit = 100 - workspaces []codersdk.Workspace - ) - for { - page, err := client.Workspaces(ctx, codersdk.WorkspaceFilter{ - Name: "scaletest-", - Offset: pageNumber * limit, - Limit: limit, - }) - if err != nil { - return xerrors.Errorf("fetch scaletest workspaces page %d: %w", pageNumber, err) - } - - pageNumber++ - if len(page.Workspaces) == 0 { - break - } - - pageWorkspaces := make([]codersdk.Workspace, 0, len(page.Workspaces)) - for _, w := range page.Workspaces { - if 
isScaleTestWorkspace(w) { - pageWorkspaces = append(pageWorkspaces, w) - } - } - workspaces = append(workspaces, pageWorkspaces...) + workspaces, err := getScaletestWorkspaces(ctx, client) + if err != nil { + return err } cliui.Errorf(inv.Stderr, "Found %d scaletest workspaces\n", len(workspaces)) @@ -443,33 +419,9 @@ func (r *RootCmd) scaletestCleanup() *clibase.Cmd { } cliui.Infof(inv.Stdout, "Fetching scaletest users...") - pageNumber = 0 - limit = 100 - var users []codersdk.User - for { - page, err := client.Users(ctx, codersdk.UsersRequest{ - Search: "scaletest-", - Pagination: codersdk.Pagination{ - Offset: pageNumber * limit, - Limit: limit, - }, - }) - if err != nil { - return xerrors.Errorf("fetch scaletest users page %d: %w", pageNumber, err) - } - - pageNumber++ - if len(page.Users) == 0 { - break - } - - pageUsers := make([]codersdk.User, 0, len(page.Users)) - for _, u := range page.Users { - if isScaleTestUser(u) { - pageUsers = append(pageUsers, u) - } - } - users = append(users, pageUsers...) 
+ users, err := getScaletestUsers(ctx, client) + if err != nil { + return err } cliui.Errorf(inv.Stderr, "Found %d scaletest users\n", len(users)) @@ -949,132 +901,138 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd { return cmd } -type trafficGenOutput struct { - DurationSeconds float64 `json:"duration_s"` - SentBytes int64 `json:"sent_bytes"` - RcvdBytes int64 `json:"rcvd_bytes"` -} - -func (o trafficGenOutput) String() string { - return fmt.Sprintf("Duration: %.2fs\n", o.DurationSeconds) + - fmt.Sprintf("Sent: %dB\n", o.SentBytes) + - fmt.Sprintf("Rcvd: %dB", o.RcvdBytes) -} - func (r *RootCmd) scaletestTrafficGen() *clibase.Cmd { var ( - duration time.Duration - formatter = cliui.NewOutputFormatter( - cliui.TextFormat(), - cliui.JSONFormat(), - ) - bps int64 - client = new(codersdk.Client) + duration time.Duration + bps int64 + client = new(codersdk.Client) + tracingFlags = &scaletestTracingFlags{} + strategy = &scaletestStrategyFlags{} + cleanupStrategy = &scaletestStrategyFlags{cleanup: true} + output = &scaletestOutputFlags{} ) cmd := &clibase.Cmd{ Use: "trafficgen", Hidden: true, - Short: "Generate traffic to a Coder workspace", + Short: "Generate traffic to scaletest workspaces", Middleware: clibase.Chain( - clibase.RequireRangeArgs(1, 2), r.InitClient(client), ), Handler: func(inv *clibase.Invocation) error { - var ( - agentName string - tickInterval = 100 * time.Millisecond - ) - ws, err := namedWorkspace(inv.Context(), client, inv.Args[0]) - if err != nil { - return err + ctx := inv.Context() + + // Bypass rate limiting + client.HTTPClient = &http.Client{ + Transport: &headerTransport{ + transport: http.DefaultTransport, + header: map[string][]string{ + codersdk.BypassRatelimitHeader: {"true"}, + }, + }, } - var agentID uuid.UUID - for _, res := range ws.LatestBuild.Resources { - if len(res.Agents) == 0 { - continue - } - if agentName != "" && agentName != res.Agents[0].Name { - continue - } - agentID = res.Agents[0].ID + workspaces, err 
:= getScaletestWorkspaces(inv.Context(), client) + if err != nil { + return err } - if agentID == uuid.Nil { - return xerrors.Errorf("no agent found for workspace %s", ws.Name) + if len(workspaces) == 0 { + return xerrors.Errorf("no scaletest workspaces exist") } - // Setup our workspace agent connection. - reconnect := uuid.New() - conn, err := client.WorkspaceAgentReconnectingPTY(inv.Context(), codersdk.WorkspaceAgentReconnectingPTYOpts{ - AgentID: agentID, - Reconnect: reconnect, - Height: 65535, - Width: 65535, - Command: "/bin/sh", - }) + tracerProvider, closeTracing, tracingEnabled, err := tracingFlags.provider(ctx) if err != nil { - return xerrors.Errorf("connect to workspace: %w", err) + return xerrors.Errorf("create tracer provider: %w", err) } - defer func() { - _ = conn.Close() + // Allow time for traces to flush even if command context is + // canceled. + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + _ = closeTracing(ctx) }() + tracer := tracerProvider.Tracer(scaletestTracerName) - // Wrap the conn in a countReadWriter so we can monitor bytes sent/rcvd. - crw := countReadWriter{ReadWriter: conn} - - // Set a deadline for stopping the text. - start := time.Now() - deadlineCtx, cancel := context.WithDeadline(inv.Context(), start.Add(duration)) - defer cancel() + outputs, err := output.parse() + if err != nil { + return xerrors.Errorf("could not parse --output flags") + } - // Create a ticker for sending data to the PTY. - tick := time.NewTicker(tickInterval) - defer tick.Stop() + th := harness.NewTestHarness(strategy.toStrategy(), cleanupStrategy.toStrategy()) + for idx, ws := range workspaces { + var ( + agentID uuid.UUID + name = "trafficgen" + id = strconv.Itoa(idx) + ) + + for _, res := range ws.LatestBuild.Resources { + if len(res.Agents) == 0 { + continue + } + agentID = res.Agents[0].ID + } - // Now we begin writing random data to the pty. 
- writeSize := int(bps / 10) - rch := make(chan error) - wch := make(chan error) + if agentID == uuid.Nil { + return xerrors.Errorf("no agent found for workspace %s", ws.Name) + } - // Read forever in the background. - go func() { - rch <- readContext(deadlineCtx, &crw, writeSize*2) - conn.Close() - close(rch) - }() + // Setup our workspace agent connection. + config := trafficgen.Config{ + AgentID: agentID, + BytesPerSecond: bps, + Duration: duration, + TicksPerSecond: 10, + } - // Write random data to the PTY every tick. - go func() { - wch <- writeRandomData(deadlineCtx, &crw, writeSize, tick.C) - close(wch) - }() + if err := config.Validate(); err != nil { + return xerrors.Errorf("validate config: %w", err) + } + var runner harness.Runnable = trafficgen.NewRunner(client, config) + if tracingEnabled { + runner = &runnableTraceWrapper{ + tracer: tracer, + spanName: fmt.Sprintf("%s/%s", name, id), + runner: runner, + } + } - // Wait for both our reads and writes to be finished. - if wErr := <-wch; wErr != nil { - return xerrors.Errorf("write to pty: %w", wErr) + th.AddRun(name, id, runner) } - if rErr := <-rch; rErr != nil { - return xerrors.Errorf("read from pty: %w", rErr) + + _, _ = fmt.Fprintln(inv.Stderr, "Running load test...") + testCtx, testCancel := strategy.toContext(ctx) + defer testCancel() + err = th.Run(testCtx) + if err != nil { + return xerrors.Errorf("run test harness (harness failure, not a test failure): %w", err) } - duration := time.Since(start) + res := th.Results() + for _, o := range outputs { + err = o.write(res, inv.Stdout) + if err != nil { + return xerrors.Errorf("write output %q to %q: %w", o.format, o.path, err) + } + } - results := trafficGenOutput{ - DurationSeconds: duration.Seconds(), - SentBytes: crw.BytesWritten(), - RcvdBytes: crw.BytesRead(), + // Upload traces. 
+ if tracingEnabled { + _, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...") + ctx, cancel := context.WithTimeout(ctx, 1*time.Minute) + defer cancel() + err := closeTracing(ctx) + if err != nil { + _, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err) + } } - out, err := formatter.Format(inv.Context(), results) - if err != nil { - return err + if res.TotalFail > 0 { + return xerrors.New("load test failed, see above for more details") } - _, err = fmt.Fprintln(inv.Stdout, out) - return err + return nil }, } @@ -1095,7 +1053,11 @@ func (r *RootCmd) scaletestTrafficGen() *clibase.Cmd { }, } - formatter.AttachOptions(&cmd.Options) + tracingFlags.attach(&cmd.Options) + strategy.attach(&cmd.Options) + cleanupStrategy.attach(&cmd.Options) + output.attach(&cmd.Options) + return cmd } @@ -1176,110 +1138,71 @@ func isScaleTestWorkspace(workspace codersdk.Workspace) bool { strings.HasPrefix(workspace.Name, "scaletest-") } -func readContext(ctx context.Context, src io.Reader, bufSize int) error { - buf := make([]byte, bufSize) +func getScaletestWorkspaces(ctx context.Context, client *codersdk.Client) ([]codersdk.Workspace, error) { + var ( + pageNumber = 0 + limit = 100 + workspaces []codersdk.Workspace + ) + for { - select { - case <-ctx.Done(): - return nil - default: - if ctx.Err() != nil { - return nil - } - _, err := src.Read(buf) - if err != nil { - if xerrors.Is(err, io.EOF) { - return nil - } - return err - } + page, err := client.Workspaces(ctx, codersdk.WorkspaceFilter{ + Name: "scaletest-", + Offset: pageNumber * limit, + Limit: limit, + }) + if err != nil { + return nil, xerrors.Errorf("fetch scaletest workspaces page %d: %w", pageNumber, err) } - } -} -func writeRandomData(ctx context.Context, dst io.Writer, size int, tick <-chan time.Time) error { - for { - select { - case <-ctx.Done(): - return nil - case <-tick: - payload := "#" + mustRandStr(size-1) - data, err := json.Marshal(codersdk.ReconnectingPTYRequest{ - Data: payload, - }) - if err 
!= nil { - return err - } - if _, err := copyContext(ctx, dst, data); err != nil { - return err - } + pageNumber++ + if len(page.Workspaces) == 0 { + break } - } -} -// copyContext copies from src to dst until ctx is canceled. -func copyContext(ctx context.Context, dst io.Writer, src []byte) (int, error) { - var count int - for { - select { - case <-ctx.Done(): - return count, nil - default: - if ctx.Err() != nil { - return count, nil - } - n, err := dst.Write(src) - if err != nil { - if xerrors.Is(err, io.EOF) { - // On an EOF, assume that all of src was consumed. - return len(src), nil - } - return count, err - } - count += n - if n == len(src) { - return count, nil + pageWorkspaces := make([]codersdk.Workspace, 0, len(page.Workspaces)) + for _, w := range page.Workspaces { + if isScaleTestWorkspace(w) { + pageWorkspaces = append(pageWorkspaces, w) } - // Not all of src was consumed. Update src and retry. - src = src[n:] } + workspaces = append(workspaces, pageWorkspaces...) } + return workspaces, nil } -type countReadWriter struct { - io.ReadWriter - bytesRead atomic.Int64 - bytesWritten atomic.Int64 -} - -func (w *countReadWriter) Read(p []byte) (int, error) { - n, err := w.ReadWriter.Read(p) - if err == nil { - w.bytesRead.Add(int64(n)) - } - return n, err -} - -func (w *countReadWriter) Write(p []byte) (int, error) { - n, err := w.ReadWriter.Write(p) - if err == nil { - w.bytesWritten.Add(int64(n)) - } - return n, err -} +func getScaletestUsers(ctx context.Context, client *codersdk.Client) ([]codersdk.User, error) { + var ( + pageNumber = 0 + limit = 100 + users []codersdk.User + ) -func (w *countReadWriter) BytesRead() int64 { - return w.bytesRead.Load() -} + for { + page, err := client.Users(ctx, codersdk.UsersRequest{ + Search: "scaletest-", + Pagination: codersdk.Pagination{ + Offset: pageNumber * limit, + Limit: limit, + }, + }) + if err != nil { + return nil, xerrors.Errorf("fetch scaletest users page %d: %w", pageNumber, err) + } -func (w 
*countReadWriter) BytesWritten() int64 { - return w.bytesWritten.Load() -} + pageNumber++ + if len(page.Users) == 0 { + break + } -func mustRandStr(len int) string { - randStr, err := cryptorand.String(len) - if err != nil { - panic(err) + pageUsers := make([]codersdk.User, 0, len(page.Users)) + for _, u := range page.Users { + if isScaleTestUser(u) { + pageUsers = append(pageUsers, u) + } + } + users = append(users, pageUsers...) } - return randStr + + return users, nil } diff --git a/cli/scaletest_test.go b/cli/scaletest_test.go index 4994b87450a3e..4e6daa855a4ad 100644 --- a/cli/scaletest_test.go +++ b/cli/scaletest_test.go @@ -25,7 +25,7 @@ import ( ) func TestScaleTestCreateWorkspaces(t *testing.T) { - // t.Skipf("This test is flakey. See https://github.com/coder/coder/issues/4942") + t.Skipf("This test is flakey. See https://github.com/coder/coder/issues/4942") t.Parallel() // This test does a create-workspaces scale test with --no-cleanup, checks @@ -243,7 +243,9 @@ func TestScaleTestTrafficGen(t *testing.T) { template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID) coderdtest.AwaitTemplateVersionJob(t, client, version.ID) - ws := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID) + ws := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID, func(cwr *codersdk.CreateWorkspaceRequest) { + cwr.Name = "scaletest-test" + }) coderdtest.AwaitWorkspaceBuildJob(t, client, ws.LatestBuild.ID) agentClient := agentsdk.New(client.URL) @@ -260,7 +262,6 @@ func TestScaleTestTrafficGen(t *testing.T) { inv, root := clitest.New(t, "scaletest", "trafficgen", ws.Name, "--duration", "1s", "--bps", "100", - "-o", "json", ) clitest.SetupConfig(t, client, root) var stdout, stderr bytes.Buffer @@ -268,14 +269,5 @@ func TestScaleTestTrafficGen(t *testing.T) { inv.Stderr = &stderr err := inv.WithContext(ctx).Run() require.NoError(t, err) - // TODO: this struct is currently unexported. Put it somewhere better. 
- var output struct { - DurationSeconds float64 `json:"duration_s"` - SentBytes int64 `json:"sent_bytes"` - RcvdBytes int64 `json:"rcvd_bytes"` - } - require.NoError(t, json.Unmarshal(stdout.Bytes(), &output)) - require.NotZero(t, output.DurationSeconds) - require.NotZero(t, output.SentBytes) - require.NotZero(t, output.RcvdBytes) + require.Contains(t, stdout.String(), "Pass: 1") } diff --git a/scaletest/trafficgen/config.go b/scaletest/trafficgen/config.go new file mode 100644 index 0000000000000..cebe5a6bdca35 --- /dev/null +++ b/scaletest/trafficgen/config.go @@ -0,0 +1,41 @@ +package trafficgen + +import ( + "time" + + "github.com/google/uuid" + "golang.org/x/xerrors" +) + +type Config struct { + // AgentID is the workspace agent ID to which to connect. + AgentID uuid.UUID `json:"agent_id"` + // BytesPerSecond is the number of bytes to send to the agent. + + BytesPerSecond int64 `json:"bytes_per_second"` + + // Duration is the total duration for which to send traffic to the agent. + Duration time.Duration `json:"duration"` + + // TicksPerSecond specifies how many times per second we send traffic. 
+ TicksPerSecond int64 `json:"ticks_per_second"` +} + +func (c Config) Validate() error { + if c.AgentID == uuid.Nil { + return xerrors.Errorf("validate agent_id: must not be nil") + } + + if c.BytesPerSecond <= 0 { + return xerrors.Errorf("validate bytes_per_second: must be greater than zero") + } + + if c.Duration <= 0 { + return xerrors.Errorf("validate duration: must be greater than zero") + } + + if c.TicksPerSecond <= 0 { + return xerrors.Errorf("validate ticks_per_second: must be greater than zero") + } + return nil +} diff --git a/scaletest/trafficgen/run.go b/scaletest/trafficgen/run.go new file mode 100644 index 0000000000000..4e61c0d02d66d --- /dev/null +++ b/scaletest/trafficgen/run.go @@ -0,0 +1,232 @@ +package trafficgen + +import ( + "context" + "encoding/json" + "io" + "sync/atomic" + "time" + + "github.com/google/uuid" + "golang.org/x/xerrors" + + "cdr.dev/slog" + "cdr.dev/slog/sloggers/sloghuman" + "github.com/coder/coder/coderd/tracing" + "github.com/coder/coder/codersdk" + "github.com/coder/coder/cryptorand" + "github.com/coder/coder/scaletest/harness" + "github.com/coder/coder/scaletest/loadtestutil" +) + +type Runner struct { + client *codersdk.Client + cfg Config +} + +var ( + _ harness.Runnable = &Runner{} + _ harness.Cleanable = &Runner{} +) + +func NewRunner(client *codersdk.Client, cfg Config) *Runner { + return &Runner{ + client: client, + cfg: cfg, + } +} + +func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error { + ctx, span := tracing.StartSpan(ctx) + defer span.End() + + logs = loadtestutil.NewSyncWriter(logs) + logger := slog.Make(sloghuman.Sink(logs)).Leveled(slog.LevelDebug) + r.client.Logger = logger + r.client.LogBodies = true + + var ( + agentID = r.cfg.AgentID + reconnect = uuid.New() + height uint16 = 65535 + width uint16 = 65535 + tickInterval = r.cfg.TicksPerSecond + bytesPerTick = r.cfg.BytesPerSecond / r.cfg.TicksPerSecond + ) + + logger.Debug(ctx, "connect to workspace agent", slog.F("agent_id", 
agentID)) + conn, err := r.client.WorkspaceAgentReconnectingPTY(ctx, codersdk.WorkspaceAgentReconnectingPTYOpts{ + AgentID: agentID, + Reconnect: reconnect, + Height: height, + Width: width, + Command: "/bin/sh", + }) + if err != nil { + logger.Error(ctx, "connect to workspace agent", slog.F("agent_id", agentID), slog.Error(err)) + return xerrors.Errorf("connect to workspace: %w", err) + } + + defer func() { + logger.Debug(ctx, "close agent connection", slog.F("agent_id", agentID)) + _ = conn.Close() + }() + + // Wrap the conn in a countReadWriter so we can monitor bytes sent/rcvd. + crw := countReadWriter{ReadWriter: conn} + + // Set a deadline for stopping the text. + start := time.Now() + deadlineCtx, cancel := context.WithDeadline(ctx, start.Add(r.cfg.Duration)) + defer cancel() + + // Create a ticker for sending data to the PTY. + tick := time.NewTicker(time.Duration(tickInterval)) + defer tick.Stop() + + // Now we begin writing random data to the pty. + rch := make(chan error) + wch := make(chan error) + + // Read forever in the background. + go func() { + logger.Debug(ctx, "reading from agent", slog.F("agent_id", agentID)) + rch <- readContext(deadlineCtx, &crw, bytesPerTick*2) + logger.Debug(ctx, "done reading from agent", slog.F("agent_id", agentID)) + conn.Close() + close(rch) + }() + + // Write random data to the PTY every tick. + go func() { + logger.Debug(ctx, "writing to agent", slog.F("agent_id", agentID)) + wch <- writeRandomData(deadlineCtx, &crw, bytesPerTick, tick.C) + logger.Debug(ctx, "done writing to agent", slog.F("agent_id", agentID)) + close(wch) + }() + + // Wait for both our reads and writes to be finished. 
+ if wErr := <-wch; wErr != nil { + return xerrors.Errorf("write to pty: %w", wErr) + } + if rErr := <-rch; rErr != nil { + return xerrors.Errorf("read from pty: %w", rErr) + } + + duration := time.Since(start) + + logger.Info(ctx, "trafficgen result", + slog.F("duration", duration), + slog.F("sent", crw.BytesWritten()), + slog.F("rcvd", crw.BytesRead()), + ) + + return nil +} + +// Cleanup does nothing, successfully. +func (*Runner) Cleanup(context.Context, string) error { + return nil +} + +func readContext(ctx context.Context, src io.Reader, bufSize int64) error { + buf := make([]byte, bufSize) + for { + select { + case <-ctx.Done(): + return nil + default: + _, err := src.Read(buf) + if err != nil { + if xerrors.Is(err, io.EOF) { + return nil + } + return err + } + } + } +} + +func writeRandomData(ctx context.Context, dst io.Writer, size int64, tick <-chan time.Time) error { + for { + select { + case <-ctx.Done(): + return nil + case <-tick: + payload := "#" + mustRandStr(size-1) + data, err := json.Marshal(codersdk.ReconnectingPTYRequest{ + Data: payload, + }) + if err != nil { + return err + } + if _, err := copyContext(ctx, dst, data); err != nil { + return err + } + } + } +} + +// copyContext copies from src to dst until ctx is canceled. +func copyContext(ctx context.Context, dst io.Writer, src []byte) (int, error) { + var count int + for { + select { + case <-ctx.Done(): + return count, nil + default: + n, err := dst.Write(src) + if err != nil { + if xerrors.Is(err, io.EOF) { + // On an EOF, assume that all of src was consumed. + return len(src), nil + } + return count, err + } + count += n + if n == len(src) { + return count, nil + } + // Not all of src was consumed. Update src and retry. 
+ src = src[n:] + } + } +} + +type countReadWriter struct { + io.ReadWriter + bytesRead atomic.Int64 + bytesWritten atomic.Int64 +} + +func (w *countReadWriter) Read(p []byte) (int, error) { + n, err := w.ReadWriter.Read(p) + if err == nil { + w.bytesRead.Add(int64(n)) + } + return n, err +} + +func (w *countReadWriter) Write(p []byte) (int, error) { + n, err := w.ReadWriter.Write(p) + if err == nil { + w.bytesWritten.Add(int64(n)) + } + return n, err +} + +func (w *countReadWriter) BytesRead() int64 { + return w.bytesRead.Load() +} + +func (w *countReadWriter) BytesWritten() int64 { + return w.bytesWritten.Load() +} + +func mustRandStr(len int64) string { + randStr, err := cryptorand.String(int(len)) + if err != nil { + panic(err) + } + return randStr +} From da935a285c88f08cd3445820b4b217a0d2c50f7f Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 3 May 2023 11:31:17 +0100 Subject: [PATCH 10/25] drain connection async --- cli/scaletest.go | 3 +- scaletest/trafficgen/run.go | 77 +++++++++++++++++++++++++++---------- 2 files changed, 58 insertions(+), 22 deletions(-) diff --git a/cli/scaletest.go b/cli/scaletest.go index 66085559fe207..ce20a6b20b16a 100644 --- a/cli/scaletest.go +++ b/cli/scaletest.go @@ -975,7 +975,8 @@ func (r *RootCmd) scaletestTrafficGen() *clibase.Cmd { } if agentID == uuid.Nil { - return xerrors.Errorf("no agent found for workspace %s", ws.Name) + _, _ = fmt.Fprintf(inv.Stderr, "WARN: skipping workspace %s: no agent\n", ws.Name) + continue } // Setup our workspace agent connection. 
diff --git a/scaletest/trafficgen/run.go b/scaletest/trafficgen/run.go index 4e61c0d02d66d..347b71823d658 100644 --- a/scaletest/trafficgen/run.go +++ b/scaletest/trafficgen/run.go @@ -1,6 +1,7 @@ package trafficgen import ( + "bytes" "context" "encoding/json" "io" @@ -12,6 +13,7 @@ import ( "cdr.dev/slog" "cdr.dev/slog/sloggers/sloghuman" + "github.com/coder/coder/coderd/tracing" "github.com/coder/coder/codersdk" "github.com/coder/coder/cryptorand" @@ -72,14 +74,14 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error { _ = conn.Close() }() - // Wrap the conn in a countReadWriter so we can monitor bytes sent/rcvd. - crw := countReadWriter{ReadWriter: conn} - // Set a deadline for stopping the text. start := time.Now() deadlineCtx, cancel := context.WithDeadline(ctx, start.Add(r.cfg.Duration)) defer cancel() + // Wrap the conn in a countReadWriter so we can monitor bytes sent/rcvd. + crw := countReadWriter{ReadWriter: conn, ctx: deadlineCtx} + // Create a ticker for sending data to the PTY. tick := time.NewTicker(time.Duration(tickInterval)) defer tick.Stop() @@ -88,10 +90,15 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error { rch := make(chan error) wch := make(chan error) + go func() { + <-deadlineCtx.Done() + logger.Debug(ctx, "context deadline reached", slog.F("duration", time.Since(start))) + }() + // Read forever in the background. 
go func() { logger.Debug(ctx, "reading from agent", slog.F("agent_id", agentID)) - rch <- readContext(deadlineCtx, &crw, bytesPerTick*2) + rch <- drainContext(deadlineCtx, &crw, bytesPerTick*2) logger.Debug(ctx, "done reading from agent", slog.F("agent_id", agentID)) conn.Close() close(rch) @@ -115,7 +122,7 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error { duration := time.Since(start) - logger.Info(ctx, "trafficgen result", + logger.Info(ctx, "results", slog.F("duration", duration), slog.F("sent", crw.BytesWritten()), slog.F("rcvd", crw.BytesRead()), @@ -129,14 +136,33 @@ func (*Runner) Cleanup(context.Context, string) error { return nil } -func readContext(ctx context.Context, src io.Reader, bufSize int64) error { - buf := make([]byte, bufSize) +// drainContext drains from src until it returns io.EOF or ctx times out. +func drainContext(ctx context.Context, src io.Reader, bufSize int64) error { + errCh := make(chan error) + done := make(chan struct{}) + go func() { + tmp := make([]byte, bufSize) + buf := bytes.NewBuffer(tmp) + for { + select { + case <-done: + return + default: + _, err := io.CopyN(buf, src, 1) + if err != nil { + errCh <- err + close(errCh) + return + } + } + } + }() for { select { case <-ctx.Done(): + close(done) return nil - default: - _, err := src.Read(buf) + case err := <-errCh: if err != nil { if xerrors.Is(err, io.EOF) { return nil @@ -175,31 +201,37 @@ func copyContext(ctx context.Context, dst io.Writer, src []byte) (int, error) { case <-ctx.Done(): return count, nil default: - n, err := dst.Write(src) - if err != nil { - if xerrors.Is(err, io.EOF) { - // On an EOF, assume that all of src was consumed. - return len(src), nil + for idx := range src { + n, err := dst.Write(src[idx : idx+1]) + if err != nil { + if xerrors.Is(err, io.EOF) { + return count, nil + } + if xerrors.Is(err, context.DeadlineExceeded) { + // It's OK if we reach the deadline before writing the full payload. 
+ return count, nil + } + return count, err } - return count, err - } - count += n - if n == len(src) { - return count, nil + count += n } - // Not all of src was consumed. Update src and retry. - src = src[n:] + return count, nil } } } +// countReadWriter wraps an io.ReadWriter and counts the number of bytes read and written. type countReadWriter struct { + ctx context.Context io.ReadWriter bytesRead atomic.Int64 bytesWritten atomic.Int64 } func (w *countReadWriter) Read(p []byte) (int, error) { + if err := w.ctx.Err(); err != nil { + return 0, err + } n, err := w.ReadWriter.Read(p) if err == nil { w.bytesRead.Add(int64(n)) @@ -208,6 +240,9 @@ func (w *countReadWriter) Read(p []byte) (int, error) { } func (w *countReadWriter) Write(p []byte) (int, error) { + if err := w.ctx.Err(); err != nil { + return 0, err + } n, err := w.ReadWriter.Write(p) if err == nil { w.bytesWritten.Add(int64(n)) From 5daa526057ac92a0e9e6670ac22cc9326754a896 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 3 May 2023 12:30:23 +0100 Subject: [PATCH 11/25] fix cancellation --- scaletest/trafficgen/run.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/scaletest/trafficgen/run.go b/scaletest/trafficgen/run.go index 347b71823d658..fc20c379935a9 100644 --- a/scaletest/trafficgen/run.go +++ b/scaletest/trafficgen/run.go @@ -56,7 +56,12 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error { bytesPerTick = r.cfg.BytesPerSecond / r.cfg.TicksPerSecond ) + // Set a deadline for stopping the text. 
+ start := time.Now() + deadlineCtx, cancel := context.WithDeadline(ctx, start.Add(r.cfg.Duration)) + defer cancel() logger.Debug(ctx, "connect to workspace agent", slog.F("agent_id", agentID)) + conn, err := r.client.WorkspaceAgentReconnectingPTY(ctx, codersdk.WorkspaceAgentReconnectingPTYOpts{ AgentID: agentID, Reconnect: reconnect, @@ -74,11 +79,6 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error { _ = conn.Close() }() - // Set a deadline for stopping the text. - start := time.Now() - deadlineCtx, cancel := context.WithDeadline(ctx, start.Add(r.cfg.Duration)) - defer cancel() - // Wrap the conn in a countReadWriter so we can monitor bytes sent/rcvd. crw := countReadWriter{ReadWriter: conn, ctx: deadlineCtx} @@ -112,7 +112,7 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error { close(wch) }() - // Wait for both our reads and writes to be finished. + // Write until the context is canceled. if wErr := <-wch; wErr != nil { return xerrors.Errorf("write to pty: %w", wErr) } @@ -138,7 +138,7 @@ func (*Runner) Cleanup(context.Context, string) error { // drainContext drains from src until it returns io.EOF or ctx times out. func drainContext(ctx context.Context, src io.Reader, bufSize int64) error { - errCh := make(chan error) + errCh := make(chan error, 1) done := make(chan struct{}) go func() { tmp := make([]byte, bufSize) @@ -149,6 +149,9 @@ func drainContext(ctx context.Context, src io.Reader, bufSize int64) error { return default: _, err := io.CopyN(buf, src, 1) + if ctx.Err() != nil { + return // context canceled while we were copying. 
+ } if err != nil { errCh <- err close(errCh) From 4f165bec834b65d5851d93894dd4fdbb87a0887b Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 3 May 2023 12:55:00 +0100 Subject: [PATCH 12/25] handle deadline exceeded in drain --- scaletest/trafficgen/run.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/scaletest/trafficgen/run.go b/scaletest/trafficgen/run.go index fc20c379935a9..b9f3bb7f94e6d 100644 --- a/scaletest/trafficgen/run.go +++ b/scaletest/trafficgen/run.go @@ -170,6 +170,10 @@ func drainContext(ctx context.Context, src io.Reader, bufSize int64) error { if xerrors.Is(err, io.EOF) { return nil } + // It's OK if the context is canceled. + if xerrors.Is(err, context.DeadlineExceeded) { + return nil + } return err } } From 31fa8be33290bedc28bba12ab3d9ec40bd1ee49d Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 3 May 2023 14:58:04 +0100 Subject: [PATCH 13/25] address PR comments --- scaletest/trafficgen/run.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/scaletest/trafficgen/run.go b/scaletest/trafficgen/run.go index b9f3bb7f94e6d..88ba042334a53 100644 --- a/scaletest/trafficgen/run.go +++ b/scaletest/trafficgen/run.go @@ -1,7 +1,6 @@ package trafficgen import ( - "bytes" "context" "encoding/json" "io" @@ -50,8 +49,8 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error { var ( agentID = r.cfg.AgentID reconnect = uuid.New() - height uint16 = 65535 - width uint16 = 65535 + height uint16 = 25 + width uint16 = 80 tickInterval = r.cfg.TicksPerSecond bytesPerTick = r.cfg.BytesPerSecond / r.cfg.TicksPerSecond ) @@ -74,7 +73,8 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error { return xerrors.Errorf("connect to workspace: %w", err) } - defer func() { + go func() { + <-deadlineCtx.Done() logger.Debug(ctx, "close agent connection", slog.F("agent_id", agentID)) _ = conn.Close() }() @@ -87,8 +87,8 @@ func (r *Runner) Run(ctx context.Context, _ string, logs 
io.Writer) error { defer tick.Stop() // Now we begin writing random data to the pty. - rch := make(chan error) - wch := make(chan error) + rch := make(chan error, 1) + wch := make(chan error, 1) go func() { <-deadlineCtx.Done() @@ -141,14 +141,12 @@ func drainContext(ctx context.Context, src io.Reader, bufSize int64) error { errCh := make(chan error, 1) done := make(chan struct{}) go func() { - tmp := make([]byte, bufSize) - buf := bytes.NewBuffer(tmp) for { select { case <-done: return default: - _, err := io.CopyN(buf, src, 1) + _, err := io.CopyN(io.Discard, src, 1) if ctx.Err() != nil { return // context canceled while we were copying. } From 08172042333b6f9cdaff1217411a5b2dceba936e Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 3 May 2023 14:58:59 +0100 Subject: [PATCH 14/25] fixup! address PR comments --- scaletest/trafficgen/run.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scaletest/trafficgen/run.go b/scaletest/trafficgen/run.go index 88ba042334a53..f193e136c7e26 100644 --- a/scaletest/trafficgen/run.go +++ b/scaletest/trafficgen/run.go @@ -98,7 +98,7 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error { // Read forever in the background. go func() { logger.Debug(ctx, "reading from agent", slog.F("agent_id", agentID)) - rch <- drainContext(deadlineCtx, &crw, bytesPerTick*2) + rch <- drainContext(deadlineCtx, &crw) logger.Debug(ctx, "done reading from agent", slog.F("agent_id", agentID)) conn.Close() close(rch) @@ -137,7 +137,7 @@ func (*Runner) Cleanup(context.Context, string) error { } // drainContext drains from src until it returns io.EOF or ctx times out. 
-func drainContext(ctx context.Context, src io.Reader, bufSize int64) error { +func drainContext(ctx context.Context, src io.Reader) error { errCh := make(chan error, 1) done := make(chan struct{}) go func() { From a6d787066d255db943cdd28bb5349aa6b05f3b74 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 3 May 2023 15:17:08 +0100 Subject: [PATCH 15/25] ACTUALLY limit traffic instead of just blasting the firehose --- scaletest/trafficgen/run.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scaletest/trafficgen/run.go b/scaletest/trafficgen/run.go index f193e136c7e26..f8195241b2106 100644 --- a/scaletest/trafficgen/run.go +++ b/scaletest/trafficgen/run.go @@ -51,7 +51,7 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error { reconnect = uuid.New() height uint16 = 25 width uint16 = 80 - tickInterval = r.cfg.TicksPerSecond + tickInterval = time.Second / time.Duration(r.cfg.TicksPerSecond) bytesPerTick = r.cfg.BytesPerSecond / r.cfg.TicksPerSecond ) From 935dcbda246e5c3d93d73fb36d08a01774605bba Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 3 May 2023 15:17:18 +0100 Subject: [PATCH 16/25] log config --- scaletest/trafficgen/run.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/scaletest/trafficgen/run.go b/scaletest/trafficgen/run.go index f8195241b2106..992153d192a49 100644 --- a/scaletest/trafficgen/run.go +++ b/scaletest/trafficgen/run.go @@ -55,6 +55,15 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error { bytesPerTick = r.cfg.BytesPerSecond / r.cfg.TicksPerSecond ) + logger.Info(ctx, "config", + slog.F("agent_id", agentID), + slog.F("reconnect", reconnect), + slog.F("height", height), + slog.F("width", width), + slog.F("tick_interval", tickInterval), + slog.F("bytes_per_tick", bytesPerTick), + ) + // Set a deadline for stopping the text. 
start := time.Now() deadlineCtx, cancel := context.WithDeadline(ctx, start.Add(r.cfg.Duration)) From e2efeff0579809ec65616e5019c16375be8fa0c1 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 3 May 2023 15:36:41 +0100 Subject: [PATCH 17/25] lint --- scaletest/trafficgen/run.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scaletest/trafficgen/run.go b/scaletest/trafficgen/run.go index 992153d192a49..b3abb88761c1c 100644 --- a/scaletest/trafficgen/run.go +++ b/scaletest/trafficgen/run.go @@ -92,7 +92,7 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error { crw := countReadWriter{ReadWriter: conn, ctx: deadlineCtx} // Create a ticker for sending data to the PTY. - tick := time.NewTicker(time.Duration(tickInterval)) + tick := time.NewTicker(tickInterval) defer tick.Stop() // Now we begin writing random data to the pty. From b105e67dec905de93f7b462c823186695868cd8f Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Thu, 4 May 2023 13:42:24 +0100 Subject: [PATCH 18/25] chore(cli): scaletest: move logic for flushing traces into tracing provider --- cli/scaletest.go | 47 +++++++++++++++-------------------------------- 1 file changed, 15 insertions(+), 32 deletions(-) diff --git a/cli/scaletest.go b/cli/scaletest.go index ce20a6b20b16a..32c32bd8d5d70 100644 --- a/cli/scaletest.go +++ b/cli/scaletest.go @@ -109,7 +109,10 @@ func (s *scaletestTracingFlags) provider(ctx context.Context) (trace.TracerProvi return tracerProvider, func(ctx context.Context) error { var err error closeTracingOnce.Do(func() { - err = closeTracing(ctx) + // Allow time to upload traces even if ctx is canceled + traceCtx, traceCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer traceCancel() + err = closeTracing(traceCtx) }) return err @@ -637,10 +640,11 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd { } defer func() { // Allow time for traces to flush even if command context is - // canceled. 
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - _ = closeTracing(ctx) + // canceled. This is a no-op if tracing is not enabled. + _, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...") + if err := closeTracing(ctx); err != nil { + _, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err) + } }() tracer := tracerProvider.Tracer(scaletestTracerName) @@ -754,17 +758,6 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd { return xerrors.Errorf("cleanup tests: %w", err) } - // Upload traces. - if tracingEnabled { - _, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...") - ctx, cancel := context.WithTimeout(ctx, 1*time.Minute) - defer cancel() - err := closeTracing(ctx) - if err != nil { - _, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err) - } - } - if res.TotalFail > 0 { return xerrors.New("load test failed, see above for more details") } @@ -905,7 +898,7 @@ func (r *RootCmd) scaletestTrafficGen() *clibase.Cmd { var ( duration time.Duration bps int64 - client = new(codersdk.Client) + client = &codersdk.Client{} tracingFlags = &scaletestTracingFlags{} strategy = &scaletestStrategyFlags{} cleanupStrategy = &scaletestStrategyFlags{cleanup: true} @@ -947,10 +940,11 @@ func (r *RootCmd) scaletestTrafficGen() *clibase.Cmd { } defer func() { // Allow time for traces to flush even if command context is - // canceled. - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - _ = closeTracing(ctx) + // canceled. This is a no-op if tracing is not enabled. + _, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...") + if err := closeTracing(ctx); err != nil { + _, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err) + } }() tracer := tracerProvider.Tracer(scaletestTracerName) @@ -1018,17 +1012,6 @@ func (r *RootCmd) scaletestTrafficGen() *clibase.Cmd { } } - // Upload traces. 
- if tracingEnabled { - _, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...") - ctx, cancel := context.WithTimeout(ctx, 1*time.Minute) - defer cancel() - err := closeTracing(ctx) - if err != nil { - _, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err) - } - } - if res.TotalFail > 0 { return xerrors.New("load test failed, see above for more details") } From 731b4db4bc7bbbdc91acecb83c6867243f85d847 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Thu, 4 May 2023 13:43:03 +0100 Subject: [PATCH 19/25] remove unnecessary context-based I/O --- scaletest/trafficgen/run.go | 107 ++++++++---------------------------- 1 file changed, 24 insertions(+), 83 deletions(-) diff --git a/scaletest/trafficgen/run.go b/scaletest/trafficgen/run.go index b3abb88761c1c..004c2fc346156 100644 --- a/scaletest/trafficgen/run.go +++ b/scaletest/trafficgen/run.go @@ -9,6 +9,7 @@ import ( "github.com/google/uuid" "golang.org/x/xerrors" + "nhooyr.io/websocket" "cdr.dev/slog" "cdr.dev/slog/sloggers/sloghuman" @@ -101,22 +102,22 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error { go func() { <-deadlineCtx.Done() - logger.Debug(ctx, "context deadline reached", slog.F("duration", time.Since(start))) + logger.Debug(ctx, "closing agent connection") + conn.Close() }() // Read forever in the background. go func() { logger.Debug(ctx, "reading from agent", slog.F("agent_id", agentID)) - rch <- drainContext(deadlineCtx, &crw) + rch <- drain(&crw) logger.Debug(ctx, "done reading from agent", slog.F("agent_id", agentID)) - conn.Close() close(rch) }() // Write random data to the PTY every tick. 
go func() { logger.Debug(ctx, "writing to agent", slog.F("agent_id", agentID)) - wch <- writeRandomData(deadlineCtx, &crw, bytesPerTick, tick.C) + wch <- writeRandomData(&crw, bytesPerTick, tick.C) logger.Debug(ctx, "done writing to agent", slog.F("agent_id", agentID)) close(wch) }() @@ -145,93 +146,33 @@ func (*Runner) Cleanup(context.Context, string) error { return nil } -// drainContext drains from src until it returns io.EOF or ctx times out. -func drainContext(ctx context.Context, src io.Reader) error { - errCh := make(chan error, 1) - done := make(chan struct{}) - go func() { - for { - select { - case <-done: - return - default: - _, err := io.CopyN(io.Discard, src, 1) - if ctx.Err() != nil { - return // context canceled while we were copying. - } - if err != nil { - errCh <- err - close(errCh) - return - } - } - } - }() - for { - select { - case <-ctx.Done(): - close(done) - return nil - case err := <-errCh: - if err != nil { - if xerrors.Is(err, io.EOF) { - return nil - } - // It's OK if the context is canceled. - if xerrors.Is(err, context.DeadlineExceeded) { - return nil - } - return err - } - } - } -} - -func writeRandomData(ctx context.Context, dst io.Writer, size int64, tick <-chan time.Time) error { - for { - select { - case <-ctx.Done(): +// drain drains from src until it returns io.EOF or ctx times out. +func drain(src io.Reader) error { + if _, err := io.Copy(io.Discard, src); err != nil { + if xerrors.Is(err, context.DeadlineExceeded) || xerrors.Is(err, websocket.CloseError{}) { return nil - case <-tick: - payload := "#" + mustRandStr(size-1) - data, err := json.Marshal(codersdk.ReconnectingPTYRequest{ - Data: payload, - }) - if err != nil { - return err - } - if _, err := copyContext(ctx, dst, data); err != nil { - return err - } } + return err } + return nil } -// copyContext copies from src to dst until ctx is canceled. 
-func copyContext(ctx context.Context, dst io.Writer, src []byte) (int, error) { - var count int - for { - select { - case <-ctx.Done(): - return count, nil - default: - for idx := range src { - n, err := dst.Write(src[idx : idx+1]) - if err != nil { - if xerrors.Is(err, io.EOF) { - return count, nil - } - if xerrors.Is(err, context.DeadlineExceeded) { - // It's OK if we reach the deadline before writing the full payload. - return count, nil - } - return count, err - } - count += n +func writeRandomData(dst io.Writer, size int64, tick <-chan time.Time) error { + var ( + enc = json.NewEncoder(dst) + ptyReq = codersdk.ReconnectingPTYRequest{} + ) + for range tick { + payload := "#" + mustRandStr(size-1) + ptyReq.Data = payload + if err := enc.Encode(ptyReq); err != nil { + if xerrors.Is(err, context.DeadlineExceeded) || xerrors.Is(err, websocket.CloseError{}) { + return nil } - return count, nil + return err } } + return nil } // countReadWriter wraps an io.ReadWriter and counts the number of bytes read and written. 
From 9dc28a2ba6f33b55a839a4f7b7adf4f9f1d09e5f Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Thu, 4 May 2023 15:13:33 +0100 Subject: [PATCH 20/25] refactor bytes per second to bytes per tick and tick interval --- cli/scaletest.go | 26 +++++++++++++++++--------- cli/scaletest_test.go | 3 ++- scaletest/trafficgen/config.go | 17 +++++++++-------- scaletest/trafficgen/run.go | 21 +++++++++++++++------ 4 files changed, 43 insertions(+), 24 deletions(-) diff --git a/cli/scaletest.go b/cli/scaletest.go index 32c32bd8d5d70..ce54787d7c232 100644 --- a/cli/scaletest.go +++ b/cli/scaletest.go @@ -897,7 +897,8 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd { func (r *RootCmd) scaletestTrafficGen() *clibase.Cmd { var ( duration time.Duration - bps int64 + tickInterval time.Duration + bytesPerTick int64 client = &codersdk.Client{} tracingFlags = &scaletestTracingFlags{} strategy = &scaletestStrategyFlags{} @@ -975,10 +976,10 @@ func (r *RootCmd) scaletestTrafficGen() *clibase.Cmd { // Setup our workspace agent connection. 
config := trafficgen.Config{ - AgentID: agentID, - BytesPerSecond: bps, - Duration: duration, - TicksPerSecond: 10, + AgentID: agentID, + BytesPerTick: bytesPerTick, + Duration: duration, + TickInterval: tickInterval, } if err := config.Validate(); err != nil { @@ -1029,11 +1030,18 @@ func (r *RootCmd) scaletestTrafficGen() *clibase.Cmd { Value: clibase.DurationOf(&duration), }, { - Flag: "bps", - Env: "CODER_SCALETEST_TRAFFICGEN_BPS", + Flag: "bytes-per-tick", + Env: "CODER_SCALETEST_TRAFFICGEN_BYTES_PER_TICK", Default: "1024", - Description: "How much traffic to generate in bytes per second.", - Value: clibase.Int64Of(&bps), + Description: "How much traffic to generate per tick.", + Value: clibase.Int64Of(&bytesPerTick), + }, + { + Flag: "tick-interval", + Env: "CODER_SCALETEST_TRAFFICGEN_TICK_INTERVAL", + Default: "100ms", + Description: "How often to send traffic.", + Value: clibase.DurationOf(&tickInterval), }, } diff --git a/cli/scaletest_test.go b/cli/scaletest_test.go index 4e6daa855a4ad..4976d19ee38d0 100644 --- a/cli/scaletest_test.go +++ b/cli/scaletest_test.go @@ -261,7 +261,8 @@ func TestScaleTestTrafficGen(t *testing.T) { inv, root := clitest.New(t, "scaletest", "trafficgen", ws.Name, "--duration", "1s", - "--bps", "100", + "--bytes-per-tick", "1024", + "--tick-interval", "100ms", ) clitest.SetupConfig(t, client, root) var stdout, stderr bytes.Buffer diff --git a/scaletest/trafficgen/config.go b/scaletest/trafficgen/config.go index cebe5a6bdca35..f45fa529152ba 100644 --- a/scaletest/trafficgen/config.go +++ b/scaletest/trafficgen/config.go @@ -10,15 +10,15 @@ import ( type Config struct { // AgentID is the workspace agent ID to which to connect. AgentID uuid.UUID `json:"agent_id"` - // BytesPerSecond is the number of bytes to send to the agent. - BytesPerSecond int64 `json:"bytes_per_second"` + // BytesPerTick is the number of bytes to send to the agent per tick. 
+ BytesPerTick int64 `json:"bytes_per_tick"` // Duration is the total duration for which to send traffic to the agent. Duration time.Duration `json:"duration"` - // TicksPerSecond specifies how many times per second we send traffic. - TicksPerSecond int64 `json:"ticks_per_second"` + // TicksInterval specifies how many times per second we send traffic. + TickInterval time.Duration `json:"tick_interval"` } func (c Config) Validate() error { @@ -26,16 +26,17 @@ func (c Config) Validate() error { return xerrors.Errorf("validate agent_id: must not be nil") } - if c.BytesPerSecond <= 0 { - return xerrors.Errorf("validate bytes_per_second: must be greater than zero") + if c.BytesPerTick <= 0 { + return xerrors.Errorf("validate bytes_per_tick: must be greater than zero") } if c.Duration <= 0 { return xerrors.Errorf("validate duration: must be greater than zero") } - if c.TicksPerSecond <= 0 { - return xerrors.Errorf("validate ticks_per_second: must be greater than zero") + if c.TickInterval <= 0 { + return xerrors.Errorf("validate tick_interval: must be greater than zero") } + return nil } diff --git a/scaletest/trafficgen/run.go b/scaletest/trafficgen/run.go index 004c2fc346156..34edc978dfc19 100644 --- a/scaletest/trafficgen/run.go +++ b/scaletest/trafficgen/run.go @@ -52,8 +52,8 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error { reconnect = uuid.New() height uint16 = 25 width uint16 = 80 - tickInterval = time.Second / time.Duration(r.cfg.TicksPerSecond) - bytesPerTick = r.cfg.BytesPerSecond / r.cfg.TicksPerSecond + tickInterval = r.cfg.TickInterval + bytesPerTick = r.cfg.BytesPerTick ) logger.Info(ctx, "config", @@ -149,7 +149,10 @@ func (*Runner) Cleanup(context.Context, string) error { // drain drains from src until it returns io.EOF or ctx times out. 
func drain(src io.Reader) error { if _, err := io.Copy(io.Discard, src); err != nil { - if xerrors.Is(err, context.DeadlineExceeded) || xerrors.Is(err, websocket.CloseError{}) { + if xerrors.Is(err, context.DeadlineExceeded) { + return nil + } + if xerrors.As(err, &websocket.CloseError{}) { return nil } return err @@ -166,7 +169,10 @@ func writeRandomData(dst io.Writer, size int64, tick <-chan time.Time) error { payload := "#" + mustRandStr(size-1) ptyReq.Data = payload if err := enc.Encode(ptyReq); err != nil { - if xerrors.Is(err, context.DeadlineExceeded) || xerrors.Is(err, websocket.CloseError{}) { + if xerrors.Is(err, context.DeadlineExceeded) { + return nil + } + if xerrors.As(err, &websocket.CloseError{}) { return nil } return err @@ -213,8 +219,11 @@ func (w *countReadWriter) BytesWritten() int64 { return w.bytesWritten.Load() } -func mustRandStr(len int64) string { - randStr, err := cryptorand.String(int(len)) +func mustRandStr(l int64) string { + if l < 1 { + l = 1 + } + randStr, err := cryptorand.String(int(l)) if err != nil { panic(err) } From 7b98b354dccfeb61e91a2b0c2aac385149334e7f Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Thu, 4 May 2023 15:19:15 +0100 Subject: [PATCH 21/25] rename trafficgen -> workspace-traffic --- cli/scaletest.go | 23 +++++++++---------- cli/scaletest_test.go | 4 ++-- .../config.go | 2 +- .../{trafficgen => workspacetraffic}/run.go | 2 +- 4 files changed, 15 insertions(+), 16 deletions(-) rename scaletest/{trafficgen => workspacetraffic}/config.go (97%) rename scaletest/{trafficgen => workspacetraffic}/run.go (99%) diff --git a/cli/scaletest.go b/cli/scaletest.go index ce54787d7c232..513a1c8adb76f 100644 --- a/cli/scaletest.go +++ b/cli/scaletest.go @@ -27,8 +27,8 @@ import ( "github.com/coder/coder/scaletest/createworkspaces" "github.com/coder/coder/scaletest/harness" "github.com/coder/coder/scaletest/reconnectingpty" - "github.com/coder/coder/scaletest/trafficgen" "github.com/coder/coder/scaletest/workspacebuild" + 
"github.com/coder/coder/scaletest/workspacetraffic" ) const scaletestTracerName = "coder_scaletest" @@ -43,7 +43,7 @@ func (r *RootCmd) scaletest() *clibase.Cmd { Children: []*clibase.Cmd{ r.scaletestCleanup(), r.scaletestCreateWorkspaces(), - r.scaletestTrafficGen(), + r.scaletestWorkspaceTraffic(), }, } @@ -894,7 +894,7 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd { return cmd } -func (r *RootCmd) scaletestTrafficGen() *clibase.Cmd { +func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd { var ( duration time.Duration tickInterval time.Duration @@ -907,9 +907,8 @@ func (r *RootCmd) scaletestTrafficGen() *clibase.Cmd { ) cmd := &clibase.Cmd{ - Use: "trafficgen", - Hidden: true, - Short: "Generate traffic to scaletest workspaces", + Use: "workspace-traffic", + Short: "Generate traffic to scaletest workspaces through coderd", Middleware: clibase.Chain( r.InitClient(client), ), @@ -958,7 +957,7 @@ func (r *RootCmd) scaletestTrafficGen() *clibase.Cmd { for idx, ws := range workspaces { var ( agentID uuid.UUID - name = "trafficgen" + name = "workspace-traffic" id = strconv.Itoa(idx) ) @@ -975,7 +974,7 @@ func (r *RootCmd) scaletestTrafficGen() *clibase.Cmd { } // Setup our workspace agent connection. 
- config := trafficgen.Config{ + config := workspacetraffic.Config{ AgentID: agentID, BytesPerTick: bytesPerTick, Duration: duration, @@ -985,7 +984,7 @@ func (r *RootCmd) scaletestTrafficGen() *clibase.Cmd { if err := config.Validate(); err != nil { return xerrors.Errorf("validate config: %w", err) } - var runner harness.Runnable = trafficgen.NewRunner(client, config) + var runner harness.Runnable = workspacetraffic.NewRunner(client, config) if tracingEnabled { runner = &runnableTraceWrapper{ tracer: tracer, @@ -1024,21 +1023,21 @@ func (r *RootCmd) scaletestTrafficGen() *clibase.Cmd { cmd.Options = []clibase.Option{ { Flag: "duration", - Env: "CODER_SCALETEST_TRAFFICGEN_DURATION", + Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_DURATION", Default: "10s", Description: "How long to generate traffic for.", Value: clibase.DurationOf(&duration), }, { Flag: "bytes-per-tick", - Env: "CODER_SCALETEST_TRAFFICGEN_BYTES_PER_TICK", + Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_BYTES_PER_TICK", Default: "1024", Description: "How much traffic to generate per tick.", Value: clibase.Int64Of(&bytesPerTick), }, { Flag: "tick-interval", - Env: "CODER_SCALETEST_TRAFFICGEN_TICK_INTERVAL", + Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_TICK_INTERVAL", Default: "100ms", Description: "How often to send traffic.", Value: clibase.DurationOf(&tickInterval), diff --git a/cli/scaletest_test.go b/cli/scaletest_test.go index 4976d19ee38d0..e2268ef5d9742 100644 --- a/cli/scaletest_test.go +++ b/cli/scaletest_test.go @@ -208,7 +208,7 @@ param3: 1 // This test pretends to stand up a workspace and run a no-op traffic generation test. // It's not a real test, but it's useful for debugging. // We do not perform any cleanup. 
-func TestScaleTestTrafficGen(t *testing.T) { +func TestScaleTestWorkspaceTraffic(t *testing.T) { t.Parallel() ctx, cancelFunc := context.WithTimeout(context.Background(), testutil.WaitMedium) @@ -259,7 +259,7 @@ func TestScaleTestTrafficGen(t *testing.T) { coderdtest.AwaitWorkspaceAgents(t, client, ws.ID) - inv, root := clitest.New(t, "scaletest", "trafficgen", ws.Name, + inv, root := clitest.New(t, "scaletest", "workspace-traffic", "--duration", "1s", "--bytes-per-tick", "1024", "--tick-interval", "100ms", diff --git a/scaletest/trafficgen/config.go b/scaletest/workspacetraffic/config.go similarity index 97% rename from scaletest/trafficgen/config.go rename to scaletest/workspacetraffic/config.go index f45fa529152ba..e47ff7873cce2 100644 --- a/scaletest/trafficgen/config.go +++ b/scaletest/workspacetraffic/config.go @@ -1,4 +1,4 @@ -package trafficgen +package workspacetraffic import ( "time" diff --git a/scaletest/trafficgen/run.go b/scaletest/workspacetraffic/run.go similarity index 99% rename from scaletest/trafficgen/run.go rename to scaletest/workspacetraffic/run.go index 34edc978dfc19..c0e74ea5c9ddb 100644 --- a/scaletest/trafficgen/run.go +++ b/scaletest/workspacetraffic/run.go @@ -1,4 +1,4 @@ -package trafficgen +package workspacetraffic import ( "context" From b9c845f2f0957386a21358b47a259b8851c7af2f Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Thu, 4 May 2023 15:21:08 +0100 Subject: [PATCH 22/25] make gen --- cli/testdata/coder_scaletest_--help.golden | 1 + ..._scaletest_workspace-traffic_--help.golden | 58 +++++++ docs/cli/scaletest.md | 1 + docs/cli/scaletest_workspace-traffic.md | 149 ++++++++++++++++++ docs/manifest.json | 5 + 5 files changed, 214 insertions(+) create mode 100644 cli/testdata/coder_scaletest_workspace-traffic_--help.golden create mode 100644 docs/cli/scaletest_workspace-traffic.md diff --git a/cli/testdata/coder_scaletest_--help.golden b/cli/testdata/coder_scaletest_--help.golden index 37c7d4d10d677..6ab343cd33377 100644 --- 
a/cli/testdata/coder_scaletest_--help.golden +++ b/cli/testdata/coder_scaletest_--help.golden @@ -10,6 +10,7 @@ Run a scale test against the Coder API online. Optionally runs a command inside each workspace, and connects to the workspace over WireGuard. + workspace-traffic Generate traffic to scaletest workspaces through coderd --- Run `coder --help` for a list of global options. diff --git a/cli/testdata/coder_scaletest_workspace-traffic_--help.golden b/cli/testdata/coder_scaletest_workspace-traffic_--help.golden new file mode 100644 index 0000000000000..cef44641add1a --- /dev/null +++ b/cli/testdata/coder_scaletest_workspace-traffic_--help.golden @@ -0,0 +1,58 @@ +Usage: coder scaletest workspace-traffic [flags] + +Generate traffic to scaletest workspaces through coderd + +Options + --bytes-per-tick int, $CODER_SCALETEST_WORKSPACE_TRAFFIC_BYTES_PER_TICK (default: 1024) + How much traffic to generate per tick. + + --cleanup-concurrency int, $CODER_SCALETEST_CLEANUP_CONCURRENCY (default: 1) + Number of concurrent cleanup jobs to run. 0 means unlimited. + + --cleanup-job-timeout duration, $CODER_SCALETEST_CLEANUP_JOB_TIMEOUT (default: 5m) + Timeout per job. Jobs may take longer to complete under higher + concurrency limits. + + --cleanup-timeout duration, $CODER_SCALETEST_CLEANUP_TIMEOUT (default: 30m) + Timeout for the entire cleanup run. 0 means unlimited. + + --concurrency int, $CODER_SCALETEST_CONCURRENCY (default: 1) + Number of concurrent jobs to run. 0 means unlimited. + + --duration duration, $CODER_SCALETEST_WORKSPACE_TRAFFIC_DURATION (default: 10s) + How long to generate traffic for. + + --job-timeout int, $CODER_SCALETEST_JOB_TIMEOUT (default: 5m) + Timeout per job. Jobs may take longer to complete under higher + concurrency limits. + + --output string-array, $CODER_SCALETEST_OUTPUTS (default: text) + Output format specs in the format "<format>[:<path>]". Not specifying + a path will default to stdout. Available formats: text, json.
+ + --tick-interval duration, $CODER_SCALETEST_WORKSPACE_TRAFFIC_TICK_INTERVAL (default: 100ms) + How often to send traffic. + + --timeout duration, $CODER_SCALETEST_TIMEOUT (default: 30m) + Timeout for the entire test run. 0 means unlimited. + + --trace bool, $CODER_SCALETEST_TRACE + Whether application tracing data is collected. It exports to a backend + configured by environment variables. See: + https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md. + + --trace-coder bool, $CODER_SCALETEST_TRACE_CODER + Whether opentelemetry traces are sent to Coder. We recommend keeping + this disabled unless we advise you to enable it. + + --trace-honeycomb-api-key string, $CODER_SCALETEST_TRACE_HONEYCOMB_API_KEY + Enables trace exporting to Honeycomb.io using the provided API key. + + --trace-propagate bool, $CODER_SCALETEST_TRACE_PROPAGATE + Enables trace propagation to the Coder backend, which will be used to + correlate server-side spans with client-side spans. Only enable this + if the server is configured with the exact same tracing configuration + as the client. + +--- +Run `coder --help` for a list of global options. diff --git a/docs/cli/scaletest.md b/docs/cli/scaletest.md index aae17ff7ba5fd..dd9eb4c6462aa 100644 --- a/docs/cli/scaletest.md +++ b/docs/cli/scaletest.md @@ -16,3 +16,4 @@ coder scaletest | ------------------------------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | [cleanup](./scaletest_cleanup.md) | Cleanup scaletest workspaces, then cleanup scaletest users. | | [create-workspaces](./scaletest_create-workspaces.md) | Creates many users, then creates a workspace for each user and waits for them finish building and fully come online. 
Optionally runs a command inside each workspace, and connects to the workspace over WireGuard. | +| [workspace-traffic](./scaletest_workspace-traffic.md) | Generate traffic to scaletest workspaces through coderd | diff --git a/docs/cli/scaletest_workspace-traffic.md b/docs/cli/scaletest_workspace-traffic.md new file mode 100644 index 0000000000000..eb8595b9d96ce --- /dev/null +++ b/docs/cli/scaletest_workspace-traffic.md @@ -0,0 +1,149 @@ + + +# scaletest workspace-traffic + +Generate traffic to scaletest workspaces through coderd + +## Usage + +```console +coder scaletest workspace-traffic [flags] +``` + +## Options + +### --bytes-per-tick + +| | | +| ----------- | -------------------------------------------------------------- | +| Type | int | +| Environment | $CODER_SCALETEST_WORKSPACE_TRAFFIC_BYTES_PER_TICK | +| Default | 1024 | + +How much traffic to generate per tick. + +### --cleanup-concurrency + +| | | +| ----------- | ------------------------------------------------- | +| Type | int | +| Environment | $CODER_SCALETEST_CLEANUP_CONCURRENCY | +| Default | 1 | + +Number of concurrent cleanup jobs to run. 0 means unlimited. + +### --cleanup-job-timeout + +| | | +| ----------- | ------------------------------------------------- | +| Type | duration | +| Environment | $CODER_SCALETEST_CLEANUP_JOB_TIMEOUT | +| Default | 5m | + +Timeout per job. Jobs may take longer to complete under higher concurrency limits. + +### --cleanup-timeout + +| | | +| ----------- | --------------------------------------------- | +| Type | duration | +| Environment | $CODER_SCALETEST_CLEANUP_TIMEOUT | +| Default | 30m | + +Timeout for the entire cleanup run. 0 means unlimited. + +### --concurrency + +| | | +| ----------- | ----------------------------------------- | +| Type | int | +| Environment | $CODER_SCALETEST_CONCURRENCY | +| Default | 1 | + +Number of concurrent jobs to run. 0 means unlimited. 
+ +### --duration + +| | | +| ----------- | -------------------------------------------------------- | +| Type | duration | +| Environment | $CODER_SCALETEST_WORKSPACE_TRAFFIC_DURATION | +| Default | 10s | + +How long to generate traffic for. + +### --job-timeout + +| | | +| ----------- | ----------------------------------------- | +| Type | duration | +| Environment | $CODER_SCALETEST_JOB_TIMEOUT | +| Default | 5m | + +Timeout per job. Jobs may take longer to complete under higher concurrency limits. + +### --output + +| | | +| ----------- | ------------------------------------- | +| Type | string-array | +| Environment | $CODER_SCALETEST_OUTPUTS | +| Default | text | + +Output format specs in the format "[:]". Not specifying a path will default to stdout. Available formats: text, json. + +### --tick-interval + +| | | +| ----------- | ------------------------------------------------------------- | +| Type | duration | +| Environment | $CODER_SCALETEST_WORKSPACE_TRAFFIC_TICK_INTERVAL | +| Default | 100ms | + +How often to send traffic. + +### --timeout + +| | | +| ----------- | ------------------------------------- | +| Type | duration | +| Environment | $CODER_SCALETEST_TIMEOUT | +| Default | 30m | + +Timeout for the entire test run. 0 means unlimited. + +### --trace + +| | | +| ----------- | ----------------------------------- | +| Type | bool | +| Environment | $CODER_SCALETEST_TRACE | + +Whether application tracing data is collected. It exports to a backend configured by environment variables. See: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md. + +### --trace-coder + +| | | +| ----------- | ----------------------------------------- | +| Type | bool | +| Environment | $CODER_SCALETEST_TRACE_CODER | + +Whether opentelemetry traces are sent to Coder. We recommend keeping this disabled unless we advise you to enable it. 
+ +### --trace-honeycomb-api-key + +| | | +| ----------- | ----------------------------------------------------- | +| Type | string | +| Environment | $CODER_SCALETEST_TRACE_HONEYCOMB_API_KEY | + +Enables trace exporting to Honeycomb.io using the provided API key. + +### --trace-propagate + +| | | +| ----------- | --------------------------------------------- | +| Type | bool | +| Environment | $CODER_SCALETEST_TRACE_PROPAGATE | + +Enables trace propagation to the Coder backend, which will be used to correlate server-side spans with client-side spans. Only enable this if the server is configured with the exact same tracing configuration as the client. diff --git a/docs/manifest.json b/docs/manifest.json index 32f4c60151bc4..bbfb5673b8573 100644 --- a/docs/manifest.json +++ b/docs/manifest.json @@ -630,6 +630,11 @@ "description": "Creates many users, then creates a workspace for each user and waits for them finish building and fully come online. Optionally runs a command inside each workspace, and connects to the workspace over WireGuard.", "path": "cli/scaletest_create-workspaces.md" }, + { + "title": "scaletest workspace-traffic", + "description": "Generate traffic to scaletest workspaces through coderd", + "path": "cli/scaletest_workspace-traffic.md" + }, { "title": "schedule", "description": "Schedule automated start and stop times for workspaces", From 2574a0044fdbf1bcfa1205169a90dcb85fbac9fe Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Thu, 4 May 2023 15:27:25 +0100 Subject: [PATCH 23/25] use strategy.timeout instead of duration --- cli/scaletest.go | 10 +--------- cli/scaletest_test.go | 2 +- .../coder_scaletest_workspace-traffic_--help.golden | 3 --- docs/cli/scaletest_workspace-traffic.md | 10 ---------- 4 files changed, 2 insertions(+), 23 deletions(-) diff --git a/cli/scaletest.go b/cli/scaletest.go index 513a1c8adb76f..67186da2212fa 100644 --- a/cli/scaletest.go +++ b/cli/scaletest.go @@ -896,7 +896,6 @@ func (r *RootCmd) 
scaletestCreateWorkspaces() *clibase.Cmd { func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd { var ( - duration time.Duration tickInterval time.Duration bytesPerTick int64 client = &codersdk.Client{} @@ -977,7 +976,7 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd { config := workspacetraffic.Config{ AgentID: agentID, BytesPerTick: bytesPerTick, - Duration: duration, + Duration: strategy.timeout, TickInterval: tickInterval, } @@ -1021,13 +1020,6 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd { } cmd.Options = []clibase.Option{ - { - Flag: "duration", - Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_DURATION", - Default: "10s", - Description: "How long to generate traffic for.", - Value: clibase.DurationOf(&duration), - }, { Flag: "bytes-per-tick", Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_BYTES_PER_TICK", diff --git a/cli/scaletest_test.go b/cli/scaletest_test.go index e2268ef5d9742..b026e7636b0f3 100644 --- a/cli/scaletest_test.go +++ b/cli/scaletest_test.go @@ -260,7 +260,7 @@ func TestScaleTestWorkspaceTraffic(t *testing.T) { coderdtest.AwaitWorkspaceAgents(t, client, ws.ID) inv, root := clitest.New(t, "scaletest", "workspace-traffic", - "--duration", "1s", + "--timeout", "1s", "--bytes-per-tick", "1024", "--tick-interval", "100ms", ) diff --git a/cli/testdata/coder_scaletest_workspace-traffic_--help.golden b/cli/testdata/coder_scaletest_workspace-traffic_--help.golden index cef44641add1a..b7de6ca960e97 100644 --- a/cli/testdata/coder_scaletest_workspace-traffic_--help.golden +++ b/cli/testdata/coder_scaletest_workspace-traffic_--help.golden @@ -19,9 +19,6 @@ Generate traffic to scaletest workspaces through coderd --concurrency int, $CODER_SCALETEST_CONCURRENCY (default: 1) Number of concurrent jobs to run. 0 means unlimited. - --duration duration, $CODER_SCALETEST_WORKSPACE_TRAFFIC_DURATION (default: 10s) - How long to generate traffic for. - --job-timeout duration, $CODER_SCALETEST_JOB_TIMEOUT (default: 5m) Timeout per job. 
Jobs may take longer to complete under higher concurrency limits. diff --git a/docs/cli/scaletest_workspace-traffic.md b/docs/cli/scaletest_workspace-traffic.md index eb8595b9d96ce..5303847345a49 100644 --- a/docs/cli/scaletest_workspace-traffic.md +++ b/docs/cli/scaletest_workspace-traffic.md @@ -62,16 +62,6 @@ Timeout for the entire cleanup run. 0 means unlimited. Number of concurrent jobs to run. 0 means unlimited. -### --duration - -| | | -| ----------- | -------------------------------------------------------- | -| Type | duration | -| Environment | $CODER_SCALETEST_WORKSPACE_TRAFFIC_DURATION | -| Default | 10s | - -How long to generate traffic for. - ### --job-timeout | | | From 516ffa1a78eb0db0ebb25d0f0fd582fe89918355 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Thu, 4 May 2023 15:34:33 +0100 Subject: [PATCH 24/25] rm ctx from countReadWriter --- scaletest/workspacetraffic/run.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/scaletest/workspacetraffic/run.go b/scaletest/workspacetraffic/run.go index c0e74ea5c9ddb..863e26e958596 100644 --- a/scaletest/workspacetraffic/run.go +++ b/scaletest/workspacetraffic/run.go @@ -90,7 +90,7 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error { }() // Wrap the conn in a countReadWriter so we can monitor bytes sent/rcvd. - crw := countReadWriter{ReadWriter: conn, ctx: deadlineCtx} + crw := countReadWriter{ReadWriter: conn} // Create a ticker for sending data to the PTY. tick := time.NewTicker(tickInterval) @@ -183,16 +183,12 @@ func writeRandomData(dst io.Writer, size int64, tick <-chan time.Time) error { // countReadWriter wraps an io.ReadWriter and counts the number of bytes read and written. 
type countReadWriter struct { - ctx context.Context io.ReadWriter bytesRead atomic.Int64 bytesWritten atomic.Int64 } func (w *countReadWriter) Read(p []byte) (int, error) { - if err := w.ctx.Err(); err != nil { - return 0, err - } n, err := w.ReadWriter.Read(p) if err == nil { w.bytesRead.Add(int64(n)) @@ -201,9 +197,6 @@ func (w *countReadWriter) Read(p []byte) (int, error) { } func (w *countReadWriter) Write(p []byte) (int, error) { - if err := w.ctx.Err(); err != nil { - return 0, err - } n, err := w.ReadWriter.Write(p) if err == nil { w.bytesWritten.Add(int64(n)) From 655d95aaaaead9f720f059e9231444e1aa63d836 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Fri, 5 May 2023 10:02:33 +0100 Subject: [PATCH 25/25] fixup --- cli/root.go | 2 +- scaletest/workspacetraffic/config.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cli/root.go b/cli/root.go index 0ea0da1b4b2c9..6702754abb0bf 100644 --- a/cli/root.go +++ b/cli/root.go @@ -93,6 +93,7 @@ func (r *RootCmd) Core() []*clibase.Cmd { r.parameters(), r.ping(), r.rename(), + r.scaletest(), r.schedules(), r.show(), r.speedtest(), @@ -104,7 +105,6 @@ func (r *RootCmd) Core() []*clibase.Cmd { // Hidden r.gitssh(), - r.scaletest(), r.vscodeSSH(), r.workspaceAgent(), } diff --git a/scaletest/workspacetraffic/config.go b/scaletest/workspacetraffic/config.go index e47ff7873cce2..abf7b7b77159a 100644 --- a/scaletest/workspacetraffic/config.go +++ b/scaletest/workspacetraffic/config.go @@ -17,7 +17,8 @@ type Config struct { // Duration is the total duration for which to send traffic to the agent. Duration time.Duration `json:"duration"` - // TicksInterval specifies how many times per second we send traffic. + // TickInterval specifies the interval between ticks (that is, attempts to + // send data to workspace agents). TickInterval time.Duration `json:"tick_interval"` }