diff --git a/enterprise/wsproxy/wsproxy.go b/enterprise/wsproxy/wsproxy.go index e578f84afb8f3..3766064608a47 100644 --- a/enterprise/wsproxy/wsproxy.go +++ b/enterprise/wsproxy/wsproxy.go @@ -449,8 +449,21 @@ func (s *Server) handleRegister(res wsproxysdk.RegisterWorkspaceProxyResponse) e } func (s *Server) pingSiblingReplicas(replicas []codersdk.Replica) { + ctx, cancel := context.WithTimeout(s.ctx, 10*time.Second) + defer cancel() + + errStr := pingSiblingReplicas(ctx, s.Logger, &s.replicaPingSingleflight, s.derpMeshTLSConfig, replicas) + s.replicaErrMut.Lock() + s.replicaErr = errStr + s.replicaErrMut.Unlock() + if s.Options.ReplicaErrCallback != nil { + s.Options.ReplicaErrCallback(replicas, s.replicaErr) + } +} + +func pingSiblingReplicas(ctx context.Context, logger slog.Logger, sf *singleflight.Group, derpMeshTLSConfig *tls.Config, replicas []codersdk.Replica) string { if len(replicas) == 0 { - return + return "" } // Avoid pinging multiple times at once if the list hasn't changed. @@ -462,18 +475,11 @@ func (s *Server) pingSiblingReplicas(replicas []codersdk.Replica) { singleflightStr := strings.Join(relayURLs, " ") // URLs can't contain spaces. //nolint:dogsled - _, _, _ = s.replicaPingSingleflight.Do(singleflightStr, func() (any, error) { - const ( - perReplicaTimeout = 3 * time.Second - fullTimeout = 10 * time.Second - ) - ctx, cancel := context.WithTimeout(s.ctx, fullTimeout) - defer cancel() - + errStrInterface, _, _ := sf.Do(singleflightStr, func() (any, error) { client := http.Client{ - Timeout: perReplicaTimeout, + Timeout: 3 * time.Second, Transport: &http.Transport{ - TLSClientConfig: s.derpMeshTLSConfig, + TLSClientConfig: derpMeshTLSConfig, DisableKeepAlives: true, }, } @@ -485,7 +491,7 @@ func (s *Server) pingSiblingReplicas(replicas []codersdk.Replica) { err := replicasync.PingPeerReplica(ctx, client, peer.RelayAddress) if err != nil { errs <- xerrors.Errorf("ping sibling replica %s (%s): %w", peer.Hostname, peer.RelayAddress, err) - s.Logger.Warn(ctx, "failed to ping sibling replica, this could happen if the replica has shutdown", + logger.Warn(ctx, "failed to ping sibling replica, this could happen if the replica has shutdown", slog.F("replica_hostname", peer.Hostname), slog.F("replica_relay_address", peer.RelayAddress), slog.Error(err), @@ -504,20 +510,14 @@ func (s *Server) pingSiblingReplicas(replicas []codersdk.Replica) { } } - s.replicaErrMut.Lock() - defer s.replicaErrMut.Unlock() - s.replicaErr = "" - if len(replicaErrs) > 0 { - s.replicaErr = fmt.Sprintf("Failed to dial peers: %s", strings.Join(replicaErrs, ", ")) - } - if s.Options.ReplicaErrCallback != nil { - s.Options.ReplicaErrCallback(replicas, s.replicaErr) + if len(replicaErrs) == 0 { + return "", nil } - - //nolint:nilnil // we don't actually use the return value of the - // singleflight here - return nil, nil + return fmt.Sprintf("Failed to dial peers: %s", strings.Join(replicaErrs, ", ")), nil }) + + //nolint:forcetypeassert + return errStrInterface.(string) } func (s *Server) handleRegisterFailure(err error) { @@ -590,7 +590,8 @@ func (s *Server) healthReport(rw http.ResponseWriter, r *http.Request) { s.replicaErrMut.Lock() if s.replicaErr != "" { - report.Errors = append(report.Errors, "High availability networking: it appears you are running more than one replica of the proxy, but the replicas are unable to establish a mesh for networking: "+s.replicaErr) + report.Warnings = append(report.Warnings, + "High availability networking: it appears you are running more than one replica of the proxy, but the replicas are unable to establish a mesh for networking: "+s.replicaErr) } s.replicaErrMut.Unlock() diff --git a/enterprise/wsproxy/wsproxy_test.go b/enterprise/wsproxy/wsproxy_test.go index 97e6814c389fa..c18cd0413feb4 100644 --- a/enterprise/wsproxy/wsproxy_test.go +++ b/enterprise/wsproxy/wsproxy_test.go @@ -563,7 +563,7 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) { return proxyRes } - registerBrokenProxy := func(ctx context.Context, t *testing.T, primaryAccessURL *url.URL, accessURL, token string) { + registerBrokenProxy := func(ctx context.Context, t *testing.T, primaryAccessURL *url.URL, accessURL, token string) uuid.UUID { t.Helper() // Create a HTTP server that always replies with 500. srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { @@ -574,21 +574,23 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) { // Register a proxy. wsproxyClient := wsproxysdk.New(primaryAccessURL) wsproxyClient.SetSessionToken(token) - hostname, err := cryptorand.String(6) require.NoError(t, err) + replicaID := uuid.New() _, err = wsproxyClient.RegisterWorkspaceProxy(ctx, wsproxysdk.RegisterWorkspaceProxyRequest{ AccessURL: accessURL, WildcardHostname: "", DerpEnabled: true, DerpOnly: false, - ReplicaID: uuid.New(), + ReplicaID: replicaID, ReplicaHostname: hostname, ReplicaError: "", ReplicaRelayAddress: srv.URL, Version: buildinfo.Version(), }) require.NoError(t, err) + + return replicaID } t.Run("ProbeOK", func(t *testing.T) { @@ -781,8 +783,120 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) { resp.Body.Close() require.NoError(t, err) - require.Len(t, respJSON.Errors, 1, "proxy is healthy") - require.Contains(t, respJSON.Errors[0], "High availability networking") + require.Len(t, respJSON.Warnings, 1, "proxy is healthy") + require.Contains(t, respJSON.Warnings[0], "High availability networking") + }) + + // This test catches a regression we detected on dogfood which caused + // proxies to remain unhealthy after a mesh failure if they dropped to zero + // siblings after the failure. + t.Run("HealthyZero", func(t *testing.T) { + t.Parallel() + + deploymentValues := coderdtest.DeploymentValues(t) + deploymentValues.Experiments = []string{ + "*", + } + + client, closer, api, _ := coderdenttest.NewWithAPI(t, &coderdenttest.Options{ + Options: &coderdtest.Options{ + DeploymentValues: deploymentValues, + AppHostname: "*.primary.test.coder.com", + IncludeProvisionerDaemon: true, + RealIPConfig: &httpmw.RealIPConfig{ + TrustedOrigins: []*net.IPNet{{ + IP: net.ParseIP("127.0.0.1"), + Mask: net.CIDRMask(8, 32), + }}, + TrustedHeaders: []string{ + "CF-Connecting-IP", + }, + }, + }, + LicenseOptions: &coderdenttest.LicenseOptions{ + Features: license.Features{ + codersdk.FeatureWorkspaceProxy: 1, + }, + }, + }) + t.Cleanup(func() { + _ = closer.Close() + }) + + proxyURL, err := url.Parse("https://proxy2.test.coder.com") + require.NoError(t, err) + + // Create 1 real proxy replica. + replicaPingErr := make(chan string, 4) + proxy := coderdenttest.NewWorkspaceProxyReplica(t, api, client, &coderdenttest.ProxyOptions{ + Name: "proxy-2", + ProxyURL: proxyURL, + ReplicaPingCallback: func(replicas []codersdk.Replica, err string) { + replicaPingErr <- err + }, + }) + + ctx := testutil.Context(t, testutil.WaitLong) + otherReplicaID := registerBrokenProxy(ctx, t, api.AccessURL, proxyURL.String(), proxy.Options.ProxySessionToken) + + // Force the proxy to re-register immediately. + err = proxy.RegisterNow() + require.NoError(t, err, "failed to force proxy to re-register") + + // Wait for the ping to fail. + for { + replicaErr := testutil.RequireRecvCtx(ctx, t, replicaPingErr) + t.Log("replica ping error:", replicaErr) + if replicaErr != "" { + break + } + } + + // GET /healthz-report + u := proxy.ServerURL.ResolveReference(&url.URL{Path: "/healthz-report"}) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) + require.NoError(t, err) + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + var respJSON codersdk.ProxyHealthReport + err = json.NewDecoder(resp.Body).Decode(&respJSON) + resp.Body.Close() + require.NoError(t, err) + require.Len(t, respJSON.Warnings, 1, "proxy is healthy") + require.Contains(t, respJSON.Warnings[0], "High availability networking") + + // Deregister the other replica. + wsproxyClient := wsproxysdk.New(api.AccessURL) + wsproxyClient.SetSessionToken(proxy.Options.ProxySessionToken) + err = wsproxyClient.DeregisterWorkspaceProxy(ctx, wsproxysdk.DeregisterWorkspaceProxyRequest{ + ReplicaID: otherReplicaID, + }) + require.NoError(t, err) + + // Force the proxy to re-register immediately. + err = proxy.RegisterNow() + require.NoError(t, err, "failed to force proxy to re-register") + + // Wait for the ping to be skipped. + for { + replicaErr := testutil.RequireRecvCtx(ctx, t, replicaPingErr) + t.Log("replica ping error:", replicaErr) + // Should be empty because there are no more peers. This was where + // the regression was. + if replicaErr == "" { + break + } + } + + // GET /healthz-report + req, err = http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) + require.NoError(t, err) + resp, err = http.DefaultClient.Do(req) + require.NoError(t, err) + err = json.NewDecoder(resp.Body).Decode(&respJSON) + resp.Body.Close() + require.NoError(t, err) + require.Len(t, respJSON.Warnings, 0, "proxy is unhealthy") }) }