From 9a4717b8268278dcc9e36b6b4769a1cdeafaad0a Mon Sep 17 00:00:00 2001 From: Dean Sheather Date: Fri, 4 Nov 2022 14:37:15 +0000 Subject: [PATCH 1/2] feat: add new loadtest type agentconn --- cli/loadtestconfig.go | 20 +- loadtest/agentconn/config.go | 89 +++++++++ loadtest/agentconn/config_test.go | 185 +++++++++++++++++ loadtest/agentconn/run.go | 321 ++++++++++++++++++++++++++++++ loadtest/agentconn/run_test.go | 283 ++++++++++++++++++++++++++ 5 files changed, 896 insertions(+), 2 deletions(-) create mode 100644 loadtest/agentconn/config.go create mode 100644 loadtest/agentconn/config_test.go create mode 100644 loadtest/agentconn/run.go create mode 100644 loadtest/agentconn/run_test.go diff --git a/cli/loadtestconfig.go b/cli/loadtestconfig.go index 0a1f865d8701a..5dcb9ddb27d3a 100644 --- a/cli/loadtestconfig.go +++ b/cli/loadtestconfig.go @@ -7,6 +7,7 @@ import ( "github.com/coder/coder/coderd/httpapi" "github.com/coder/coder/codersdk" + "github.com/coder/coder/loadtest/agentconn" "github.com/coder/coder/loadtest/harness" "github.com/coder/coder/loadtest/placebo" "github.com/coder/coder/loadtest/workspacebuild" @@ -86,6 +87,7 @@ func (s LoadTestStrategy) ExecutionStrategy() harness.ExecutionStrategy { type LoadTestType string const ( + LoadTestTypeAgentConn LoadTestType = "agentconn" LoadTestTypePlacebo LoadTestType = "placebo" LoadTestTypeWorkspaceBuild LoadTestType = "workspacebuild" ) @@ -97,6 +99,8 @@ type LoadTest struct { // the count is 0 or negative, defaults to 1. Count int `json:"count"` + // AgentConn must be set if type == "agentconn". + AgentConn *agentconn.Config `json:"agentconn,omitempty"` // Placebo must be set if type == "placebo". Placebo *placebo.Config `json:"placebo,omitempty"` // WorkspaceBuild must be set if type == "workspacebuild". @@ -105,17 +109,20 @@ type LoadTest struct { func (t LoadTest) NewRunner(client *codersdk.Client) (harness.Runnable, error) { switch t.Type { + case LoadTestTypeAgentConn: + if t.AgentConn == nil { + return nil, xerrors.New("agentconn config must be set") + } + return agentconn.NewRunner(client, *t.AgentConn), nil case LoadTestTypePlacebo: if t.Placebo == nil { return nil, xerrors.New("placebo config must be set") } - return placebo.NewRunner(*t.Placebo), nil case LoadTestTypeWorkspaceBuild: if t.WorkspaceBuild == nil { return nil, xerrors.Errorf("workspacebuild config must be set") } - return workspacebuild.NewRunner(client, *t.WorkspaceBuild), nil default: return nil, xerrors.Errorf("unknown test type %q", t.Type) @@ -155,6 +162,15 @@ func (s *LoadTestStrategy) Validate() error { func (t *LoadTest) Validate() error { switch t.Type { + case LoadTestTypeAgentConn: + if t.AgentConn == nil { + return xerrors.Errorf("agentconn test type must specify agentconn") + } + + err := t.AgentConn.Validate() + if err != nil { + return xerrors.Errorf("validate agentconn: %w", err) + } case LoadTestTypePlacebo: if t.Placebo == nil { return xerrors.Errorf("placebo test type must specify placebo") diff --git a/loadtest/agentconn/config.go b/loadtest/agentconn/config.go new file mode 100644 index 0000000000000..1d5de5cf7a6e0 --- /dev/null +++ b/loadtest/agentconn/config.go @@ -0,0 +1,89 @@ +package agentconn + +import ( + "net/url" + + "github.com/google/uuid" + "golang.org/x/xerrors" + + "github.com/coder/coder/coderd/httpapi" +) + +type ConnectionMode string + +const ( + ConnectionModeDirect ConnectionMode = "direct" + ConnectionModeDerp ConnectionMode = "derp" +) + +type Config struct { + // AgentID is the ID of the agent to connect to. 
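+	// The runner uses this ID with the client's DialWorkspaceAgent call, so
+	// the configured client must be able to reach the agent's workspace.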
+ AgentID uuid.UUID `json:"agent_id"` + // ConnectionMode is the strategy to use when connecting to the agent. + ConnectionMode ConnectionMode `json:"connection_mode"` + // HoldDuration is the duration to hold the connection open for. If set to + // 0, the connection will be closed immediately after making each request + // once. + HoldDuration httpapi.Duration `json:"hold_duration"` + + // Connections is the list of connections to make to services running + // inside the workspace. Only HTTP connections are supported. + Connections []Connection `json:"connections"` +} + +type Connection struct { + // URL is the address to connect to (e.g. "http://127.0.0.1:8080/path"). The + // endpoint must respond with a any response within timeout. The IP address + // is ignored and the connection is made to the agent's WireGuard IP + // instead. + URL string `json:"url"` + // Interval is the duration to wait between connections to this endpoint. If + // set to 0, the connection will only be made once. Must be set to 0 if + // the parent config's hold_duration is set to 0. + Interval httpapi.Duration `json:"interval"` + // Timeout is the duration to wait for a connection to this endpoint to + // succeed. If set to 0, the default timeout will be used. + Timeout httpapi.Duration `json:"timeout"` +} + +func (c Config) Validate() error { + if c.AgentID == uuid.Nil { + return xerrors.New("agent_id must be set") + } + if c.ConnectionMode == "" { + return xerrors.New("connection_mode must be set") + } + switch c.ConnectionMode { + case ConnectionModeDirect: + case ConnectionModeDerp: + default: + return xerrors.Errorf("invalid connection_mode: %q", c.ConnectionMode) + } + if c.HoldDuration < 0 { + return xerrors.New("hold_duration must be a positive value") + } + + for i, conn := range c.Connections { + if conn.URL == "" { + return xerrors.Errorf("connections[%d].url must be set", i) + } + u, err := url.Parse(conn.URL) + if err != nil { + return xerrors.Errorf("connections[%d].url is not a valid URL: %w", i, err) + } + if u.Scheme != "http" { + return xerrors.Errorf("connections[%d].url has an unsupported scheme %q, only http is supported", i, u.Scheme) + } + if conn.Interval < 0 { + return xerrors.Errorf("connections[%d].interval must be a positive value", i) + } + if conn.Interval > 0 && c.HoldDuration == 0 { + return xerrors.Errorf("connections[%d].interval must be 0 if hold_duration is 0", i) + } + if conn.Timeout < 0 { + return xerrors.Errorf("connections[%d].timeout must be a positive value", i) + } + } + + return nil +} diff --git a/loadtest/agentconn/config_test.go b/loadtest/agentconn/config_test.go new file mode 100644 index 0000000000000..d803423d6d2c6 --- /dev/null +++ b/loadtest/agentconn/config_test.go @@ -0,0 +1,185 @@ +package agentconn_test + +import ( + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + + "github.com/coder/coder/coderd/httpapi" + "github.com/coder/coder/loadtest/agentconn" +) + +func Test_Config(t *testing.T) { + t.Parallel() + + id := uuid.Must(uuid.NewRandom()) + + cases := []struct { + name string + config agentconn.Config + errContains string + }{ + { + name: "OK", + config: agentconn.Config{ + AgentID: id, + ConnectionMode: agentconn.ConnectionModeDirect, + HoldDuration: httpapi.Duration(time.Minute), + Connections: []agentconn.Connection{ + { + URL: "http://localhost:8080/path", + Interval: httpapi.Duration(time.Second), + Timeout: httpapi.Duration(time.Second), + }, + { + URL: "http://localhost:8000/differentpath", + Interval: 
httpapi.Duration(2 * time.Second), + Timeout: httpapi.Duration(2 * time.Second), + }, + }, + }, + }, + { + name: "NoAgentID", + config: agentconn.Config{ + AgentID: uuid.Nil, + ConnectionMode: agentconn.ConnectionModeDirect, + HoldDuration: 0, + Connections: nil, + }, + errContains: "agent_id must be set", + }, + { + name: "NoConnectionMode", + config: agentconn.Config{ + AgentID: id, + ConnectionMode: "", + HoldDuration: 0, + Connections: nil, + }, + errContains: "connection_mode must be set", + }, + { + name: "InvalidConnectionMode", + config: agentconn.Config{ + AgentID: id, + ConnectionMode: "blah", + HoldDuration: 0, + Connections: nil, + }, + errContains: "invalid connection_mode", + }, + { + name: "NegativeHoldDuration", + config: agentconn.Config{ + AgentID: id, + ConnectionMode: agentconn.ConnectionModeDerp, + HoldDuration: -1, + Connections: nil, + }, + errContains: "hold_duration must be a positive value", + }, + { + name: "ConnectionNoURL", + config: agentconn.Config{ + AgentID: id, + ConnectionMode: agentconn.ConnectionModeDirect, + HoldDuration: 1, + Connections: []agentconn.Connection{{ + URL: "", + Interval: 0, + Timeout: 0, + }}, + }, + errContains: "connections[0].url must be set", + }, + { + name: "ConnectionInvalidURL", + config: agentconn.Config{ + AgentID: id, + ConnectionMode: agentconn.ConnectionModeDirect, + HoldDuration: 1, + Connections: []agentconn.Connection{{ + URL: string([]byte{0x7f}), + Interval: 0, + Timeout: 0, + }}, + }, + errContains: "connections[0].url is not a valid URL", + }, + { + name: "ConnectionInvalidURLScheme", + config: agentconn.Config{ + AgentID: id, + ConnectionMode: agentconn.ConnectionModeDirect, + HoldDuration: 1, + Connections: []agentconn.Connection{{ + URL: "blah://localhost:8080", + Interval: 0, + Timeout: 0, + }}, + }, + errContains: "connections[0].url has an unsupported scheme", + }, + { + name: "ConnectionNegativeInterval", + config: agentconn.Config{ + AgentID: id, + ConnectionMode: agentconn.ConnectionModeDirect, + HoldDuration: 1, + Connections: []agentconn.Connection{{ + URL: "http://localhost:8080", + Interval: -1, + Timeout: 0, + }}, + }, + errContains: "connections[0].interval must be a positive value", + }, + { + name: "ConnectionIntervalMustBeZero", + config: agentconn.Config{ + AgentID: id, + ConnectionMode: agentconn.ConnectionModeDirect, + HoldDuration: 0, + Connections: []agentconn.Connection{{ + URL: "http://localhost:8080", + Interval: 1, + Timeout: 0, + }}, + }, + errContains: "connections[0].interval must be 0 if hold_duration is 0", + }, + { + name: "ConnectionNegativeTimeout", + config: agentconn.Config{ + AgentID: id, + ConnectionMode: agentconn.ConnectionModeDirect, + HoldDuration: 1, + Connections: []agentconn.Connection{{ + URL: "http://localhost:8080", + Interval: 0, + Timeout: -1, + }}, + }, + errContains: "connections[0].timeout must be a positive value", + }, + } + + for _, c := range cases { + c := c + + t.Run(c.name, func(t *testing.T) { + t.Parallel() + + err := c.config.Validate() + if c.errContains != "" { + require.Error(t, err) + require.Contains(t, err.Error(), c.errContains) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/loadtest/agentconn/run.go b/loadtest/agentconn/run.go new file mode 100644 index 0000000000000..143605fe3eaea --- /dev/null +++ b/loadtest/agentconn/run.go @@ -0,0 +1,321 @@ +package agentconn + +import ( + "context" + "fmt" + "io" + "net" + "net/http" + "net/netip" + "net/url" + "strconv" + "sync" + "time" + + "golang.org/x/sync/errgroup" + 
"golang.org/x/xerrors" + + "cdr.dev/slog" + "cdr.dev/slog/sloggers/sloghuman" + "github.com/coder/coder/codersdk" + "github.com/coder/coder/loadtest/harness" +) + +const defaultRequestTimeout = 5 * time.Second + +type holdDurationEndedError struct{} + +func (holdDurationEndedError) Error() string { + return "hold duration ended" +} + +type Runner struct { + client *codersdk.Client + cfg Config +} + +var _ harness.Runnable = &Runner{} + +func NewRunner(client *codersdk.Client, cfg Config) *Runner { + return &Runner{ + client: client, + cfg: cfg, + } +} + +// Run implements Runnable. +func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error { + logs = syncWriter{ + mut: &sync.Mutex{}, + w: logs, + } + logger := slog.Make(sloghuman.Sink(logs)).Leveled(slog.LevelDebug) + + _, _ = fmt.Fprintln(logs, "Opening connection to workspace agent") + switch r.cfg.ConnectionMode { + case ConnectionModeDirect: + _, _ = fmt.Fprintln(logs, "\tUsing direct connection...") + case ConnectionModeDerp: + _, _ = fmt.Fprintln(logs, "\tUsing proxied DERP connection through coder server...") + } + + conn, err := r.client.DialWorkspaceAgent(ctx, r.cfg.AgentID, &codersdk.DialWorkspaceAgentOptions{ + Logger: logger.Named("agentconn"), + // If the config requested DERP, then force DERP. + BlockEndpoints: r.cfg.ConnectionMode == ConnectionModeDerp, + }) + if err != nil { + return xerrors.Errorf("dial workspace agent: %w", err) + } + defer conn.Close() + + // Wait for the disco connection to be established. + const pingAttempts = 10 + const pingDelay = 1 * time.Second + for i := 0; i < pingAttempts; i++ { + _, _ = fmt.Fprintf(logs, "\tDisco ping attempt %d/%d...\n", i+1, pingAttempts) + pingCtx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) + _, err := conn.Ping(pingCtx) + cancel() + if err == nil { + break + } + if i == pingAttempts-1 { + return xerrors.Errorf("ping workspace agent: %w", err) + } + + select { + case <-ctx.Done(): + return xerrors.Errorf("wait for connection to be established: %w", ctx.Err()) + // We use time.After here since it's a very short duration so leaking a + // timer is fine. + case <-time.After(pingDelay): + } + } + + // Wait for a DERP connection. + if r.cfg.ConnectionMode == ConnectionModeDirect { + const directConnectionAttempts = 30 + const directConnectionDelay = 1 * time.Second + for i := 0; i < directConnectionAttempts; i++ { + _, _ = fmt.Fprintf(logs, "\tDirect connection check %d/%d...\n", i+1, directConnectionAttempts) + status := conn.Status() + + var err error + if len(status.Peers()) != 1 { + _, _ = fmt.Fprintf(logs, "\t\tExpected 1 peer, found %d", len(status.Peers())) + err = xerrors.Errorf("expected 1 peer, got %d", len(status.Peers())) + } else { + peer := status.Peer[status.Peers()[0]] + _, _ = fmt.Fprintf(logs, "\t\tCurAddr: %s\n", peer.CurAddr) + _, _ = fmt.Fprintf(logs, "\t\tRelay: %s\n", peer.Relay) + if peer.Relay != "" && peer.CurAddr == "" { + err = xerrors.Errorf("peer is connected via DERP, not direct") + } + } + if err == nil { + break + } + if i == directConnectionAttempts-1 { + return xerrors.Errorf("wait for direct connection to agent: %w", err) + } + + select { + case <-ctx.Done(): + return xerrors.Errorf("wait for direct connection to agent: %w", ctx.Err()) + // We use time.After here since it's a very short duration so + // leaking a timer is fine. + case <-time.After(directConnectionDelay): + } + } + } + + // Ensure DERP for completeness. 
+ if r.cfg.ConnectionMode == ConnectionModeDerp { + status := conn.Status() + if len(status.Peers()) != 1 { + return xerrors.Errorf("check connection mode: expected 1 peer, got %d", len(status.Peers())) + } + peer := status.Peer[status.Peers()[0]] + if peer.Relay == "" || peer.CurAddr != "" { + return xerrors.Errorf("check connection mode: peer is connected directly, not via DERP") + } + } + + _, _ = fmt.Fprint(logs, "\nConnection established.\n\n") + + client := &http.Client{ + Transport: &http.Transport{ + DisableKeepAlives: true, + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + _, port, err := net.SplitHostPort(addr) + if err != nil { + return nil, xerrors.Errorf("split host port %q: %w", addr, err) + } + + portUint, err := strconv.ParseUint(port, 10, 16) + if err != nil { + return nil, xerrors.Errorf("parse port %q: %w", port, err) + } + return conn.DialContextTCP(ctx, netip.AddrPortFrom(codersdk.TailnetIP, uint16(portUint))) + }, + }, + } + + // HACK: even though the ping passed above, we still need to open a + // connection to the agent to ensure it's ready to accept connections. Not + // sure why this is the case but it seems to be necessary. + const verifyConnectionAttempts = 30 + const verifyConnectionDelay = 1 * time.Second + for i := 0; i < verifyConnectionAttempts; i++ { + _, _ = fmt.Fprintf(logs, "\tVerify connection attempt %d/%d...\n", i+1, verifyConnectionAttempts) + verifyCtx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) + + u := &url.URL{ + Scheme: "http", + Host: net.JoinHostPort("localhost", strconv.Itoa(codersdk.TailnetStatisticsPort)), + Path: "/", + } + req, err := http.NewRequestWithContext(verifyCtx, http.MethodGet, u.String(), nil) + if err != nil { + cancel() + return xerrors.Errorf("new verify connection request to %q: %w", u.String(), err) + } + resp, err := client.Do(req) + cancel() + if err == nil { + _ = resp.Body.Close() + break + } + if i == verifyConnectionAttempts-1 { + return xerrors.Errorf("verify connection: %w", err) + } + + select { + case <-ctx.Done(): + return xerrors.Errorf("verify connection: %w", ctx.Err()) + case <-time.After(verifyConnectionDelay): + } + } + + _, _ = fmt.Fprint(logs, "\nConnection verified.\n\n") + + // Make initial connections sequentially to ensure the services are + // reachable before we start spawning a bunch of goroutines and tickers. + if len(r.cfg.Connections) > 0 { + _, _ = fmt.Fprintln(logs, "Performing initial service connections...") + } + for i, connSpec := range r.cfg.Connections { + _, _ = fmt.Fprintf(logs, "\t%d. 
%s\n", i, connSpec.URL) + + timeout := defaultRequestTimeout + if connSpec.Timeout > 0 { + timeout = time.Duration(connSpec.Timeout) + } + ctx, cancel := context.WithTimeout(ctx, timeout) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, connSpec.URL, nil) + if err != nil { + cancel() + return xerrors.Errorf("create request: %w", err) + } + + res, err := client.Do(req) + cancel() + if err != nil { + _, _ = fmt.Fprintf(logs, "\t\tFailed: %+v\n", err) + return xerrors.Errorf("make initial connection to conn spec %d %q: %w", i, connSpec.URL, err) + } + _ = res.Body.Close() + + _, _ = fmt.Fprintln(logs, "\t\tOK") + } + + if r.cfg.HoldDuration > 0 { + eg, egCtx := errgroup.WithContext(ctx) + + if len(r.cfg.Connections) > 0 { + _, _ = fmt.Fprintln(logs, "\nStarting connection loops...") + } + for i, connSpec := range r.cfg.Connections { + i, connSpec := i, connSpec + if connSpec.Interval <= 0 { + continue + } + + eg.Go(func() error { + t := time.NewTicker(time.Duration(connSpec.Interval)) + defer t.Stop() + + timeout := defaultRequestTimeout + if connSpec.Timeout > 0 { + timeout = time.Duration(connSpec.Timeout) + } + + for { + select { + case <-egCtx.Done(): + return egCtx.Err() + case <-t.C: + ctx, cancel := context.WithTimeout(ctx, timeout) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, connSpec.URL, nil) + if err != nil { + cancel() + return xerrors.Errorf("create request: %w", err) + } + + res, err := client.Do(req) + cancel() + if err != nil { + _, _ = fmt.Fprintf(logs, "\tERR: %s (%d): %+v\n", connSpec.URL, i, err) + return xerrors.Errorf("make connection to conn spec %d %q: %w", i, connSpec.URL, err) + } + res.Body.Close() + + _, _ = fmt.Fprintf(logs, "\tOK: %s (%d)\n", connSpec.URL, i) + t.Reset(time.Duration(connSpec.Interval)) + } + } + }) + } + + // Wait for the hold duration to end. We use a fake error to signal that + // the hold duration has ended. + _, _ = fmt.Fprintf(logs, "\nWaiting for %s...\n", time.Duration(r.cfg.HoldDuration)) + eg.Go(func() error { + t := time.NewTicker(time.Duration(r.cfg.HoldDuration)) + defer t.Stop() + + select { + case <-egCtx.Done(): + return egCtx.Err() + case <-t.C: + return holdDurationEndedError{} + } + }) + + err = eg.Wait() + if err != nil && !xerrors.Is(err, holdDurationEndedError{}) { + return xerrors.Errorf("run connections loop: %w", err) + } + } + + err = conn.Close() + if err != nil { + return xerrors.Errorf("close connection: %w", err) + } + + return nil +} + +// syncWriter wraps an io.Writer in a sync.Mutex. +type syncWriter struct { + mut *sync.Mutex + w io.Writer +} + +// Write implements io.Writer. 
+func (sw syncWriter) Write(p []byte) (n int, err error) { + sw.mut.Lock() + defer sw.mut.Unlock() + return sw.w.Write(p) +} diff --git a/loadtest/agentconn/run_test.go b/loadtest/agentconn/run_test.go new file mode 100644 index 0000000000000..77bf85633e741 --- /dev/null +++ b/loadtest/agentconn/run_test.go @@ -0,0 +1,283 @@ +package agentconn_test + +import ( + "bytes" + "context" + "fmt" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + + "cdr.dev/slog/sloggers/slogtest" + "github.com/coder/coder/agent" + "github.com/coder/coder/coderd/coderdtest" + "github.com/coder/coder/coderd/httpapi" + "github.com/coder/coder/codersdk" + "github.com/coder/coder/loadtest/agentconn" + "github.com/coder/coder/provisioner/echo" + "github.com/coder/coder/provisionersdk/proto" + "github.com/coder/coder/testutil" +) + +func Test_Runner(t *testing.T) { + t.Parallel() + + t.Run("Derp+Simple", func(t *testing.T) { + t.Parallel() + + client, agentID := setupRunnerTest(t) + + runner := agentconn.NewRunner(client, agentconn.Config{ + AgentID: agentID, + ConnectionMode: agentconn.ConnectionModeDerp, + }) + + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) + defer cancel() + + logs := bytes.NewBuffer(nil) + err := runner.Run(ctx, "1", logs) + logStr := logs.String() + t.Log("Runner logs:\n\n" + logStr) + require.NoError(t, err) + + require.Contains(t, logStr, "Opening connection to workspace agent") + require.Contains(t, logStr, "Using proxied DERP connection") + require.Contains(t, logStr, "Disco ping attempt 1/10...") + require.Contains(t, logStr, "Connection established") + require.Contains(t, logStr, "Verify connection attempt 1/30...") + require.Contains(t, logStr, "Connection verified") + require.NotContains(t, logStr, "Performing initial service connections") + require.NotContains(t, logStr, "Starting connection loops") + require.NotContains(t, logStr, "Waiting for ") + }) + + //nolint:paralleltest // Measures timing as part of the test. 
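+	// The runner is expected to hold the connection for roughly HoldDuration
+	// before returning; the WithinRange assertion below checks the elapsed time.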
+ t.Run("Direct+Hold", func(t *testing.T) { + client, agentID := setupRunnerTest(t) + + runner := agentconn.NewRunner(client, agentconn.Config{ + AgentID: agentID, + ConnectionMode: agentconn.ConnectionModeDirect, + HoldDuration: httpapi.Duration(testutil.WaitShort), + }) + + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) + defer cancel() + + logs := bytes.NewBuffer(nil) + start := time.Now() + err := runner.Run(ctx, "1", logs) + logStr := logs.String() + t.Log("Runner logs:\n\n" + logStr) + require.NoError(t, err) + + require.WithinRange(t, + time.Now(), + start.Add(testutil.WaitShort-time.Second), + start.Add(testutil.WaitShort+5*time.Second), + ) + + require.Contains(t, logStr, "Opening connection to workspace agent") + require.Contains(t, logStr, "Using direct connection") + require.Contains(t, logStr, "Disco ping attempt 1/10...") + require.Contains(t, logStr, "Direct connection check 1/30...") + require.Contains(t, logStr, "Connection established") + require.Contains(t, logStr, "Verify connection attempt 1/30...") + require.Contains(t, logStr, "Connection verified") + require.NotContains(t, logStr, "Performing initial service connections") + require.NotContains(t, logStr, "Starting connection loops") + require.Contains(t, logStr, fmt.Sprintf("Waiting for %s", testutil.WaitShort)) + }) + + t.Run("Derp+ServicesNoHold", func(t *testing.T) { + t.Parallel() + + client, agentID := setupRunnerTest(t) + service1URL, service1Count := testServer(t) + service2URL, service2Count := testServer(t) + + runner := agentconn.NewRunner(client, agentconn.Config{ + AgentID: agentID, + ConnectionMode: agentconn.ConnectionModeDerp, + HoldDuration: 0, + Connections: []agentconn.Connection{ + { + URL: service1URL, + Timeout: httpapi.Duration(time.Second), + }, + { + URL: service2URL, + Timeout: httpapi.Duration(time.Second), + }, + }, + }) + + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) + defer cancel() + + logs := bytes.NewBuffer(nil) + err := runner.Run(ctx, "1", logs) + logStr := logs.String() + t.Log("Runner logs:\n\n" + logStr) + require.NoError(t, err) + + require.Contains(t, logStr, "Opening connection to workspace agent") + require.Contains(t, logStr, "Using proxied DERP connection") + require.Contains(t, logStr, "Disco ping attempt 1/10...") + require.Contains(t, logStr, "Connection established") + require.Contains(t, logStr, "Verify connection attempt 1/30...") + require.Contains(t, logStr, "Connection verified") + require.Contains(t, logStr, "Performing initial service connections") + require.Contains(t, logStr, "0. "+service1URL) + require.Contains(t, logStr, "1. "+service2URL) + require.NotContains(t, logStr, "Starting connection loops") + require.NotContains(t, logStr, "Waiting for ") + + require.EqualValues(t, 1, service1Count()) + require.EqualValues(t, 1, service2Count()) + }) + + //nolint:paralleltest // Measures timing as part of the test. + t.Run("Derp+Hold+Services", func(t *testing.T) { + client, agentID := setupRunnerTest(t) + service1URL, service1Count := testServer(t) + service2URL, service2Count := testServer(t) + service3URL, service3Count := testServer(t) + + runner := agentconn.NewRunner(client, agentconn.Config{ + AgentID: agentID, + ConnectionMode: agentconn.ConnectionModeDerp, + HoldDuration: httpapi.Duration(testutil.WaitShort), + Connections: []agentconn.Connection{ + { + URL: service1URL, + // No interval. 
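+					// Without an interval this service is only hit during the
+					// initial connection pass, so it should be called exactly once.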
+ Timeout: httpapi.Duration(time.Second), + }, + { + URL: service2URL, + Interval: httpapi.Duration(1 * time.Second), + Timeout: httpapi.Duration(time.Second), + }, + { + URL: service3URL, + Interval: httpapi.Duration(500 * time.Millisecond), + Timeout: httpapi.Duration(time.Second), + }, + }, + }) + + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) + defer cancel() + + logs := bytes.NewBuffer(nil) + start := time.Now() + err := runner.Run(ctx, "1", logs) + logStr := logs.String() + t.Log("Runner logs:\n\n" + logStr) + require.NoError(t, err) + + require.WithinRange(t, + time.Now(), + start.Add(testutil.WaitShort-time.Second), + start.Add(testutil.WaitShort+5*time.Second), + ) + + require.Contains(t, logStr, "Opening connection to workspace agent") + require.Contains(t, logStr, "Using proxied DERP connection") + require.Contains(t, logStr, "Disco ping attempt 1/10...") + require.Contains(t, logStr, "Connection established") + require.Contains(t, logStr, "Verify connection attempt 1/30...") + require.Contains(t, logStr, "Connection verified") + require.Contains(t, logStr, "Performing initial service connections") + require.Contains(t, logStr, "0. "+service1URL) + require.Contains(t, logStr, "1. "+service2URL) + require.Contains(t, logStr, "Starting connection loops") + require.NotContains(t, logStr, fmt.Sprintf("OK: %s (0)", service1URL)) + require.Contains(t, logStr, fmt.Sprintf("OK: %s (1)", service2URL)) + require.Contains(t, logStr, fmt.Sprintf("OK: %s (2)", service3URL)) + require.Contains(t, logStr, fmt.Sprintf("Waiting for %s", testutil.WaitShort)) + + t.Logf("service 1 called %d times", service1Count()) + t.Logf("service 2 called %d times", service2Count()) + t.Logf("service 3 called %d times", service3Count()) + require.EqualValues(t, 1, service1Count()) + require.NotEqualValues(t, 1, service2Count()) + require.NotEqualValues(t, 1, service3Count()) + // service 3 should've been called way more times than service 2 + require.True(t, service3Count() > service2Count()+2) + }) +} + +func setupRunnerTest(t *testing.T) (client *codersdk.Client, agentID uuid.UUID) { + t.Helper() + + client = coderdtest.New(t, &coderdtest.Options{ + IncludeProvisionerDaemon: true, + }) + user := coderdtest.CreateFirstUser(t, client) + + authToken := uuid.NewString() + version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{ + Parse: echo.ParseComplete, + ProvisionDryRun: echo.ProvisionComplete, + Provision: []*proto.Provision_Response{{ + Type: &proto.Provision_Response_Complete{ + Complete: &proto.Provision_Complete{ + Resources: []*proto.Resource{{ + Name: "example", + Type: "aws_instance", + Agents: []*proto.Agent{{ + Id: uuid.NewString(), + Name: "agent", + Auth: &proto.Agent_Token{ + Token: authToken, + }, + Apps: []*proto.App{}, + }}, + }}, + }, + }, + }}, + }) + + template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID) + coderdtest.AwaitTemplateVersionJob(t, client, version.ID) + + workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID) + coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID) + + agentClient := codersdk.New(client.URL) + agentClient.SessionToken = authToken + agentCloser := agent.New(agent.Options{ + Client: agentClient, + Logger: slogtest.Make(t, nil).Named("agent"), + }) + t.Cleanup(func() { + _ = agentCloser.Close() + }) + + resources := coderdtest.AwaitWorkspaceAgents(t, client, workspace.ID) + return client, resources[0].Agents[0].ID +} + +func 
testServer(t *testing.T) (string, func() int64) { + t.Helper() + + var count int64 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt64(&count, 1) + w.WriteHeader(http.StatusOK) + })) + t.Cleanup(srv.Close) + + return srv.URL, func() int64 { + return atomic.LoadInt64(&count) + } +} From f2a03248bc95052250dd79999138feaf54114d1b Mon Sep 17 00:00:00 2001 From: Dean Sheather Date: Mon, 7 Nov 2022 17:08:28 +0000 Subject: [PATCH 2/2] chore: pr comments --- loadtest/agentconn/config_test.go | 3 +-- loadtest/agentconn/run.go | 5 ++++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/loadtest/agentconn/config_test.go b/loadtest/agentconn/config_test.go index d803423d6d2c6..4c2facf006c42 100644 --- a/loadtest/agentconn/config_test.go +++ b/loadtest/agentconn/config_test.go @@ -14,8 +14,7 @@ import ( func Test_Config(t *testing.T) { t.Parallel() - id := uuid.Must(uuid.NewRandom()) - + id := uuid.New() cases := []struct { name string config agentconn.Config diff --git a/loadtest/agentconn/run.go b/loadtest/agentconn/run.go index 143605fe3eaea..094df34c30d4f 100644 --- a/loadtest/agentconn/run.go +++ b/loadtest/agentconn/run.go @@ -93,7 +93,7 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error { } } - // Wait for a DERP connection. + // Wait for a direct connection if requested. if r.cfg.ConnectionMode == ConnectionModeDirect { const directConnectionAttempts = 30 const directConnectionDelay = 1 * time.Second @@ -289,6 +289,9 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error { case <-egCtx.Done(): return egCtx.Err() case <-t.C: + // Returning an error here will cause the errgroup context to + // be canceled, which is what we want. This fake error is + // ignored below. return holdDurationEndedError{} } })
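
Example (illustrative only, not part of the patch): a minimal sketch of driving
the new runner directly with an authenticated codersdk client, using the
Config, NewRunner, and Run added above. The agent ID, URL, and durations are
placeholders, and the snippet assumes the usual imports (context, os, time,
httpapi, codersdk, agentconn).

	// Assumes client is an authenticated *codersdk.Client and agentID is the
	// uuid.UUID of a running workspace agent.
	cfg := agentconn.Config{
		AgentID:        agentID,
		ConnectionMode: agentconn.ConnectionModeDerp,
		HoldDuration:   httpapi.Duration(30 * time.Second),
		Connections: []agentconn.Connection{{
			// Only http URLs are accepted; the host is ignored and the
			// request is dialed against the agent's Tailnet IP instead.
			URL:      "http://localhost:8080/healthz",
			Interval: httpapi.Duration(time.Second),
			Timeout:  httpapi.Duration(5 * time.Second),
		}},
	}
	if err := cfg.Validate(); err != nil {
		panic(err)
	}
	runner := agentconn.NewRunner(client, cfg)
	if err := runner.Run(context.Background(), "0", os.Stdout); err != nil {
		panic(err)
	}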