diff --git a/enterprise/cli/proxyserver.go b/enterprise/cli/proxyserver.go index 59b650ffc26cb..6db8591725cf4 100644 --- a/enterprise/cli/proxyserver.go +++ b/enterprise/cli/proxyserver.go @@ -244,7 +244,7 @@ func (r *RootCmd) proxyServer() *clibase.Cmd { closers.Add(closeFunc) } - proxy, err := wsproxy.New(ctx, &wsproxy.Options{ + options := &wsproxy.Options{ Logger: logger, Experiments: coderd.ReadExperiments(logger, cfg.Experiments.Value()), HTTPClient: httpClient, @@ -263,7 +263,12 @@ func (r *RootCmd) proxyServer() *clibase.Cmd { DERPEnabled: cfg.DERP.Server.Enable.Value(), DERPOnly: derpOnly.Value(), DERPServerRelayAddress: cfg.DERP.Server.RelayURL.String(), - }) + } + if httpServers.TLSConfig != nil { + options.TLSCertificates = httpServers.TLSConfig.Certificates + } + + proxy, err := wsproxy.New(ctx, options) if err != nil { return xerrors.Errorf("create workspace proxy: %w", err) } diff --git a/enterprise/coderd/coderd.go b/enterprise/coderd/coderd.go index 16da0453a549e..e68c7d8d17a49 100644 --- a/enterprise/coderd/coderd.go +++ b/enterprise/coderd/coderd.go @@ -3,8 +3,6 @@ package coderd import ( "context" "crypto/ed25519" - "crypto/tls" - "crypto/x509" "fmt" "math" "net/http" @@ -416,27 +414,9 @@ func New(ctx context.Context, options *Options) (_ *API, err error) { }) } - meshRootCA := x509.NewCertPool() - for _, certificate := range options.TLSCertificates { - for _, certificatePart := range certificate.Certificate { - certificate, err := x509.ParseCertificate(certificatePart) - if err != nil { - return nil, xerrors.Errorf("parse certificate %s: %w", certificate.Subject.CommonName, err) - } - meshRootCA.AddCert(certificate) - } - } - // This TLS configuration spoofs access from the access URL hostname - // assuming that the certificates provided will cover that hostname. - // - // Replica sync and DERP meshing require accessing replicas via their - // internal IP addresses, and if TLS is configured we use the same - // certificates. 
- meshTLSConfig := &tls.Config{ - MinVersion: tls.VersionTLS12, - Certificates: options.TLSCertificates, - RootCAs: meshRootCA, - ServerName: options.AccessURL.Hostname(), + meshTLSConfig, err := replicasync.CreateDERPMeshTLSConfig(options.AccessURL.Hostname(), options.TLSCertificates) + if err != nil { + return nil, xerrors.Errorf("create DERP mesh TLS config: %w", err) } api.replicaManager, err = replicasync.New(ctx, options.Logger, options.Database, options.Pubsub, &replicasync.Options{ ID: api.AGPL.ID, diff --git a/enterprise/coderd/coderdenttest/proxytest.go b/enterprise/coderd/coderdenttest/proxytest.go index 831c4be86f640..730d65c0469f2 100644 --- a/enterprise/coderd/coderdenttest/proxytest.go +++ b/enterprise/coderd/coderdenttest/proxytest.go @@ -43,6 +43,9 @@ type ProxyOptions struct { // region. Token string + // ReplicaPingCallback is optional. + ReplicaPingCallback func(replicas []codersdk.Replica, err string) + // FlushStats is optional FlushStats chan chan<- struct{} } @@ -157,6 +160,7 @@ func NewWorkspaceProxyReplica(t *testing.T, coderdAPI *coderd.API, owner *coders DERPEnabled: !options.DerpDisabled, DERPOnly: options.DerpOnly, DERPServerRelayAddress: serverURL.String(), + ReplicaErrCallback: options.ReplicaPingCallback, StatsCollectorOptions: statsCollectorOptions, }) require.NoError(t, err) diff --git a/enterprise/replicasync/replicasync.go b/enterprise/replicasync/replicasync.go index b7e46b99f6124..aa8adb3a04ad5 100644 --- a/enterprise/replicasync/replicasync.go +++ b/enterprise/replicasync/replicasync.go @@ -3,6 +3,7 @@ package replicasync import ( "context" "crypto/tls" + "crypto/x509" "database/sql" "errors" "fmt" @@ -265,45 +266,35 @@ func (m *Manager) syncReplicas(ctx context.Context) error { }, } defer client.CloseIdleConnections() - var wg sync.WaitGroup - var mu sync.Mutex - failed := make([]string, 0) - for _, peer := range m.Regional() { - wg.Add(1) + + peers := m.Regional() + errs := make(chan error, len(peers)) + for _, peer := 
range peers { go func(peer database.Replica) { - defer wg.Done() - ra, err := url.Parse(peer.RelayAddress) + err := PingPeerReplica(ctx, client, peer.RelayAddress) if err != nil { - m.logger.Warn(ctx, "could not parse relay address", - slog.F("relay_address", peer.RelayAddress), slog.Error(err)) + errs <- xerrors.Errorf("ping sibling replica %s (%s): %w", peer.Hostname, peer.RelayAddress, err) + m.logger.Warn(ctx, "failed to ping sibling replica, this could happen if the replica has shutdown", + slog.F("replica_hostname", peer.Hostname), + slog.F("replica_relay_address", peer.RelayAddress), + slog.Error(err), + ) return } - target, err := ra.Parse("/derp/latency-check") - if err != nil { - m.logger.Warn(ctx, "could not resolve /derp/latency-check endpoint", - slog.F("relay_address", peer.RelayAddress), slog.Error(err)) - return - } - req, err := http.NewRequestWithContext(ctx, http.MethodGet, target.String(), nil) - if err != nil { - m.logger.Warn(ctx, "create http request for relay probe", - slog.F("relay_address", peer.RelayAddress), slog.Error(err)) - return - } - res, err := client.Do(req) - if err != nil { - mu.Lock() - failed = append(failed, fmt.Sprintf("relay %s (%s): %s", peer.Hostname, peer.RelayAddress, err)) - mu.Unlock() - return - } - _ = res.Body.Close() + errs <- nil }(peer) } - wg.Wait() + + replicaErrs := make([]string, 0, len(peers)) + for i := 0; i < len(peers); i++ { + err := <-errs + if err != nil { + replicaErrs = append(replicaErrs, err.Error()) + } + } replicaError := "" - if len(failed) > 0 { - replicaError = fmt.Sprintf("Failed to dial peers: %s", strings.Join(failed, ", ")) + if len(replicaErrs) > 0 { + replicaError = fmt.Sprintf("Failed to dial peers: %s", strings.Join(replicaErrs, ", ")) } databaseLatency, err := m.db.Ping(ctx) @@ -363,6 +354,32 @@ func (m *Manager) syncReplicas(ctx context.Context) error { return nil } +// PingPeerReplica pings a peer replica over its internal relay address to +// ensure it's reachable and alive for
health purposes. +func PingPeerReplica(ctx context.Context, client http.Client, relayAddress string) error { + ra, err := url.Parse(relayAddress) + if err != nil { + return xerrors.Errorf("parse relay address %q: %w", relayAddress, err) + } + target, err := ra.Parse("/derp/latency-check") + if err != nil { + return xerrors.Errorf("parse latency-check URL: %w", err) + } + req, err := http.NewRequestWithContext(ctx, http.MethodGet, target.String(), nil) + if err != nil { + return xerrors.Errorf("create request: %w", err) + } + res, err := client.Do(req) + if err != nil { + return xerrors.Errorf("do probe: %w", err) + } + _ = res.Body.Close() + if res.StatusCode != http.StatusOK { + return xerrors.Errorf("unexpected status code: %d", res.StatusCode) + } + return nil +} + // Self represents the current replica. func (m *Manager) Self() database.Replica { m.mutex.Lock() @@ -466,3 +483,29 @@ func (m *Manager) Close() error { } return nil } + +// CreateDERPMeshTLSConfig creates a TLS configuration for connecting to peers +// in the DERP mesh over private networking. It overrides the ServerName to be +// the expected public hostname of the peer, and trusts all of the TLS server +// certificates used by this replica (as we expect all replicas to use the same +// TLS certificates). +func CreateDERPMeshTLSConfig(hostname string, tlsCertificates []tls.Certificate) (*tls.Config, error) { + meshRootCA := x509.NewCertPool() + for _, certificate := range tlsCertificates { + for _, certificatePart := range certificate.Certificate { + parsedCert, err := x509.ParseCertificate(certificatePart) + if err != nil { + return nil, xerrors.Errorf("parse certificate: %w", err) + } + meshRootCA.AddCert(parsedCert) + } + } + + // This TLS configuration trusts the built-in TLS certificates and forces + // the server name to be the public hostname.
+ return &tls.Config{ + MinVersion: tls.VersionTLS12, + RootCAs: meshRootCA, + ServerName: hostname, + }, nil +} diff --git a/enterprise/replicasync/replicasync_test.go b/enterprise/replicasync/replicasync_test.go index 600eb839e28bf..f04ba9ccecc31 100644 --- a/enterprise/replicasync/replicasync_test.go +++ b/enterprise/replicasync/replicasync_test.go @@ -286,7 +286,7 @@ func (d *derpyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { d.Add(1) return } - w.WriteHeader(http.StatusUpgradeRequired) + w.WriteHeader(http.StatusOK) } func (d *derpyHandler) requireOnlyDERPPaths(t *testing.T) { diff --git a/enterprise/wsproxy/wsproxy.go b/enterprise/wsproxy/wsproxy.go index 17fae2b791695..55567014fb4e0 100644 --- a/enterprise/wsproxy/wsproxy.go +++ b/enterprise/wsproxy/wsproxy.go @@ -3,14 +3,15 @@ package wsproxy import ( "context" "crypto/tls" - "crypto/x509" "errors" "fmt" "net/http" "net/url" "reflect" "regexp" + "slices" "strings" + "sync" "sync/atomic" "time" @@ -19,6 +20,7 @@ import ( "github.com/hashicorp/go-multierror" "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/trace" + "golang.org/x/sync/singleflight" "golang.org/x/xerrors" "tailscale.com/derp" "tailscale.com/derp/derphttp" @@ -35,6 +37,7 @@ import ( "github.com/coder/coder/v2/coderd/workspaceapps" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/enterprise/derpmesh" + "github.com/coder/coder/v2/enterprise/replicasync" "github.com/coder/coder/v2/enterprise/wsproxy/wsproxysdk" "github.com/coder/coder/v2/site" "github.com/coder/coder/v2/tailnet" @@ -76,6 +79,10 @@ type Options struct { // provide access to workspace apps/terminal. DERPOnly bool + // ReplicaErrCallback is called when the proxy replica successfully or + // unsuccessfully pings its peers in the mesh. + ReplicaErrCallback func(replicas []codersdk.Replica, err string) + ProxySessionToken string // AllowAllCors will set all CORs headers to '*'. 
// By default, CORs is set to accept external requests @@ -121,8 +128,12 @@ type Server struct { SDKClient *wsproxysdk.Client // DERP - derpMesh *derpmesh.Mesh - latestDERPMap atomic.Pointer[tailcfg.DERPMap] + derpMesh *derpmesh.Mesh + derpMeshTLSConfig *tls.Config + replicaPingSingleflight singleflight.Group + replicaErrMut sync.Mutex + replicaErr string + latestDERPMap atomic.Pointer[tailcfg.DERPMap] // Used for graceful shutdown. Required for the dialer. ctx context.Context @@ -166,29 +177,10 @@ func New(ctx context.Context, opts *Options) (*Server, error) { return nil, xerrors.Errorf("%q is a workspace proxy, not a primary coderd instance", opts.DashboardURL) } - meshRootCA := x509.NewCertPool() - for _, certificate := range opts.TLSCertificates { - for _, certificatePart := range certificate.Certificate { - certificate, err := x509.ParseCertificate(certificatePart) - if err != nil { - return nil, xerrors.Errorf("parse certificate %s: %w", certificate.Subject.CommonName, err) - } - meshRootCA.AddCert(certificate) - } - } - // This TLS configuration spoofs access from the access URL hostname - // assuming that the certificates provided will cover that hostname. - // - // Replica sync and DERP meshing require accessing replicas via their - // internal IP addresses, and if TLS is configured we use the same - // certificates. 
- meshTLSConfig := &tls.Config{ - MinVersion: tls.VersionTLS12, - Certificates: opts.TLSCertificates, - RootCAs: meshRootCA, - ServerName: opts.AccessURL.Hostname(), + meshTLSConfig, err := replicasync.CreateDERPMeshTLSConfig(opts.AccessURL.Hostname(), opts.TLSCertificates) + if err != nil { + return nil, xerrors.Errorf("create DERP mesh tls config: %w", err) } - derpServer := derp.NewServer(key.NewNode(), tailnet.Logger(opts.Logger.Named("net.derp"))) ctx, cancel := context.WithCancel(context.Background()) @@ -202,14 +194,13 @@ func New(ctx context.Context, opts *Options) (*Server, error) { PrometheusRegistry: opts.PrometheusRegistry, SDKClient: client, derpMesh: derpmesh.New(opts.Logger.Named("net.derpmesh"), derpServer, meshTLSConfig), + derpMeshTLSConfig: meshTLSConfig, ctx: ctx, cancel: cancel, } // Register the workspace proxy with the primary coderd instance and start a // goroutine to periodically re-register. - replicaID := uuid.New() - osHostname := cliutil.Hostname() registerLoop, regResp, err := client.RegisterWorkspaceProxyLoop(ctx, wsproxysdk.RegisterWorkspaceProxyLoopOpts{ Logger: opts.Logger, Request: wsproxysdk.RegisterWorkspaceProxyRequest{ @@ -217,8 +208,8 @@ func New(ctx context.Context, opts *Options) (*Server, error) { WildcardHostname: opts.AppHostname, DerpEnabled: opts.DERPEnabled, DerpOnly: opts.DERPOnly, - ReplicaID: replicaID, - ReplicaHostname: osHostname, + ReplicaID: uuid.New(), + ReplicaHostname: cliutil.Hostname(), ReplicaError: "", ReplicaRelayAddress: opts.DERPServerRelayAddress, Version: buildinfo.Version(), @@ -433,9 +424,10 @@ func (s *Server) Close() error { return err } -func (*Server) mutateRegister(_ *wsproxysdk.RegisterWorkspaceProxyRequest) { - // TODO: we should probably ping replicas similarly to the replicasync - // package in the primary and update req.ReplicaError accordingly. 
+func (s *Server) mutateRegister(req *wsproxysdk.RegisterWorkspaceProxyRequest) { + s.replicaErrMut.Lock() + defer s.replicaErrMut.Unlock() + req.ReplicaError = s.replicaErr } func (s *Server) handleRegister(res wsproxysdk.RegisterWorkspaceProxyResponse) error { @@ -448,9 +440,82 @@ func (s *Server) handleRegister(res wsproxysdk.RegisterWorkspaceProxyResponse) e s.latestDERPMap.Store(res.DERPMap) + go s.pingSiblingReplicas(res.SiblingReplicas) return nil } +func (s *Server) pingSiblingReplicas(replicas []codersdk.Replica) { + if len(replicas) == 0 { + return + } + + // Avoid pinging multiple times at once if the list hasn't changed. + relayURLs := make([]string, len(replicas)) + for i, r := range replicas { + relayURLs[i] = r.RelayAddress + } + slices.Sort(relayURLs) + singleflightStr := strings.Join(relayURLs, " ") // URLs can't contain spaces. + + //nolint:dogsled + _, _, _ = s.replicaPingSingleflight.Do(singleflightStr, func() (any, error) { + const ( + perReplicaTimeout = 3 * time.Second + fullTimeout = 10 * time.Second + ) + ctx, cancel := context.WithTimeout(s.ctx, fullTimeout) + defer cancel() + + client := http.Client{ + Timeout: perReplicaTimeout, + Transport: &http.Transport{ + TLSClientConfig: s.derpMeshTLSConfig, + DisableKeepAlives: true, + }, + } + defer client.CloseIdleConnections() + + errs := make(chan error, len(replicas)) + for _, peer := range replicas { + go func(peer codersdk.Replica) { + err := replicasync.PingPeerReplica(ctx, client, peer.RelayAddress) + if err != nil { + errs <- xerrors.Errorf("ping sibling replica %s (%s): %w", peer.Hostname, peer.RelayAddress, err) + s.Logger.Warn(ctx, "failed to ping sibling replica, this could happen if the replica has shutdown", + slog.F("replica_hostname", peer.Hostname), + slog.F("replica_relay_address", peer.RelayAddress), + slog.Error(err), + ) + return + } + errs <- nil + }(peer) + } + + replicaErrs := make([]string, 0, len(replicas)) + for i := 0; i < len(replicas); i++ { + err := <-errs + if err 
!= nil { + replicaErrs = append(replicaErrs, err.Error()) + } + } + + s.replicaErrMut.Lock() + defer s.replicaErrMut.Unlock() + s.replicaErr = "" + if len(replicaErrs) > 0 { + s.replicaErr = fmt.Sprintf("Failed to dial peers: %s", strings.Join(replicaErrs, ", ")) + } + if s.Options.ReplicaErrCallback != nil { + s.Options.ReplicaErrCallback(replicas, s.replicaErr) + } + + //nolint:nilnil // we don't actually use the return value of the + // singleflight here + return nil, nil + }) +} + func (s *Server) handleRegisterFailure(err error) { if s.ctx.Err() != nil { return @@ -519,8 +584,14 @@ func (s *Server) healthReport(rw http.ResponseWriter, r *http.Request) { fmt.Sprintf("version mismatch: primary coderd (%s) != workspace proxy (%s)", primaryBuild.Version, buildinfo.Version())) } + s.replicaErrMut.Lock() + if s.replicaErr != "" { + report.Errors = append(report.Errors, "High availability networking: it appears you are running more than one replica of the proxy, but the replicas are unable to establish a mesh for networking: "+s.replicaErr) + } + s.replicaErrMut.Unlock() + // TODO: We should hit the deployment config endpoint and do some config - // checks. We can check the version from the X-CODER-BUILD-VERSION header + // checks. 
httpapi.Write(r.Context(), rw, http.StatusOK, report) } diff --git a/enterprise/wsproxy/wsproxy_test.go b/enterprise/wsproxy/wsproxy_test.go index 13d908c07c8e2..ab0a63e69d2df 100644 --- a/enterprise/wsproxy/wsproxy_test.go +++ b/enterprise/wsproxy/wsproxy_test.go @@ -2,8 +2,11 @@ package wsproxy_test import ( "context" + "encoding/json" "fmt" "net" + "net/http" + "net/http/httptest" "net/url" "testing" "time" @@ -20,6 +23,7 @@ import ( "cdr.dev/slog" "cdr.dev/slog/sloggers/slogtest" "github.com/coder/coder/v2/agent/agenttest" + "github.com/coder/coder/v2/buildinfo" "github.com/coder/coder/v2/cli/clibase" "github.com/coder/coder/v2/coderd/coderdtest" "github.com/coder/coder/v2/coderd/healthcheck/derphealth" @@ -29,6 +33,7 @@ import ( "github.com/coder/coder/v2/cryptorand" "github.com/coder/coder/v2/enterprise/coderd/coderdenttest" "github.com/coder/coder/v2/enterprise/coderd/license" + "github.com/coder/coder/v2/enterprise/wsproxy/wsproxysdk" "github.com/coder/coder/v2/provisioner/echo" "github.com/coder/coder/v2/testutil" ) @@ -128,21 +133,20 @@ func TestDERP(t *testing.T) { }) // Create a proxy that is never started. - createProxyCtx := testutil.Context(t, testutil.WaitLong) - _, err := client.CreateWorkspaceProxy(createProxyCtx, codersdk.CreateWorkspaceProxyRequest{ + ctx := testutil.Context(t, testutil.WaitLong) + _, err := client.CreateWorkspaceProxy(ctx, codersdk.CreateWorkspaceProxyRequest{ Name: "never-started-proxy", }) require.NoError(t, err) // Wait for both running proxies to become healthy. 
require.Eventually(t, func() bool { - healthCtx := testutil.Context(t, testutil.WaitLong) - err := api.ProxyHealth.ForceUpdate(healthCtx) + err := api.ProxyHealth.ForceUpdate(ctx) if !assert.NoError(t, err) { return false } - regions, err := client.Regions(healthCtx) + regions, err := client.Regions(ctx) if !assert.NoError(t, err) { return false } @@ -264,7 +268,8 @@ resourceLoop: t.Run("ConnectDERP", func(t *testing.T) { t.Parallel() - connInfo, err := client.WorkspaceAgentConnectionInfo(testutil.Context(t, testutil.WaitLong), agentID) + ctx := testutil.Context(t, testutil.WaitLong) + connInfo, err := client.WorkspaceAgentConnectionInfo(ctx, agentID) require.NoError(t, err) require.NotNil(t, connInfo.DERPMap) require.Len(t, connInfo.DERPMap.Regions, 3+len(api.DeploymentValues.DERP.Server.STUNAddresses.Value())) @@ -287,7 +292,6 @@ resourceLoop: OmitDefaultRegions: true, } - ctx := testutil.Context(t, testutil.WaitLong) report := derphealth.Report{} report.Run(ctx, &derphealth.ReportOptions{ DERPMap: derpMap, @@ -350,14 +354,14 @@ func TestDERPEndToEnd(t *testing.T) { }) // Wait for the proxy to become healthy. + ctx := testutil.Context(t, testutil.WaitLong) require.Eventually(t, func() bool { - healthCtx := testutil.Context(t, testutil.WaitLong) - err := api.ProxyHealth.ForceUpdate(healthCtx) + err := api.ProxyHealth.ForceUpdate(ctx) if !assert.NoError(t, err) { return false } - regions, err := client.Regions(healthCtx) + regions, err := client.Regions(ctx) if !assert.NoError(t, err) { return false } @@ -425,7 +429,6 @@ resourceLoop: _ = coderdtest.AwaitWorkspaceAgents(t, client, workspace.ID) // Connect to the workspace agent. 
- ctx := testutil.Context(t, testutil.WaitLong) conn, err := client.DialWorkspaceAgent(ctx, agentID, &codersdk.DialWorkspaceAgentOptions{ Logger: slogtest.Make(t, &slogtest.Options{ IgnoreErrors: true, @@ -546,6 +549,243 @@ func TestDERPMesh(t *testing.T) { } } +// TestWorkspaceProxyDERPMeshProbe ensures that each replica pings every other +// replica in the same region as itself periodically. +func TestWorkspaceProxyDERPMeshProbe(t *testing.T) { + t.Parallel() + createProxyRegion := func(ctx context.Context, t *testing.T, client *codersdk.Client, name string) codersdk.UpdateWorkspaceProxyResponse { + t.Helper() + proxyRes, err := client.CreateWorkspaceProxy(ctx, codersdk.CreateWorkspaceProxyRequest{ + Name: name, + Icon: "/emojis/flag.png", + }) + require.NoError(t, err, "failed to create workspace proxy") + return proxyRes + } + + registerBrokenProxy := func(ctx context.Context, t *testing.T, primaryAccessURL *url.URL, accessURL, token string) { + t.Helper() + // Create a HTTP server that always replies with 500. + srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + rw.WriteHeader(http.StatusInternalServerError) + })) + t.Cleanup(srv.Close) + + // Register a proxy. 
+ wsproxyClient := wsproxysdk.New(primaryAccessURL) + wsproxyClient.SetSessionToken(token) + + hostname, err := cryptorand.String(6) + require.NoError(t, err) + _, err = wsproxyClient.RegisterWorkspaceProxy(ctx, wsproxysdk.RegisterWorkspaceProxyRequest{ + AccessURL: accessURL, + WildcardHostname: "", + DerpEnabled: true, + DerpOnly: false, + ReplicaID: uuid.New(), + ReplicaHostname: hostname, + ReplicaError: "", + ReplicaRelayAddress: srv.URL, + Version: buildinfo.Version(), + }) + require.NoError(t, err) + } + + t.Run("ProbeOK", func(t *testing.T) { + t.Parallel() + + deploymentValues := coderdtest.DeploymentValues(t) + deploymentValues.Experiments = []string{ + "*", + } + + client, closer, api, _ := coderdenttest.NewWithAPI(t, &coderdenttest.Options{ + Options: &coderdtest.Options{ + DeploymentValues: deploymentValues, + AppHostname: "*.primary.test.coder.com", + IncludeProvisionerDaemon: true, + RealIPConfig: &httpmw.RealIPConfig{ + TrustedOrigins: []*net.IPNet{{ + IP: net.ParseIP("127.0.0.1"), + Mask: net.CIDRMask(8, 32), + }}, + TrustedHeaders: []string{ + "CF-Connecting-IP", + }, + }, + }, + LicenseOptions: &coderdenttest.LicenseOptions{ + Features: license.Features{ + codersdk.FeatureWorkspaceProxy: 1, + }, + }, + }) + t.Cleanup(func() { + _ = closer.Close() + }) + + // Register but don't start a proxy in a different region. This + // shouldn't affect the mesh since it's in a different region. + ctx := testutil.Context(t, testutil.WaitLong) + fakeProxyRes := createProxyRegion(ctx, t, client, "fake-proxy") + registerBrokenProxy(ctx, t, api.AccessURL, "https://fake-proxy.test.coder.com", fakeProxyRes.ProxyToken) + + proxyURL, err := url.Parse("https://proxy1.test.coder.com") + require.NoError(t, err) + + // Create 6 proxy replicas. 
+ const count = 6 + var ( + sessionToken = "" + proxies = [count]coderdenttest.WorkspaceProxy{} + replicaPingDone = [count]bool{} + ) + for i := range proxies { + i := i + proxies[i] = coderdenttest.NewWorkspaceProxyReplica(t, api, client, &coderdenttest.ProxyOptions{ + Name: "proxy-1", + Token: sessionToken, + ProxyURL: proxyURL, + ReplicaPingCallback: func(replicas []codersdk.Replica, err string) { + if len(replicas) != count-1 { + // Still warming up... + return + } + replicaPingDone[i] = true + assert.Emptyf(t, err, "replica %d ping callback error", i) + }, + }) + if i == 0 { + sessionToken = proxies[i].Options.ProxySessionToken + } + } + + // Force all proxies to re-register immediately. This ensures the DERP + // mesh is up-to-date. In production this will happen automatically + // after about 15 seconds. + for i, proxy := range proxies { + err := proxy.RegisterNow() + require.NoErrorf(t, err, "failed to force proxy %d to re-register", i) + } + + // Ensure that all proxies have pinged. + require.Eventually(t, func() bool { + ok := true + for i := range proxies { + if !replicaPingDone[i] { + t.Logf("replica %d has not pinged yet", i) + ok = false + } + } + return ok + }, testutil.WaitLong, testutil.IntervalSlow) + t.Log("all replicas have pinged") + + // Check they're all healthy according to /healthz-report. + for _, proxy := range proxies { + // GET /healthz-report + u := proxy.ServerURL.ResolveReference(&url.URL{Path: "/healthz-report"}) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) + require.NoError(t, err) + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + + var respJSON codersdk.ProxyHealthReport + err = json.NewDecoder(resp.Body).Decode(&respJSON) + resp.Body.Close() + require.NoError(t, err) + + require.Empty(t, respJSON.Errors, "proxy is not healthy") + } + }) + + // Register one proxy, then pretend to register 5 others. This should cause + // the mesh to fail and return an error. 
+ t.Run("ProbeFail", func(t *testing.T) { + t.Parallel() + + deploymentValues := coderdtest.DeploymentValues(t) + deploymentValues.Experiments = []string{ + "*", + } + + client, closer, api, _ := coderdenttest.NewWithAPI(t, &coderdenttest.Options{ + Options: &coderdtest.Options{ + DeploymentValues: deploymentValues, + AppHostname: "*.primary.test.coder.com", + IncludeProvisionerDaemon: true, + RealIPConfig: &httpmw.RealIPConfig{ + TrustedOrigins: []*net.IPNet{{ + IP: net.ParseIP("127.0.0.1"), + Mask: net.CIDRMask(8, 32), + }}, + TrustedHeaders: []string{ + "CF-Connecting-IP", + }, + }, + }, + LicenseOptions: &coderdenttest.LicenseOptions{ + Features: license.Features{ + codersdk.FeatureWorkspaceProxy: 1, + }, + }, + }) + t.Cleanup(func() { + _ = closer.Close() + }) + + proxyURL, err := url.Parse("https://proxy2.test.coder.com") + require.NoError(t, err) + + // Create 1 real proxy replica. + const fakeCount = 5 + replicaPingErr := make(chan string, 4) + proxy := coderdenttest.NewWorkspaceProxyReplica(t, api, client, &coderdenttest.ProxyOptions{ + Name: "proxy-2", + ProxyURL: proxyURL, + ReplicaPingCallback: func(replicas []codersdk.Replica, err string) { + if len(replicas) != fakeCount { + // Still warming up... + return + } + replicaPingErr <- err + }, + }) + + // Register (but don't start wsproxy.Server) 5 other proxies in the same + // region. Since they registered recently they should be included in the + // mesh. We create a HTTP server on the relay address that always + // responds with 500 so probes fail. + ctx := testutil.Context(t, testutil.WaitLong) + for i := 0; i < fakeCount; i++ { + registerBrokenProxy(ctx, t, api.AccessURL, proxyURL.String(), proxy.Options.ProxySessionToken) + } + + // Force the proxy to re-register immediately. + err = proxy.RegisterNow() + require.NoError(t, err, "failed to force proxy to re-register") + + // Wait for the ping to fail. 
+ replicaErr := testutil.RequireRecvCtx(ctx, t, replicaPingErr) + require.NotEmpty(t, replicaErr, "replica ping error") + + // GET /healthz-report + u := proxy.ServerURL.ResolveReference(&url.URL{Path: "/healthz-report"}) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) + require.NoError(t, err) + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + + var respJSON codersdk.ProxyHealthReport + err = json.NewDecoder(resp.Body).Decode(&respJSON) + resp.Body.Close() + require.NoError(t, err) + + require.Len(t, respJSON.Errors, 1, "proxy is healthy") + require.Contains(t, respJSON.Errors[0], "High availability networking") + }) +} + func TestWorkspaceProxyWorkspaceApps(t *testing.T) { t.Parallel()