From 9cd470a0840b37db2a6282d6adee0d2187bc2d85 Mon Sep 17 00:00:00 2001 From: Dean Sheather Date: Tue, 20 Feb 2024 07:32:44 +0000 Subject: [PATCH 1/5] chore: add test for workspace proxy derp meshing --- enterprise/coderd/coderdenttest/proxytest.go | 34 ++- enterprise/wsproxy/wsproxy.go | 26 ++- enterprise/wsproxy/wsproxy_test.go | 165 +++++++++++++ enterprise/wsproxy/wsproxysdk/wsproxysdk.go | 234 ++++++++++++------- 4 files changed, 354 insertions(+), 105 deletions(-) diff --git a/enterprise/coderd/coderdenttest/proxytest.go b/enterprise/coderd/coderdenttest/proxytest.go index 9b43cbe6c316d..d3385f2c12034 100644 --- a/enterprise/coderd/coderdenttest/proxytest.go +++ b/enterprise/coderd/coderdenttest/proxytest.go @@ -38,15 +38,24 @@ type ProxyOptions struct { // ProxyURL is optional ProxyURL *url.URL + // Token is optional. If specified, a new proxy won't be created. + Token string + // FlushStats is optional FlushStats chan chan<- struct{} } +type WorkspaceProxy struct { + *wsproxy.Server + + ServerURL *url.URL +} + // NewWorkspaceProxy will configure a wsproxy.Server with the given options. // The new wsproxy will register itself with the given coderd.API instance. // The first user owner client is required to create the wsproxy on the coderd // api server. -func NewWorkspaceProxy(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Client, options *ProxyOptions) *wsproxy.Server { +func NewWorkspaceProxy(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Client, options *ProxyOptions) WorkspaceProxy { ctx, cancelFunc := context.WithCancel(context.Background()) t.Cleanup(cancelFunc) @@ -107,11 +116,15 @@ func NewWorkspaceProxy(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Clie options.Name = namesgenerator.GetRandomName(1) } - proxyRes, err := owner.CreateWorkspaceProxy(ctx, codersdk.CreateWorkspaceProxyRequest{ - Name: options.Name, - Icon: "/emojis/flag.png", - }) - require.NoError(t, err, "failed to create workspace proxy") + token := options.Token + if token == "" { + proxyRes, err := owner.CreateWorkspaceProxy(ctx, codersdk.CreateWorkspaceProxyRequest{ + Name: options.Name, + Icon: "/emojis/flag.png", + }) + require.NoError(t, err, "failed to create workspace proxy") + token = proxyRes.ProxyToken + } // Inherit collector options from coderd, but keep the wsproxy reporter. statsCollectorOptions := coderdAPI.Options.WorkspaceAppsStatsCollectorOptions @@ -131,14 +144,14 @@ func NewWorkspaceProxy(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Clie Tracing: coderdAPI.TracerProvider, APIRateLimit: coderdAPI.APIRateLimit, SecureAuthCookie: coderdAPI.SecureAuthCookie, - ProxySessionToken: proxyRes.ProxyToken, + ProxySessionToken: token, DisablePathApps: options.DisablePathApps, // We need a new registry to not conflict with the coderd internal // proxy metrics. 
PrometheusRegistry: prometheus.NewRegistry(), DERPEnabled: !options.DerpDisabled, DERPOnly: options.DerpOnly, - DERPServerRelayAddress: accessURL.String(), + DERPServerRelayAddress: serverURL.String(), StatsCollectorOptions: statsCollectorOptions, }) require.NoError(t, err) @@ -151,5 +164,8 @@ func NewWorkspaceProxy(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Clie handler = wssrv.Handler mutex.Unlock() - return wssrv + return WorkspaceProxy{ + Server: wssrv, + ServerURL: serverURL, + } } diff --git a/enterprise/wsproxy/wsproxy.go b/enterprise/wsproxy/wsproxy.go index 68693f4633871..5d4eca0244e5f 100644 --- a/enterprise/wsproxy/wsproxy.go +++ b/enterprise/wsproxy/wsproxy.go @@ -128,7 +128,7 @@ type Server struct { ctx context.Context cancel context.CancelFunc derpCloseFunc func() - registerDone <-chan struct{} + registerLoop *wsproxysdk.RegisterWorkspaceProxyLoop } // New creates a new workspace proxy server. This requires a primary coderd @@ -210,7 +210,7 @@ func New(ctx context.Context, opts *Options) (*Server, error) { // goroutine to periodically re-register. replicaID := uuid.New() osHostname := cliutil.Hostname() - regResp, registerDone, err := client.RegisterWorkspaceProxyLoop(ctx, wsproxysdk.RegisterWorkspaceProxyLoopOpts{ + registerLoop, regResp, err := client.RegisterWorkspaceProxyLoop(ctx, wsproxysdk.RegisterWorkspaceProxyLoopOpts{ Logger: opts.Logger, Request: wsproxysdk.RegisterWorkspaceProxyRequest{ AccessURL: opts.AccessURL.String(), @@ -230,12 +230,13 @@ func New(ctx context.Context, opts *Options) (*Server, error) { if err != nil { return nil, xerrors.Errorf("register proxy: %w", err) } - s.registerDone = registerDone - err = s.handleRegister(ctx, regResp) + s.registerLoop = registerLoop + + derpServer.SetMeshKey(regResp.DERPMeshKey) + err = s.handleRegister(regResp) if err != nil { return nil, xerrors.Errorf("handle register: %w", err) } - derpServer.SetMeshKey(regResp.DERPMeshKey) secKey, err := workspaceapps.KeyFromString(regResp.AppSecurityKey) if err != nil { @@ -409,16 +410,16 @@ func New(ctx context.Context, opts *Options) (*Server, error) { return s, nil } +func (s *Server) RegisterNow(ctx context.Context) error { + _, err := s.registerLoop.RegisterNow(ctx) + return err +} + func (s *Server) Close() error { s.cancel() var err error - registerDoneWaitTicker := time.NewTicker(11 * time.Second) // the attempt timeout is 10s - select { - case <-registerDoneWaitTicker.C: - err = multierror.Append(err, xerrors.New("timed out waiting for registerDone")) - case <-s.registerDone: - } + s.registerLoop.Close() s.derpCloseFunc() appServerErr := s.AppServer.Close() if appServerErr != nil { @@ -437,11 +438,12 @@ func (*Server) mutateRegister(_ *wsproxysdk.RegisterWorkspaceProxyRequest) { // package in the primary and update req.ReplicaError accordingly. 
} -func (s *Server) handleRegister(_ context.Context, res wsproxysdk.RegisterWorkspaceProxyResponse) error { +func (s *Server) handleRegister(res wsproxysdk.RegisterWorkspaceProxyResponse) error { addresses := make([]string, len(res.SiblingReplicas)) for i, replica := range res.SiblingReplicas { addresses[i] = replica.RelayAddress } + s.Logger.Debug(s.ctx, "setting DERP mesh sibling addresses", slog.F("addresses", addresses)) s.derpMesh.SetAddresses(addresses, false) s.latestDERPMap.Store(res.DERPMap) diff --git a/enterprise/wsproxy/wsproxy_test.go b/enterprise/wsproxy/wsproxy_test.go index 0d440165dfb16..24ebac15c208c 100644 --- a/enterprise/wsproxy/wsproxy_test.go +++ b/enterprise/wsproxy/wsproxy_test.go @@ -3,12 +3,15 @@ package wsproxy_test import ( "fmt" "net" + "net/url" "testing" + "time" "github.com/davecgh/go-spew/spew" "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "tailscale.com/derp" "tailscale.com/derp/derphttp" "tailscale.com/tailcfg" "tailscale.com/types/key" @@ -22,6 +25,7 @@ import ( "github.com/coder/coder/v2/coderd/httpmw" "github.com/coder/coder/v2/coderd/workspaceapps/apptest" "github.com/coder/coder/v2/codersdk" + "github.com/coder/coder/v2/cryptorand" "github.com/coder/coder/v2/enterprise/coderd/coderdenttest" "github.com/coder/coder/v2/enterprise/coderd/license" "github.com/coder/coder/v2/provisioner/echo" @@ -430,6 +434,167 @@ resourceLoop: require.False(t, p2p) } +// TestDERPMesh spawns 10 workspace proxy replicas and tries to connect to a +// single DERP peer via every single one. +func TestDERPMesh(t *testing.T) { + t.Parallel() + + deploymentValues := coderdtest.DeploymentValues(t) + deploymentValues.Experiments = []string{ + "*", + } + + client, closer, api, _ := coderdenttest.NewWithAPI(t, &coderdenttest.Options{ + Options: &coderdtest.Options{ + DeploymentValues: deploymentValues, + AppHostname: "*.primary.test.coder.com", + IncludeProvisionerDaemon: true, + RealIPConfig: &httpmw.RealIPConfig{ + TrustedOrigins: []*net.IPNet{{ + IP: net.ParseIP("127.0.0.1"), + Mask: net.CIDRMask(8, 32), + }}, + TrustedHeaders: []string{ + "CF-Connecting-IP", + }, + }, + }, + LicenseOptions: &coderdenttest.LicenseOptions{ + Features: license.Features{ + codersdk.FeatureWorkspaceProxy: 1, + }, + }, + }) + t.Cleanup(func() { + _ = closer.Close() + }) + + proxyURL, err := url.Parse("https://proxy.test.coder.com") + require.NoError(t, err) + + // Create 10 proxy replicas. + const count = 10 + var ( + sessionToken = "" + proxies = [count]coderdenttest.WorkspaceProxy{} + derpURLs = [count]string{} + ) + for i := range proxies { + proxies[i] = coderdenttest.NewWorkspaceProxy(t, api, client, &coderdenttest.ProxyOptions{ + Name: "best-proxy", + Token: sessionToken, + ProxyURL: proxyURL, + }) + if i == 0 { + sessionToken = proxies[i].Options.ProxySessionToken + } + + derpURL := *proxies[i].ServerURL + derpURL.Path = "/derp" + derpURLs[i] = derpURL.String() + } + + // Force all proxies to re-register immediately. This ensures the DERP mesh + // is up-to-date. In production this will happen automatically after about + // 15 seconds. 
+	for i, proxy := range proxies {
+		err := proxy.RegisterNow(testutil.Context(t, testutil.WaitLong))
+		require.NoErrorf(t, err, "failed to force proxy %d to re-register", i)
+	}
+
+	createClient := func(t *testing.T, name string, derpURL string) (*derphttp.Client, <-chan derp.ReceivedPacket) {
+		t.Helper()
+
+		client, err := derphttp.NewClient(key.NewNode(), derpURL, func(format string, args ...any) {
+			t.Logf(name+": "+format, args...)
+		})
+		require.NoError(t, err, "create client")
+		err = client.Connect(testutil.Context(t, testutil.WaitLong))
+		require.NoError(t, err, "connect to DERP server")
+
+		// Pipe received packets into a buffered channel for the test to read.
+		ch := make(chan derp.ReceivedPacket, 64)
+		go func() {
+			defer close(ch)
+			for {
+				msg, err := client.Recv()
+				if err != nil {
+					t.Logf("Recv error: %v", err)
+					return
+				}
+				switch msg := msg.(type) {
+				case derp.ReceivedPacket:
+					ch <- msg
+				default:
+					// We don't care about other messages.
+				}
+			}
+		}()
+
+		return client, ch
+	}
+
+	sendTest := func(t *testing.T, dstKey key.NodePublic, dstCh <-chan derp.ReceivedPacket, src *derphttp.Client) {
+		t.Helper()
+
+		const msgStrPrefix = "test_packet_"
+		msgStr, err := cryptorand.String(64 - len(msgStrPrefix))
+		require.NoError(t, err, "generate random msg string")
+		msg := []byte(msgStrPrefix + msgStr)
+
+		err = src.Send(dstKey, msg)
+		require.NoError(t, err, "send message via DERP")
+
+		waitCtx := testutil.Context(t, testutil.WaitLong)
+		ticker := time.NewTicker(time.Millisecond * 500)
+		defer ticker.Stop()
+		for {
+			select {
+			case pkt := <-dstCh:
+				assert.Equal(t, src.SelfPublicKey(), pkt.Source, "packet came from wrong source")
+				assert.Equal(t, msg, pkt.Data, "packet data is wrong")
+				return
+			case <-waitCtx.Done():
+				t.Fatal("timed out waiting for packet")
+				return
+			case <-ticker.C:
+			}
+
+			// Send another packet. Since we're sending packets immediately
+			// after opening the clients, they might not be meshed together
+			// properly yet.
+			err = src.Send(dstKey, msg)
+			require.NoError(t, err, "send message via DERP")
+		}
+	}
+
+	for i, derpURL := range derpURLs {
+		i, derpURL := i, derpURL
+		t.Run(fmt.Sprintf("Proxy%d", i), func(t *testing.T) {
+			t.Parallel()
+			t.Logf("derp1=%s, derp2=%s", derpURLs[0], derpURL)
+
+			// Client 1 is always on the first proxy as a "control".
+			client1, client1Recv := createClient(t, "client1", derpURLs[0])
+			t.Cleanup(func() {
+				_ = client1.Close()
+			})
+
+			// Client 2 is on the current proxy.
+			client2, client2Recv := createClient(t, "client2", derpURL)
+			t.Cleanup(func() {
+				_ = client2.Close()
+			})
+
+			// Send a packet from client 1 to client 2.
+			sendTest(t, client2.SelfPublicKey(), client2Recv, client1)
+
+			// Send a packet from client 2 to client 1.
+			sendTest(t, client1.SelfPublicKey(), client1Recv, client2)
+		})
+	}
+}
+
 func TestWorkspaceProxyWorkspaceApps(t *testing.T) {
 	t.Parallel()
 
diff --git a/enterprise/wsproxy/wsproxysdk/wsproxysdk.go b/enterprise/wsproxy/wsproxysdk/wsproxysdk.go
index 1163f7c435001..448c493307d2e 100644
--- a/enterprise/wsproxy/wsproxysdk/wsproxysdk.go
+++ b/enterprise/wsproxy/wsproxysdk/wsproxysdk.go
@@ -277,135 +277,201 @@ type RegisterWorkspaceProxyLoopOpts struct {
 	// called in a blocking manner, so it should avoid blocking for too long. If
 	// the callback returns an error, the loop will stop immediately and the
 	// error will be returned to the FailureFn.
- CallbackFn func(ctx context.Context, res RegisterWorkspaceProxyResponse) error + CallbackFn func(res RegisterWorkspaceProxyResponse) error // FailureFn is called with the last error returned from the server if the // context is canceled, registration fails for more than MaxFailureCount, // or if any permanent values in the response change. FailureFn func(err error) } -// RegisterWorkspaceProxyLoop will register the workspace proxy and then start a -// goroutine to keep registering periodically in the background. -// -// The first response is returned immediately, and subsequent responses will be -// notified to the given CallbackFn. When the context is canceled the loop will -// stop immediately and the context error will be returned to the FailureFn. -// -// The returned channel will be closed when the loop stops and can be used to -// ensure the loop is dead before continuing. When a fatal error is encountered, -// the proxy will be deregistered (with the same ReplicaID and AttemptTimeout) -// before calling the FailureFn. -func (c *Client) RegisterWorkspaceProxyLoop(ctx context.Context, opts RegisterWorkspaceProxyLoopOpts) (RegisterWorkspaceProxyResponse, <-chan struct{}, error) { - if opts.Interval == 0 { - opts.Interval = 30 * time.Second - } - if opts.MaxFailureCount == 0 { - opts.MaxFailureCount = 10 - } - if opts.AttemptTimeout == 0 { - opts.AttemptTimeout = 10 * time.Second - } - if opts.MutateFn == nil { - opts.MutateFn = func(_ *RegisterWorkspaceProxyRequest) {} - } - if opts.CallbackFn == nil { - opts.CallbackFn = func(_ context.Context, _ RegisterWorkspaceProxyResponse) error { - return nil - } +type RegisterWorkspaceProxyLoop struct { + opts RegisterWorkspaceProxyLoopOpts + c *Client + originalRes *RegisterWorkspaceProxyResponse + + closedCtx context.Context + close context.CancelFunc + done chan struct{} +} + +// register registers once. It returns the response, whether the loop should +// exit immediately, and any error that occurred. +func (l *RegisterWorkspaceProxyLoop) register(ctx context.Context) (RegisterWorkspaceProxyResponse, bool, error) { + // If it's not the first registration, call the mutate function. + if l.originalRes != nil { + l.mutateFn(&l.opts.Request) } - failureFn := func(err error) { - // We have to use background context here because the original context - // may be canceled. - deregisterCtx, cancel := context.WithTimeout(context.Background(), opts.AttemptTimeout) - defer cancel() - deregisterErr := c.DeregisterWorkspaceProxy(deregisterCtx, DeregisterWorkspaceProxyRequest{ - ReplicaID: opts.Request.ReplicaID, - }) - if deregisterErr != nil { - opts.Logger.Error(ctx, - "failed to deregister workspace proxy with Coder primary (it will be automatically deregistered shortly)", - slog.Error(deregisterErr), - ) - } + registerCtx, registerCancel := context.WithTimeout(ctx, l.opts.AttemptTimeout) + res, err := l.c.RegisterWorkspaceProxy(registerCtx, l.opts.Request) + registerCancel() + if err != nil { + return RegisterWorkspaceProxyResponse{}, false, xerrors.Errorf("register workspace proxy: %w", err) + } + + // Catastrophic failures if any of the "permanent" values change. 
+ if l.originalRes != nil && res.AppSecurityKey != l.originalRes.AppSecurityKey { + return RegisterWorkspaceProxyResponse{}, true, xerrors.New("app security key has changed, proxy must be restarted") + } + if l.originalRes != nil && res.DERPMeshKey != l.originalRes.DERPMeshKey { + return RegisterWorkspaceProxyResponse{}, true, xerrors.New("DERP mesh key has changed, proxy must be restarted") + } + if l.originalRes != nil && res.DERPRegionID != l.originalRes.DERPRegionID { + return RegisterWorkspaceProxyResponse{}, true, xerrors.New("DERP region ID has changed, proxy must be restarted") + } - if opts.FailureFn != nil { - opts.FailureFn(err) + // If it's not the first registration, call the callback function. + if l.originalRes != nil { + err = l.callbackFn(res) + if err != nil { + return RegisterWorkspaceProxyResponse{}, true, xerrors.Errorf("callback err: %w", err) } + } else { + l.originalRes = &res + } + + return res, false, nil +} + +// Start starts the proxy registration loop. The provided context is only used +// for the initial registration. Use Close() to stop. +func (l *RegisterWorkspaceProxyLoop) Start(ctx context.Context) (RegisterWorkspaceProxyResponse, error) { + if l.opts.Interval == 0 { + l.opts.Interval = 15 * time.Second + } + if l.opts.MaxFailureCount == 0 { + l.opts.MaxFailureCount = 10 + } + if l.opts.AttemptTimeout == 0 { + l.opts.AttemptTimeout = 10 * time.Second } - originalRes, err := c.RegisterWorkspaceProxy(ctx, opts.Request) + originalRes, _, err := l.register(ctx) if err != nil { - return RegisterWorkspaceProxyResponse{}, nil, xerrors.Errorf("register workspace proxy: %w", err) + return RegisterWorkspaceProxyResponse{}, xerrors.Errorf("initial registration: %w", err) } - done := make(chan struct{}) go func() { - defer close(done) + defer close(l.done) var ( failedAttempts = 0 - ticker = time.NewTicker(opts.Interval) + ticker = time.NewTicker(l.opts.Interval) ) for { select { - case <-ctx.Done(): - failureFn(ctx.Err()) + case <-l.closedCtx.Done(): + l.failureFn(xerrors.Errorf("proxy registration loop closed")) return case <-ticker.C: } - opts.Logger.Debug(ctx, + l.opts.Logger.Debug(context.Background(), "re-registering workspace proxy with Coder primary", - slog.F("req", opts.Request), - slog.F("timeout", opts.AttemptTimeout), + slog.F("req", l.opts.Request), + slog.F("timeout", l.opts.AttemptTimeout), slog.F("failed_attempts", failedAttempts), ) - opts.MutateFn(&opts.Request) - registerCtx, cancel := context.WithTimeout(ctx, opts.AttemptTimeout) - res, err := c.RegisterWorkspaceProxy(registerCtx, opts.Request) - cancel() + + _, catastrophicErr, err := l.register(l.closedCtx) if err != nil { + if catastrophicErr { + l.failureFn(err) + return + } failedAttempts++ - opts.Logger.Warn(ctx, + l.opts.Logger.Warn(context.Background(), "failed to re-register workspace proxy with Coder primary", - slog.F("req", opts.Request), - slog.F("timeout", opts.AttemptTimeout), + slog.F("req", l.opts.Request), + slog.F("timeout", l.opts.AttemptTimeout), slog.F("failed_attempts", failedAttempts), slog.Error(err), ) - if failedAttempts > opts.MaxFailureCount { - failureFn(xerrors.Errorf("exceeded re-registration failure count of %d: last error: %w", opts.MaxFailureCount, err)) + if failedAttempts > l.opts.MaxFailureCount { + l.failureFn(xerrors.Errorf("exceeded re-registration failure count of %d: last error: %w", l.opts.MaxFailureCount, err)) return } continue } failedAttempts = 0 - if res.AppSecurityKey != originalRes.AppSecurityKey { - failureFn(xerrors.New("app security key has 
changed, proxy must be restarted")) - return - } - if res.DERPMeshKey != originalRes.DERPMeshKey { - failureFn(xerrors.New("DERP mesh key has changed, proxy must be restarted")) - return - } - if res.DERPRegionID != originalRes.DERPRegionID { - failureFn(xerrors.New("DERP region ID has changed, proxy must be restarted")) - } - - err = opts.CallbackFn(ctx, res) - if err != nil { - failureFn(xerrors.Errorf("callback fn returned error: %w", err)) - return - } - - ticker.Reset(opts.Interval) + ticker.Reset(l.opts.Interval) } }() - return originalRes, done, nil + return originalRes, nil +} + +func (l *RegisterWorkspaceProxyLoop) RegisterNow(ctx context.Context) (RegisterWorkspaceProxyResponse, error) { + // Catastrophic failures don't affect the loop when manually re-registering. + res, _, err := l.register(ctx) + return res, err +} + +func (l *RegisterWorkspaceProxyLoop) Close() { + l.close() + <-l.done +} + +func (l *RegisterWorkspaceProxyLoop) mutateFn(req *RegisterWorkspaceProxyRequest) { + if l.opts.MutateFn != nil { + l.opts.MutateFn(req) + } +} + +func (l *RegisterWorkspaceProxyLoop) callbackFn(res RegisterWorkspaceProxyResponse) error { + if l.opts.CallbackFn != nil { + return l.opts.CallbackFn(res) + } + return nil +} + +func (l *RegisterWorkspaceProxyLoop) failureFn(err error) { + // We have to use background context here because the original context may + // be canceled. + deregisterCtx, cancel := context.WithTimeout(context.Background(), l.opts.AttemptTimeout) + defer cancel() + deregisterErr := l.c.DeregisterWorkspaceProxy(deregisterCtx, DeregisterWorkspaceProxyRequest{ + ReplicaID: l.opts.Request.ReplicaID, + }) + if deregisterErr != nil { + l.opts.Logger.Error(context.Background(), + "failed to deregister workspace proxy with Coder primary (it will be automatically deregistered shortly)", + slog.Error(deregisterErr), + ) + } + + if l.opts.FailureFn != nil { + l.opts.FailureFn(err) + } +} + +// RegisterWorkspaceProxyLoop will register the workspace proxy and then start a +// goroutine to keep registering periodically in the background. +// +// The first response is returned immediately, and subsequent responses will be +// notified to the given CallbackFn. When the loop is Close()d it will stop +// immediately and an error will be returned to the FailureFn. +// +// When a fatal error is encountered (or the proxy is closed), the proxy will be +// deregistered (with the same ReplicaID and AttemptTimeout) before calling the +// FailureFn. 
+func (c *Client) RegisterWorkspaceProxyLoop(ctx context.Context, opts RegisterWorkspaceProxyLoopOpts) (*RegisterWorkspaceProxyLoop, RegisterWorkspaceProxyResponse, error) { + closedCtx, closeFn := context.WithCancel(context.Background()) + loop := &RegisterWorkspaceProxyLoop{ + opts: opts, + c: c, + closedCtx: closedCtx, + close: closeFn, + done: make(chan struct{}), + } + + regResp, err := loop.Start(ctx) + if err != nil { + return nil, RegisterWorkspaceProxyResponse{}, xerrors.Errorf("start loop: %w", err) + } + return loop, regResp, nil } type CoordinateMessageType int From a754acfe8b9c8d1ca79c5793859d5637d2d11a85 Mon Sep 17 00:00:00 2001 From: Dean Sheather Date: Tue, 20 Feb 2024 07:46:06 +0000 Subject: [PATCH 2/5] Reduce replicas to 6 in test, test all paths --- enterprise/wsproxy/wsproxy_test.go | 38 +++++++++++++++++------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/enterprise/wsproxy/wsproxy_test.go b/enterprise/wsproxy/wsproxy_test.go index 24ebac15c208c..77ebf621575aa 100644 --- a/enterprise/wsproxy/wsproxy_test.go +++ b/enterprise/wsproxy/wsproxy_test.go @@ -434,7 +434,7 @@ resourceLoop: require.False(t, p2p) } -// TestDERPMesh spawns 10 workspace proxy replicas and tries to connect to a +// TestDERPMesh spawns 6 workspace proxy replicas and tries to connect to a // single DERP peer via every single one. func TestDERPMesh(t *testing.T) { t.Parallel() @@ -472,8 +472,8 @@ func TestDERPMesh(t *testing.T) { proxyURL, err := url.Parse("https://proxy.test.coder.com") require.NoError(t, err) - // Create 10 proxy replicas. - const count = 10 + // Create 6 proxy replicas. + const count = 6 var ( sessionToken = "" proxies = [count]coderdenttest.WorkspaceProxy{} @@ -509,6 +509,9 @@ func TestDERPMesh(t *testing.T) { t.Logf(name+": "+format, args...) }) require.NoError(t, err, "create client") + t.Cleanup(func() { + _ = client.Close() + }) err = client.Connect(testutil.Context(t, testutil.WaitLong)) require.NoError(t, err, "connect to DERP server") @@ -568,23 +571,26 @@ func TestDERPMesh(t *testing.T) { } } + // Generate cases. We have a case for: + // - Each proxy to itself. + // - Each proxy to each other proxy (one way, no duplicates). + cases := [][2]string{} for i, derpURL := range derpURLs { - i, derpURL := i, derpURL + cases = append(cases, [2]string{derpURL, derpURL}) + for j := i + 1; j < len(derpURLs); j++ { + cases = append(cases, [2]string{derpURL, derpURLs[j]}) + } + } + require.Len(t, cases, (count*(count+1))/2) // triangle number + + for i, c := range cases { + i, c := i, c t.Run(fmt.Sprintf("Proxy%d", i), func(t *testing.T) { t.Parallel() - t.Logf("derp1=%s, derp2=%s", derpURLs[0], derpURL) - - // Client 1 is always on the first proxy as a "control". - client1, client1Recv := createClient(t, "client1", derpURLs[0]) - t.Cleanup(func() { - _ = client1.Close() - }) - // Client 2 is on the current proxy. - client2, client2Recv := createClient(t, "client2", derpURL) - t.Cleanup(func() { - _ = client2.Close() - }) + t.Logf("derp1=%s, derp2=%s", c[0], c[1]) + client1, client1Recv := createClient(t, "client1", c[0]) + client2, client2Recv := createClient(t, "client2", c[1]) // Send a packet from client 1 to client 2. 
sendTest(t, client2.SelfPublicKey(), client2Recv, client1) From 79a79e91139dd20ef391ed3215e36ffc41ab52a0 Mon Sep 17 00:00:00 2001 From: Dean Sheather Date: Tue, 20 Feb 2024 09:45:23 +0000 Subject: [PATCH 3/5] feat: add derp mesh health checking in workspace proxies --- enterprise/cli/proxyserver.go | 9 +- enterprise/coderd/coderd.go | 26 +-- enterprise/coderd/coderdenttest/proxytest.go | 4 + enterprise/replicasync/replicasync.go | 54 ++++- enterprise/replicasync/replicasync_test.go | 2 +- enterprise/wsproxy/wsproxy.go | 161 +++++++++++--- enterprise/wsproxy/wsproxy_test.go | 218 +++++++++++++++++++ 7 files changed, 406 insertions(+), 68 deletions(-) diff --git a/enterprise/cli/proxyserver.go b/enterprise/cli/proxyserver.go index 9ac59735b120d..37e90ce56cbe7 100644 --- a/enterprise/cli/proxyserver.go +++ b/enterprise/cli/proxyserver.go @@ -244,7 +244,7 @@ func (r *RootCmd) proxyServer() *clibase.Cmd { closers.Add(closeFunc) } - proxy, err := wsproxy.New(ctx, &wsproxy.Options{ + options := &wsproxy.Options{ Logger: logger, Experiments: coderd.ReadExperiments(logger, cfg.Experiments.Value()), HTTPClient: httpClient, @@ -263,7 +263,12 @@ func (r *RootCmd) proxyServer() *clibase.Cmd { DERPEnabled: cfg.DERP.Server.Enable.Value(), DERPOnly: derpOnly.Value(), DERPServerRelayAddress: cfg.DERP.Server.RelayURL.String(), - }) + } + if httpServers.TLSConfig != nil { + options.TLSCertificates = httpServers.TLSConfig.Certificates + } + + proxy, err := wsproxy.New(ctx, options) if err != nil { return xerrors.Errorf("create workspace proxy: %w", err) } diff --git a/enterprise/coderd/coderd.go b/enterprise/coderd/coderd.go index 95611f671d545..066679db70fab 100644 --- a/enterprise/coderd/coderd.go +++ b/enterprise/coderd/coderd.go @@ -3,8 +3,6 @@ package coderd import ( "context" "crypto/ed25519" - "crypto/tls" - "crypto/x509" "fmt" "math" "net/http" @@ -367,27 +365,9 @@ func New(ctx context.Context, options *Options) (_ *API, err error) { }) } - meshRootCA := x509.NewCertPool() - for _, certificate := range options.TLSCertificates { - for _, certificatePart := range certificate.Certificate { - certificate, err := x509.ParseCertificate(certificatePart) - if err != nil { - return nil, xerrors.Errorf("parse certificate %s: %w", certificate.Subject.CommonName, err) - } - meshRootCA.AddCert(certificate) - } - } - // This TLS configuration spoofs access from the access URL hostname - // assuming that the certificates provided will cover that hostname. - // - // Replica sync and DERP meshing require accessing replicas via their - // internal IP addresses, and if TLS is configured we use the same - // certificates. - meshTLSConfig := &tls.Config{ - MinVersion: tls.VersionTLS12, - Certificates: options.TLSCertificates, - RootCAs: meshRootCA, - ServerName: options.AccessURL.Hostname(), + meshTLSConfig, err := replicasync.CreateDERPMeshTLSConfig(options.AccessURL.Hostname(), options.TLSCertificates) + if err != nil { + return nil, xerrors.Errorf("create DERP mesh TLS config: %w", err) } api.replicaManager, err = replicasync.New(ctx, options.Logger, options.Database, options.Pubsub, &replicasync.Options{ ID: api.AGPL.ID, diff --git a/enterprise/coderd/coderdenttest/proxytest.go b/enterprise/coderd/coderdenttest/proxytest.go index d3385f2c12034..98dc20dd37186 100644 --- a/enterprise/coderd/coderdenttest/proxytest.go +++ b/enterprise/coderd/coderdenttest/proxytest.go @@ -41,6 +41,9 @@ type ProxyOptions struct { // Token is optional. If specified, a new proxy won't be created. 
Token string + // ReplicaPingCallback is optional. + ReplicaPingCallback func(replicas []codersdk.Replica, err string) + // FlushStats is optional FlushStats chan chan<- struct{} } @@ -152,6 +155,7 @@ func NewWorkspaceProxy(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Clie DERPEnabled: !options.DerpDisabled, DERPOnly: options.DerpOnly, DERPServerRelayAddress: serverURL.String(), + ReplicaErrCallback: options.ReplicaPingCallback, StatsCollectorOptions: statsCollectorOptions, }) require.NoError(t, err) diff --git a/enterprise/replicasync/replicasync.go b/enterprise/replicasync/replicasync.go index b7e46b99f6124..122eb5c1faa17 100644 --- a/enterprise/replicasync/replicasync.go +++ b/enterprise/replicasync/replicasync.go @@ -3,6 +3,7 @@ package replicasync import ( "context" "crypto/tls" + "crypto/x509" "database/sql" "errors" "fmt" @@ -272,31 +273,41 @@ func (m *Manager) syncReplicas(ctx context.Context) error { wg.Add(1) go func(peer database.Replica) { defer wg.Done() + + fail := func(err error) { + mu.Lock() + failed = append(failed, fmt.Sprintf("relay %s (%s): %s", peer.Hostname, peer.RelayAddress, err)) + mu.Unlock() + m.logger.Warn(ctx, "failed to ping sibling replica, this could happen if the replica has shutdown", + slog.F("replica_hostname", peer.Hostname), + slog.F("replica_relay_address", peer.RelayAddress), + slog.Error(err), + ) + } + ra, err := url.Parse(peer.RelayAddress) if err != nil { - m.logger.Warn(ctx, "could not parse relay address", - slog.F("relay_address", peer.RelayAddress), slog.Error(err)) + fail(xerrors.Errorf("parse relay address %q: %w", peer.RelayAddress, err)) return } target, err := ra.Parse("/derp/latency-check") if err != nil { - m.logger.Warn(ctx, "could not resolve /derp/latency-check endpoint", - slog.F("relay_address", peer.RelayAddress), slog.Error(err)) + fail(xerrors.Errorf("parse latency-check URL: %w", err)) return } req, err := http.NewRequestWithContext(ctx, http.MethodGet, target.String(), nil) if err != nil { - m.logger.Warn(ctx, "create http request for relay probe", - slog.F("relay_address", peer.RelayAddress), slog.Error(err)) + fail(xerrors.Errorf("create request: %w", err)) return } res, err := client.Do(req) if err != nil { - mu.Lock() - failed = append(failed, fmt.Sprintf("relay %s (%s): %s", peer.Hostname, peer.RelayAddress, err)) - mu.Unlock() + fail(xerrors.Errorf("do probe: %w", err)) return } + if res.StatusCode != http.StatusOK { + fail(xerrors.Errorf("unexpected status code: %d", res.StatusCode)) + } _ = res.Body.Close() }(peer) } @@ -466,3 +477,28 @@ func (m *Manager) Close() error { } return nil } + +func CreateDERPMeshTLSConfig(hostname string, tlsCertificates []tls.Certificate) (*tls.Config, error) { + meshRootCA := x509.NewCertPool() + for _, certificate := range tlsCertificates { + for _, certificatePart := range certificate.Certificate { + certificate, err := x509.ParseCertificate(certificatePart) + if err != nil { + return nil, xerrors.Errorf("parse certificate %s: %w", certificate.Subject.CommonName, err) + } + meshRootCA.AddCert(certificate) + } + } + // This TLS configuration spoofs access from the access URL hostname + // assuming that the certificates provided will cover that hostname. + // + // Replica sync and DERP meshing require accessing replicas via their + // internal IP addresses, and if TLS is configured we use the same + // certificates. 
+ return &tls.Config{ + MinVersion: tls.VersionTLS12, + Certificates: tlsCertificates, + RootCAs: meshRootCA, + ServerName: hostname, + }, nil +} diff --git a/enterprise/replicasync/replicasync_test.go b/enterprise/replicasync/replicasync_test.go index 600eb839e28bf..f04ba9ccecc31 100644 --- a/enterprise/replicasync/replicasync_test.go +++ b/enterprise/replicasync/replicasync_test.go @@ -286,7 +286,7 @@ func (d *derpyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { d.Add(1) return } - w.WriteHeader(http.StatusUpgradeRequired) + w.WriteHeader(http.StatusOK) } func (d *derpyHandler) requireOnlyDERPPaths(t *testing.T) { diff --git a/enterprise/wsproxy/wsproxy.go b/enterprise/wsproxy/wsproxy.go index 5d4eca0244e5f..fa46ef8fab7b3 100644 --- a/enterprise/wsproxy/wsproxy.go +++ b/enterprise/wsproxy/wsproxy.go @@ -3,14 +3,15 @@ package wsproxy import ( "context" "crypto/tls" - "crypto/x509" "errors" "fmt" "net/http" "net/url" "reflect" "regexp" + "slices" "strings" + "sync" "sync/atomic" "time" @@ -19,6 +20,7 @@ import ( "github.com/hashicorp/go-multierror" "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/trace" + "golang.org/x/sync/singleflight" "golang.org/x/xerrors" "tailscale.com/derp" "tailscale.com/derp/derphttp" @@ -35,6 +37,7 @@ import ( "github.com/coder/coder/v2/coderd/workspaceapps" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/enterprise/derpmesh" + "github.com/coder/coder/v2/enterprise/replicasync" "github.com/coder/coder/v2/enterprise/wsproxy/wsproxysdk" "github.com/coder/coder/v2/site" "github.com/coder/coder/v2/tailnet" @@ -76,6 +79,10 @@ type Options struct { // provide access to workspace apps/terminal. DERPOnly bool + // ReplicaErrCallback is called when the proxy replica successfully or + // unsuccessfully pings it's peers in the mesh. + ReplicaErrCallback func(replicas []codersdk.Replica, err string) + ProxySessionToken string // AllowAllCors will set all CORs headers to '*'. // By default, CORs is set to accept external requests @@ -121,8 +128,12 @@ type Server struct { SDKClient *wsproxysdk.Client // DERP - derpMesh *derpmesh.Mesh - latestDERPMap atomic.Pointer[tailcfg.DERPMap] + derpMesh *derpmesh.Mesh + derpMeshTLSConfig *tls.Config + replicaPingSingleflight singleflight.Group + replicaErrMut sync.Mutex + replicaErr string + latestDERPMap atomic.Pointer[tailcfg.DERPMap] // Used for graceful shutdown. Required for the dialer. ctx context.Context @@ -166,29 +177,10 @@ func New(ctx context.Context, opts *Options) (*Server, error) { return nil, xerrors.Errorf("%q is a workspace proxy, not a primary coderd instance", opts.DashboardURL) } - meshRootCA := x509.NewCertPool() - for _, certificate := range opts.TLSCertificates { - for _, certificatePart := range certificate.Certificate { - certificate, err := x509.ParseCertificate(certificatePart) - if err != nil { - return nil, xerrors.Errorf("parse certificate %s: %w", certificate.Subject.CommonName, err) - } - meshRootCA.AddCert(certificate) - } - } - // This TLS configuration spoofs access from the access URL hostname - // assuming that the certificates provided will cover that hostname. - // - // Replica sync and DERP meshing require accessing replicas via their - // internal IP addresses, and if TLS is configured we use the same - // certificates. 
- meshTLSConfig := &tls.Config{ - MinVersion: tls.VersionTLS12, - Certificates: opts.TLSCertificates, - RootCAs: meshRootCA, - ServerName: opts.AccessURL.Hostname(), + meshTLSConfig, err := replicasync.CreateDERPMeshTLSConfig(opts.AccessURL.Hostname(), opts.TLSCertificates) + if err != nil { + return nil, xerrors.Errorf("create DERP mesh tls config: %w", err) } - derpServer := derp.NewServer(key.NewNode(), tailnet.Logger(opts.Logger.Named("net.derp"))) ctx, cancel := context.WithCancel(context.Background()) @@ -202,14 +194,13 @@ func New(ctx context.Context, opts *Options) (*Server, error) { PrometheusRegistry: opts.PrometheusRegistry, SDKClient: client, derpMesh: derpmesh.New(opts.Logger.Named("net.derpmesh"), derpServer, meshTLSConfig), + derpMeshTLSConfig: meshTLSConfig, ctx: ctx, cancel: cancel, } // Register the workspace proxy with the primary coderd instance and start a // goroutine to periodically re-register. - replicaID := uuid.New() - osHostname := cliutil.Hostname() registerLoop, regResp, err := client.RegisterWorkspaceProxyLoop(ctx, wsproxysdk.RegisterWorkspaceProxyLoopOpts{ Logger: opts.Logger, Request: wsproxysdk.RegisterWorkspaceProxyRequest{ @@ -217,8 +208,8 @@ func New(ctx context.Context, opts *Options) (*Server, error) { WildcardHostname: opts.AppHostname, DerpEnabled: opts.DERPEnabled, DerpOnly: opts.DERPOnly, - ReplicaID: replicaID, - ReplicaHostname: osHostname, + ReplicaID: uuid.New(), + ReplicaHostname: cliutil.Hostname(), ReplicaError: "", ReplicaRelayAddress: opts.DERPServerRelayAddress, Version: buildinfo.Version(), @@ -433,9 +424,10 @@ func (s *Server) Close() error { return err } -func (*Server) mutateRegister(_ *wsproxysdk.RegisterWorkspaceProxyRequest) { - // TODO: we should probably ping replicas similarly to the replicasync - // package in the primary and update req.ReplicaError accordingly. +func (s *Server) mutateRegister(req *wsproxysdk.RegisterWorkspaceProxyRequest) { + s.replicaErrMut.Lock() + defer s.replicaErrMut.Unlock() + req.ReplicaError = s.replicaErr } func (s *Server) handleRegister(res wsproxysdk.RegisterWorkspaceProxyResponse) error { @@ -448,9 +440,106 @@ func (s *Server) handleRegister(res wsproxysdk.RegisterWorkspaceProxyResponse) e s.latestDERPMap.Store(res.DERPMap) + go s.pingSiblingReplicas(res.SiblingReplicas) return nil } +func (s *Server) pingSiblingReplicas(replicas []codersdk.Replica) { + if len(replicas) == 0 { + return + } + + // Avoid pinging multiple times at once if the list hasn't changed. 
+ relayURLs := make([]string, len(replicas)) + for i, r := range replicas { + relayURLs[i] = r.RelayAddress + } + slices.Sort(relayURLs) + singleflightStr := strings.Join(relayURLs, ",") + + //nolint:dogsled + _, _, _ = s.replicaPingSingleflight.Do(singleflightStr, func() (any, error) { + const ( + perReplicaTimeout = 3 * time.Second + fullTimeout = 10 * time.Second + ) + ctx, cancel := context.WithTimeout(s.ctx, fullTimeout) + defer cancel() + + client := http.Client{ + Timeout: perReplicaTimeout, + Transport: &http.Transport{ + TLSClientConfig: s.derpMeshTLSConfig, + DisableKeepAlives: true, + }, + } + defer client.CloseIdleConnections() + + var ( + wg sync.WaitGroup + mu sync.Mutex + failed = []string{} + ) + for _, peer := range replicas { + wg.Add(1) + go func(peer codersdk.Replica) { + defer wg.Done() + + fail := func(err error) { + mu.Lock() + failed = append(failed, fmt.Sprintf("relay %s (%s): %s", peer.Hostname, peer.RelayAddress, err)) + mu.Unlock() + s.Logger.Warn(s.ctx, "failed to ping sibling replica, this could happen if the replica has shutdown", + slog.F("replica_hostname", peer.Hostname), + slog.F("replica_relay_address", peer.RelayAddress), + slog.Error(err), + ) + } + + ra, err := url.Parse(peer.RelayAddress) + if err != nil { + fail(xerrors.Errorf("parse relay address %q: %w", peer.RelayAddress, err)) + return + } + target, err := ra.Parse("/derp/latency-check") + if err != nil { + fail(xerrors.Errorf("parse latency-check URL: %w", err)) + return + } + req, err := http.NewRequestWithContext(ctx, http.MethodGet, target.String(), nil) + if err != nil { + fail(xerrors.Errorf("create request: %w", err)) + return + } + res, err := client.Do(req) + if err != nil { + fail(xerrors.Errorf("do probe: %w", err)) + return + } + if res.StatusCode != http.StatusOK { + fail(xerrors.Errorf("unexpected status code: %d", res.StatusCode)) + } + _ = res.Body.Close() + }(peer) + } + wg.Wait() + + s.replicaErrMut.Lock() + defer s.replicaErrMut.Unlock() + s.replicaErr = "" + if len(failed) > 0 { + s.replicaErr = fmt.Sprintf("Failed to dial peers: %s", strings.Join(failed, ", ")) + } + if s.Options.ReplicaErrCallback != nil { + s.Options.ReplicaErrCallback(replicas, s.replicaErr) + } + + //nolint:nilnil // we don't actually use the return value of the + // singleflight here + return nil, nil + }) +} + func (s *Server) handleRegisterFailure(err error) { if s.ctx.Err() != nil { return @@ -519,8 +608,14 @@ func (s *Server) healthReport(rw http.ResponseWriter, r *http.Request) { fmt.Sprintf("version mismatch: primary coderd (%s) != workspace proxy (%s)", primaryBuild.Version, buildinfo.Version())) } + s.replicaErrMut.Lock() + if s.replicaErr != "" { + report.Errors = append(report.Errors, "High availability networking: it appears you are running more than one replica of the proxy, but the replicas are unable to establish a mesh for networking: "+s.replicaErr) + } + s.replicaErrMut.Unlock() + // TODO: We should hit the deployment config endpoint and do some config - // checks. We can check the version from the X-CODER-BUILD-VERSION header + // checks. 
httpapi.Write(r.Context(), rw, http.StatusOK, report) } diff --git a/enterprise/wsproxy/wsproxy_test.go b/enterprise/wsproxy/wsproxy_test.go index 77ebf621575aa..da95205c5c466 100644 --- a/enterprise/wsproxy/wsproxy_test.go +++ b/enterprise/wsproxy/wsproxy_test.go @@ -1,8 +1,11 @@ package wsproxy_test import ( + "encoding/json" "fmt" "net" + "net/http" + "net/http/httptest" "net/url" "testing" "time" @@ -19,6 +22,7 @@ import ( "cdr.dev/slog" "cdr.dev/slog/sloggers/slogtest" "github.com/coder/coder/v2/agent/agenttest" + "github.com/coder/coder/v2/buildinfo" "github.com/coder/coder/v2/cli/clibase" "github.com/coder/coder/v2/coderd/coderdtest" "github.com/coder/coder/v2/coderd/healthcheck/derphealth" @@ -28,6 +32,7 @@ import ( "github.com/coder/coder/v2/cryptorand" "github.com/coder/coder/v2/enterprise/coderd/coderdenttest" "github.com/coder/coder/v2/enterprise/coderd/license" + "github.com/coder/coder/v2/enterprise/wsproxy/wsproxysdk" "github.com/coder/coder/v2/provisioner/echo" "github.com/coder/coder/v2/testutil" ) @@ -601,6 +606,219 @@ func TestDERPMesh(t *testing.T) { } } +// TestWorkspaceProxyDERPMeshProbe ensures that each replica pings every other +// replica in the same region as itself periodically. +func TestWorkspaceProxyDERPMeshProbe(t *testing.T) { + t.Parallel() + + deploymentValues := coderdtest.DeploymentValues(t) + deploymentValues.Experiments = []string{ + "*", + } + + client, closer, api, _ := coderdenttest.NewWithAPI(t, &coderdenttest.Options{ + Options: &coderdtest.Options{ + DeploymentValues: deploymentValues, + AppHostname: "*.primary.test.coder.com", + IncludeProvisionerDaemon: true, + RealIPConfig: &httpmw.RealIPConfig{ + TrustedOrigins: []*net.IPNet{{ + IP: net.ParseIP("127.0.0.1"), + Mask: net.CIDRMask(8, 32), + }}, + TrustedHeaders: []string{ + "CF-Connecting-IP", + }, + }, + }, + LicenseOptions: &coderdenttest.LicenseOptions{ + Features: license.Features{ + codersdk.FeatureWorkspaceProxy: 1, + }, + }, + }) + t.Cleanup(func() { + _ = closer.Close() + }) + + createProxyRegion := func(t *testing.T, name string) codersdk.UpdateWorkspaceProxyResponse { + t.Helper() + ctx := testutil.Context(t, testutil.WaitLong) + proxyRes, err := client.CreateWorkspaceProxy(ctx, codersdk.CreateWorkspaceProxyRequest{ + Name: name, + Icon: "/emojis/flag.png", + }) + require.NoError(t, err, "failed to create workspace proxy") + return proxyRes + } + + registerBrokenProxy := func(t *testing.T, accessURL, token string) { + t.Helper() + // Create a HTTP server that always replies with 500. + srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + rw.WriteHeader(http.StatusInternalServerError) + })) + t.Cleanup(srv.Close) + + // Register a proxy. + wsproxyClient := wsproxysdk.New(api.AccessURL) + wsproxyClient.SetSessionToken(token) + ctx := testutil.Context(t, testutil.WaitLong) + + hostname, err := cryptorand.String(6) + require.NoError(t, err) + _, err = wsproxyClient.RegisterWorkspaceProxy(ctx, wsproxysdk.RegisterWorkspaceProxyRequest{ + AccessURL: accessURL, + WildcardHostname: "", + DerpEnabled: true, + DerpOnly: false, + ReplicaID: uuid.New(), + ReplicaHostname: hostname, + ReplicaError: "", + ReplicaRelayAddress: srv.URL, + Version: buildinfo.Version(), + }) + require.NoError(t, err) + } + + t.Run("ProbeOK", func(t *testing.T) { + t.Parallel() + + // Register but don't start a proxy in a different region. This + // shouldn't affect the mesh since it's in a different region. 
+ fakeProxyRes := createProxyRegion(t, "fake-proxy") + registerBrokenProxy(t, "https://fake-proxy.test.coder.com", fakeProxyRes.ProxyToken) + + proxyURL, err := url.Parse("https://proxy1.test.coder.com") + require.NoError(t, err) + + // Create 6 proxy replicas. + const count = 6 + var ( + sessionToken = "" + proxies = [count]coderdenttest.WorkspaceProxy{} + replicaPingDone = [count]bool{} + ) + for i := range proxies { + i := i + proxies[i] = coderdenttest.NewWorkspaceProxy(t, api, client, &coderdenttest.ProxyOptions{ + Name: "proxy-1", + Token: sessionToken, + ProxyURL: proxyURL, + ReplicaPingCallback: func(replicas []codersdk.Replica, err string) { + if len(replicas) != count-1 { + // Still warming up... + return + } + replicaPingDone[i] = true + assert.Emptyf(t, err, "replica %d ping callback error", i) + }, + }) + if i == 0 { + sessionToken = proxies[i].Options.ProxySessionToken + } + } + + // Force all proxies to re-register immediately. This ensures the DERP + // mesh is up-to-date. In production this will happen automatically + // after about 15 seconds. + for i, proxy := range proxies { + err := proxy.RegisterNow(testutil.Context(t, testutil.WaitLong)) + require.NoErrorf(t, err, "failed to force proxy %d to re-register", i) + } + + // Ensure that all proxies have pinged. + require.Eventually(t, func() bool { + ok := true + for i := range proxies { + if !replicaPingDone[i] { + t.Logf("replica %d has not pinged yet", i) + ok = false + } + } + return ok + }, testutil.WaitLong, testutil.IntervalSlow) + t.Log("all replicas have pinged") + + // Check they're all healthy according to /healthz-report. + for _, proxy := range proxies { + // GET /healthz-report + ctx := testutil.Context(t, testutil.WaitLong) + u := proxy.ServerURL.ResolveReference(&url.URL{Path: "/healthz-report"}) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) + require.NoError(t, err) + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + + var respJSON codersdk.ProxyHealthReport + err = json.NewDecoder(resp.Body).Decode(&respJSON) + resp.Body.Close() + require.NoError(t, err) + + assert.Empty(t, respJSON.Errors, "proxy is not healthy") + } + }) + + // Register one proxy, then pretend to register 5 others. This should cause + // the mesh to fail and return an error. + t.Run("ProbeFail", func(t *testing.T) { + t.Parallel() + + proxyURL, err := url.Parse("https://proxy2.test.coder.com") + require.NoError(t, err) + + // Create 1 real proxy replica. + const fakeCount = 5 + replicaPingErr := make(chan string, 4) + proxy := coderdenttest.NewWorkspaceProxy(t, api, client, &coderdenttest.ProxyOptions{ + Name: "proxy-2", + ProxyURL: proxyURL, + ReplicaPingCallback: func(replicas []codersdk.Replica, err string) { + if len(replicas) != fakeCount { + // Still warming up... + return + } + replicaPingErr <- err + }, + }) + + // Register (but don't start) 5 other proxies in the same region. Since + // they registered recently they should be included in the mesh (but + // will fail as the server always responds with 500). + for i := 0; i < fakeCount; i++ { + registerBrokenProxy(t, proxyURL.String(), proxy.Options.ProxySessionToken) + } + + // Force the proxy to re-register immediately. + err = proxy.RegisterNow(testutil.Context(t, testutil.WaitLong)) + require.NoError(t, err, "failed to force proxy to re-register") + + // Wait for the ping to fail. 
+ select { + case err := <-replicaPingErr: + require.NotEmpty(t, err, "replica ping error") + case <-testutil.Context(t, testutil.WaitLong).Done(): + t.Fatal("timed out waiting for replica ping error") + } + + // GET /healthz-report + ctx := testutil.Context(t, testutil.WaitLong) + u := proxy.ServerURL.ResolveReference(&url.URL{Path: "/healthz-report"}) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) + require.NoError(t, err) + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + + var respJSON codersdk.ProxyHealthReport + err = json.NewDecoder(resp.Body).Decode(&respJSON) + resp.Body.Close() + require.NoError(t, err) + + require.Len(t, respJSON.Errors, 1, "proxy is healthy") + require.Contains(t, respJSON.Errors[0], "High availability networking") + }) +} + func TestWorkspaceProxyWorkspaceApps(t *testing.T) { t.Parallel() From 45791ae46ca34547d4e9db94e0424013049feb94 Mon Sep 17 00:00:00 2001 From: Dean Sheather Date: Fri, 23 Feb 2024 11:50:57 +0000 Subject: [PATCH 4/5] PR comments --- enterprise/replicasync/replicasync.go | 109 ++++++++++++++------------ enterprise/wsproxy/wsproxy.go | 61 ++++---------- 2 files changed, 75 insertions(+), 95 deletions(-) diff --git a/enterprise/replicasync/replicasync.go b/enterprise/replicasync/replicasync.go index 122eb5c1faa17..aa8adb3a04ad5 100644 --- a/enterprise/replicasync/replicasync.go +++ b/enterprise/replicasync/replicasync.go @@ -266,55 +266,35 @@ func (m *Manager) syncReplicas(ctx context.Context) error { }, } defer client.CloseIdleConnections() - var wg sync.WaitGroup - var mu sync.Mutex - failed := make([]string, 0) - for _, peer := range m.Regional() { - wg.Add(1) - go func(peer database.Replica) { - defer wg.Done() - fail := func(err error) { - mu.Lock() - failed = append(failed, fmt.Sprintf("relay %s (%s): %s", peer.Hostname, peer.RelayAddress, err)) - mu.Unlock() + peers := m.Regional() + errs := make(chan error, len(peers)) + for _, peer := range peers { + go func(peer database.Replica) { + err := PingPeerReplica(ctx, client, peer.RelayAddress) + if err != nil { + errs <- xerrors.Errorf("ping sibling replica %s (%s): %w", peer.Hostname, peer.RelayAddress, err) m.logger.Warn(ctx, "failed to ping sibling replica, this could happen if the replica has shutdown", slog.F("replica_hostname", peer.Hostname), slog.F("replica_relay_address", peer.RelayAddress), slog.Error(err), ) - } - - ra, err := url.Parse(peer.RelayAddress) - if err != nil { - fail(xerrors.Errorf("parse relay address %q: %w", peer.RelayAddress, err)) - return - } - target, err := ra.Parse("/derp/latency-check") - if err != nil { - fail(xerrors.Errorf("parse latency-check URL: %w", err)) return } - req, err := http.NewRequestWithContext(ctx, http.MethodGet, target.String(), nil) - if err != nil { - fail(xerrors.Errorf("create request: %w", err)) - return - } - res, err := client.Do(req) - if err != nil { - fail(xerrors.Errorf("do probe: %w", err)) - return - } - if res.StatusCode != http.StatusOK { - fail(xerrors.Errorf("unexpected status code: %d", res.StatusCode)) - } - _ = res.Body.Close() + errs <- nil }(peer) } - wg.Wait() + + replicaErrs := make([]string, 0, len(peers)) + for i := 0; i < len(peers); i++ { + err := <-errs + if err != nil { + replicaErrs = append(replicaErrs, err.Error()) + } + } replicaError := "" - if len(failed) > 0 { - replicaError = fmt.Sprintf("Failed to dial peers: %s", strings.Join(failed, ", ")) + if len(replicaErrs) > 0 { + replicaError = fmt.Sprintf("Failed to dial peers: %s", 
strings.Join(replicaErrs, ", "))
 	}
 
 	databaseLatency, err := m.db.Ping(ctx)
@@ -374,6 +354,32 @@ func (m *Manager) syncReplicas(ctx context.Context) error {
 	return nil
 }
 
+// PingPeerReplica pings a peer replica over its internal relay address to
+// ensure it's reachable and alive for health purposes.
+func PingPeerReplica(ctx context.Context, client http.Client, relayAddress string) error {
+	ra, err := url.Parse(relayAddress)
+	if err != nil {
+		return xerrors.Errorf("parse relay address %q: %w", relayAddress, err)
+	}
+	target, err := ra.Parse("/derp/latency-check")
+	if err != nil {
+		return xerrors.Errorf("parse latency-check URL: %w", err)
+	}
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, target.String(), nil)
+	if err != nil {
+		return xerrors.Errorf("create request: %w", err)
+	}
+	res, err := client.Do(req)
+	if err != nil {
+		return xerrors.Errorf("do probe: %w", err)
+	}
+	_ = res.Body.Close()
+	if res.StatusCode != http.StatusOK {
+		return xerrors.Errorf("unexpected status code: %d", res.StatusCode)
+	}
+	return nil
+}
+
 // Self represents the current replica.
 func (m *Manager) Self() database.Replica {
 	m.mutex.Lock()
@@ -478,27 +484,28 @@ func (m *Manager) Close() error {
 	return nil
 }
 
+// CreateDERPMeshTLSConfig creates a TLS configuration for connecting to peers
+// in the DERP mesh over private networking. It overrides the ServerName to be
+// the expected public hostname of the peer, and trusts all of the TLS server
+// certificates used by this replica (as we expect all replicas to use the same
+// TLS certificates).
 func CreateDERPMeshTLSConfig(hostname string, tlsCertificates []tls.Certificate) (*tls.Config, error) {
 	meshRootCA := x509.NewCertPool()
 	for _, certificate := range tlsCertificates {
 		for _, certificatePart := range certificate.Certificate {
-			certificate, err := x509.ParseCertificate(certificatePart)
+			parsedCert, err := x509.ParseCertificate(certificatePart)
 			if err != nil {
-				return nil, xerrors.Errorf("parse certificate %s: %w", certificate.Subject.CommonName, err)
+				return nil, xerrors.Errorf("parse certificate: %w", err)
 			}
-			meshRootCA.AddCert(certificate)
+			meshRootCA.AddCert(parsedCert)
 		}
 	}
-	// This TLS configuration spoofs access from the access URL hostname
-	// assuming that the certificates provided will cover that hostname.
-	//
-	// Replica sync and DERP meshing require accessing replicas via their
-	// internal IP addresses, and if TLS is configured we use the same
-	// certificates.
+
+	// This TLS configuration trusts the built-in TLS certificates and forces
+	// the server name to be the public hostname.
 	return &tls.Config{
-		MinVersion:   tls.VersionTLS12,
-		Certificates: tlsCertificates,
-		RootCAs:      meshRootCA,
-		ServerName:   hostname,
+		MinVersion: tls.VersionTLS12,
+		RootCAs:    meshRootCA,
+		ServerName: hostname,
 	}, nil
 }
diff --git a/enterprise/wsproxy/wsproxy.go b/enterprise/wsproxy/wsproxy.go
index fa46ef8fab7b3..2dfc2c9b67f4c 100644
--- a/enterprise/wsproxy/wsproxy.go
+++ b/enterprise/wsproxy/wsproxy.go
@@ -80,7 +80,7 @@ type Options struct {
 	DERPOnly bool
 
 	// ReplicaErrCallback is called when the proxy replica successfully or
-	// unsuccessfully pings it's peers in the mesh.
+	// unsuccessfully pings its peers in the mesh.
ReplicaErrCallback func(replicas []codersdk.Replica, err string) ProxySessionToken string @@ -455,7 +455,7 @@ func (s *Server) pingSiblingReplicas(replicas []codersdk.Replica) { relayURLs[i] = r.RelayAddress } slices.Sort(relayURLs) - singleflightStr := strings.Join(relayURLs, ",") + singleflightStr := strings.Join(relayURLs, " ") // URLs can't contain spaces. //nolint:dogsled _, _, _ = s.replicaPingSingleflight.Do(singleflightStr, func() (any, error) { @@ -475,60 +475,33 @@ func (s *Server) pingSiblingReplicas(replicas []codersdk.Replica) { } defer client.CloseIdleConnections() - var ( - wg sync.WaitGroup - mu sync.Mutex - failed = []string{} - ) + errs := make(chan error, len(replicas)) for _, peer := range replicas { - wg.Add(1) go func(peer codersdk.Replica) { - defer wg.Done() - - fail := func(err error) { - mu.Lock() - failed = append(failed, fmt.Sprintf("relay %s (%s): %s", peer.Hostname, peer.RelayAddress, err)) - mu.Unlock() - s.Logger.Warn(s.ctx, "failed to ping sibling replica, this could happen if the replica has shutdown", + err := replicasync.PingPeerReplica(ctx, client, peer.RelayAddress) + if err != nil { + errs <- xerrors.Errorf("ping sibling replica %s (%s): %w", peer.Hostname, peer.RelayAddress, err) + s.Logger.Warn(ctx, "failed to ping sibling replica, this could happen if the replica has shutdown", slog.F("replica_hostname", peer.Hostname), slog.F("replica_relay_address", peer.RelayAddress), slog.Error(err), ) - } - - ra, err := url.Parse(peer.RelayAddress) - if err != nil { - fail(xerrors.Errorf("parse relay address %q: %w", peer.RelayAddress, err)) - return - } - target, err := ra.Parse("/derp/latency-check") - if err != nil { - fail(xerrors.Errorf("parse latency-check URL: %w", err)) return } - req, err := http.NewRequestWithContext(ctx, http.MethodGet, target.String(), nil) - if err != nil { - fail(xerrors.Errorf("create request: %w", err)) - return - } - res, err := client.Do(req) - if err != nil { - fail(xerrors.Errorf("do probe: %w", err)) - return - } - if res.StatusCode != http.StatusOK { - fail(xerrors.Errorf("unexpected status code: %d", res.StatusCode)) - } - _ = res.Body.Close() + errs <- nil }(peer) } - wg.Wait() - s.replicaErrMut.Lock() - defer s.replicaErrMut.Unlock() + replicaErrs := make([]string, 0, len(replicas)) + for i := 0; i < len(replicas); i++ { + err := <-errs + if err != nil { + replicaErrs = append(replicaErrs, err.Error()) + } + } s.replicaErr = "" - if len(failed) > 0 { - s.replicaErr = fmt.Sprintf("Failed to dial peers: %s", strings.Join(failed, ", ")) + if len(replicaErrs) > 0 { + s.replicaErr = fmt.Sprintf("Failed to dial peers: %s", strings.Join(replicaErrs, ", ")) } if s.Options.ReplicaErrCallback != nil { s.Options.ReplicaErrCallback(replicas, s.replicaErr) From 268115da119547e98624bcb234facb9720b9a6f2 Mon Sep 17 00:00:00 2001 From: Dean Sheather Date: Fri, 8 Mar 2024 05:37:11 +0000 Subject: [PATCH 5/5] PR comments --- enterprise/wsproxy/wsproxy.go | 3 + enterprise/wsproxy/wsproxy_test.go | 146 +++++++++++++++++------------ 2 files changed, 87 insertions(+), 62 deletions(-) diff --git a/enterprise/wsproxy/wsproxy.go b/enterprise/wsproxy/wsproxy.go index a32e0e12d7072..55567014fb4e0 100644 --- a/enterprise/wsproxy/wsproxy.go +++ b/enterprise/wsproxy/wsproxy.go @@ -499,6 +499,9 @@ func (s *Server) pingSiblingReplicas(replicas []codersdk.Replica) { replicaErrs = append(replicaErrs, err.Error()) } } + + s.replicaErrMut.Lock() + defer s.replicaErrMut.Unlock() s.replicaErr = "" if len(replicaErrs) > 0 { s.replicaErr = fmt.Sprintf("Failed 
to dial peers: %s", strings.Join(replicaErrs, ", ")) diff --git a/enterprise/wsproxy/wsproxy_test.go b/enterprise/wsproxy/wsproxy_test.go index 8ea4321ec6970..ab0a63e69d2df 100644 --- a/enterprise/wsproxy/wsproxy_test.go +++ b/enterprise/wsproxy/wsproxy_test.go @@ -133,21 +133,20 @@ func TestDERP(t *testing.T) { }) // Create a proxy that is never started. - createProxyCtx := testutil.Context(t, testutil.WaitLong) - _, err := client.CreateWorkspaceProxy(createProxyCtx, codersdk.CreateWorkspaceProxyRequest{ + ctx := testutil.Context(t, testutil.WaitLong) + _, err := client.CreateWorkspaceProxy(ctx, codersdk.CreateWorkspaceProxyRequest{ Name: "never-started-proxy", }) require.NoError(t, err) // Wait for both running proxies to become healthy. require.Eventually(t, func() bool { - healthCtx := testutil.Context(t, testutil.WaitLong) - err := api.ProxyHealth.ForceUpdate(healthCtx) + err := api.ProxyHealth.ForceUpdate(ctx) if !assert.NoError(t, err) { return false } - regions, err := client.Regions(healthCtx) + regions, err := client.Regions(ctx) if !assert.NoError(t, err) { return false } @@ -269,7 +268,8 @@ resourceLoop: t.Run("ConnectDERP", func(t *testing.T) { t.Parallel() - connInfo, err := client.WorkspaceAgentConnectionInfo(testutil.Context(t, testutil.WaitLong), agentID) + ctx := testutil.Context(t, testutil.WaitLong) + connInfo, err := client.WorkspaceAgentConnectionInfo(ctx, agentID) require.NoError(t, err) require.NotNil(t, connInfo.DERPMap) require.Len(t, connInfo.DERPMap.Regions, 3+len(api.DeploymentValues.DERP.Server.STUNAddresses.Value())) @@ -292,7 +292,6 @@ resourceLoop: OmitDefaultRegions: true, } - ctx := testutil.Context(t, testutil.WaitLong) report := derphealth.Report{} report.Run(ctx, &derphealth.ReportOptions{ DERPMap: derpMap, @@ -355,14 +354,14 @@ func TestDERPEndToEnd(t *testing.T) { }) // Wait for the proxy to become healthy. + ctx := testutil.Context(t, testutil.WaitLong) require.Eventually(t, func() bool { - healthCtx := testutil.Context(t, testutil.WaitLong) - err := api.ProxyHealth.ForceUpdate(healthCtx) + err := api.ProxyHealth.ForceUpdate(ctx) if !assert.NoError(t, err) { return false } - regions, err := client.Regions(healthCtx) + regions, err := client.Regions(ctx) if !assert.NoError(t, err) { return false } @@ -430,7 +429,6 @@ resourceLoop: _ = coderdtest.AwaitWorkspaceAgents(t, client, workspace.ID) // Connect to the workspace agent. - ctx := testutil.Context(t, testutil.WaitLong) conn, err := client.DialWorkspaceAgent(ctx, agentID, &codersdk.DialWorkspaceAgentOptions{ Logger: slogtest.Make(t, &slogtest.Options{ IgnoreErrors: true, @@ -555,40 +553,8 @@ func TestDERPMesh(t *testing.T) { // replica in the same region as itself periodically. 
diff --git a/enterprise/wsproxy/wsproxy_test.go b/enterprise/wsproxy/wsproxy_test.go
index 8ea4321ec6970..ab0a63e69d2df 100644
--- a/enterprise/wsproxy/wsproxy_test.go
+++ b/enterprise/wsproxy/wsproxy_test.go
@@ -133,21 +133,20 @@ func TestDERP(t *testing.T) {
 	})
 
 	// Create a proxy that is never started.
-	createProxyCtx := testutil.Context(t, testutil.WaitLong)
-	_, err := client.CreateWorkspaceProxy(createProxyCtx, codersdk.CreateWorkspaceProxyRequest{
+	ctx := testutil.Context(t, testutil.WaitLong)
+	_, err := client.CreateWorkspaceProxy(ctx, codersdk.CreateWorkspaceProxyRequest{
 		Name: "never-started-proxy",
 	})
 	require.NoError(t, err)
 
 	// Wait for both running proxies to become healthy.
 	require.Eventually(t, func() bool {
-		healthCtx := testutil.Context(t, testutil.WaitLong)
-		err := api.ProxyHealth.ForceUpdate(healthCtx)
+		err := api.ProxyHealth.ForceUpdate(ctx)
 		if !assert.NoError(t, err) {
 			return false
 		}
 
-		regions, err := client.Regions(healthCtx)
+		regions, err := client.Regions(ctx)
 		if !assert.NoError(t, err) {
 			return false
 		}
@@ -269,7 +268,8 @@ resourceLoop:
 	t.Run("ConnectDERP", func(t *testing.T) {
 		t.Parallel()
 
-		connInfo, err := client.WorkspaceAgentConnectionInfo(testutil.Context(t, testutil.WaitLong), agentID)
+		ctx := testutil.Context(t, testutil.WaitLong)
+		connInfo, err := client.WorkspaceAgentConnectionInfo(ctx, agentID)
 		require.NoError(t, err)
 		require.NotNil(t, connInfo.DERPMap)
 		require.Len(t, connInfo.DERPMap.Regions, 3+len(api.DeploymentValues.DERP.Server.STUNAddresses.Value()))
@@ -292,7 +292,6 @@ resourceLoop:
 			OmitDefaultRegions: true,
 		}
 
-		ctx := testutil.Context(t, testutil.WaitLong)
 		report := derphealth.Report{}
 		report.Run(ctx, &derphealth.ReportOptions{
 			DERPMap: derpMap,
@@ -355,14 +354,14 @@ func TestDERPEndToEnd(t *testing.T) {
 	})
 
 	// Wait for the proxy to become healthy.
+	ctx := testutil.Context(t, testutil.WaitLong)
 	require.Eventually(t, func() bool {
-		healthCtx := testutil.Context(t, testutil.WaitLong)
-		err := api.ProxyHealth.ForceUpdate(healthCtx)
+		err := api.ProxyHealth.ForceUpdate(ctx)
 		if !assert.NoError(t, err) {
 			return false
 		}
 
-		regions, err := client.Regions(healthCtx)
+		regions, err := client.Regions(ctx)
 		if !assert.NoError(t, err) {
 			return false
 		}
@@ -430,7 +429,6 @@ resourceLoop:
 	_ = coderdtest.AwaitWorkspaceAgents(t, client, workspace.ID)
 
 	// Connect to the workspace agent.
-	ctx := testutil.Context(t, testutil.WaitLong)
 	conn, err := client.DialWorkspaceAgent(ctx, agentID, &codersdk.DialWorkspaceAgentOptions{
 		Logger: slogtest.Make(t, &slogtest.Options{
 			IgnoreErrors: true,
@@ -555,40 +553,8 @@ func TestDERPMesh(t *testing.T) {
 // replica in the same region as itself periodically.
 func TestWorkspaceProxyDERPMeshProbe(t *testing.T) {
 	t.Parallel()
-
-	deploymentValues := coderdtest.DeploymentValues(t)
-	deploymentValues.Experiments = []string{
-		"*",
-	}
-
-	client, closer, api, _ := coderdenttest.NewWithAPI(t, &coderdenttest.Options{
-		Options: &coderdtest.Options{
-			DeploymentValues:         deploymentValues,
-			AppHostname:              "*.primary.test.coder.com",
-			IncludeProvisionerDaemon: true,
-			RealIPConfig: &httpmw.RealIPConfig{
-				TrustedOrigins: []*net.IPNet{{
-					IP:   net.ParseIP("127.0.0.1"),
-					Mask: net.CIDRMask(8, 32),
-				}},
-				TrustedHeaders: []string{
-					"CF-Connecting-IP",
-				},
-			},
-		},
-		LicenseOptions: &coderdenttest.LicenseOptions{
-			Features: license.Features{
-				codersdk.FeatureWorkspaceProxy: 1,
-			},
-		},
-	})
-	t.Cleanup(func() {
-		_ = closer.Close()
-	})
-
-	createProxyRegion := func(t *testing.T, name string) codersdk.UpdateWorkspaceProxyResponse {
+	createProxyRegion := func(ctx context.Context, t *testing.T, client *codersdk.Client, name string) codersdk.UpdateWorkspaceProxyResponse {
 		t.Helper()
-		ctx := testutil.Context(t, testutil.WaitLong)
 		proxyRes, err := client.CreateWorkspaceProxy(ctx, codersdk.CreateWorkspaceProxyRequest{
 			Name: name,
 			Icon: "/emojis/flag.png",
@@ -597,7 +563,7 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) {
 		return proxyRes
 	}
 
-	registerBrokenProxy := func(t *testing.T, accessURL, token string) {
+	registerBrokenProxy := func(ctx context.Context, t *testing.T, primaryAccessURL *url.URL, accessURL, token string) {
 		t.Helper()
 		// Create a HTTP server that always replies with 500.
 		srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
@@ -606,9 +572,8 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) {
 		t.Cleanup(srv.Close)
 
 		// Register a proxy.
-		wsproxyClient := wsproxysdk.New(api.AccessURL)
+		wsproxyClient := wsproxysdk.New(primaryAccessURL)
 		wsproxyClient.SetSessionToken(token)
-		ctx := testutil.Context(t, testutil.WaitLong)
 
 		hostname, err := cryptorand.String(6)
 		require.NoError(t, err)
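The registerBrokenProxy helper above leans on httptest to stand up a relay that fails every probe. A self-contained sketch of that building block, outside the test suite so there is no require/t plumbing:

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

func main() {
	// Every request, including DERP mesh probes, gets a 500.
	srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, _ *http.Request) {
		rw.WriteHeader(http.StatusInternalServerError)
	}))
	defer srv.Close()

	resp, err := http.Get(srv.URL + "/derp/latency-check")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.StatusCode) // 500, which is what a mesh probe would see
}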
@@ -629,10 +594,41 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) {
 	t.Run("ProbeOK", func(t *testing.T) {
 		t.Parallel()
 
+		deploymentValues := coderdtest.DeploymentValues(t)
+		deploymentValues.Experiments = []string{
+			"*",
+		}
+
+		client, closer, api, _ := coderdenttest.NewWithAPI(t, &coderdenttest.Options{
+			Options: &coderdtest.Options{
+				DeploymentValues:         deploymentValues,
+				AppHostname:              "*.primary.test.coder.com",
+				IncludeProvisionerDaemon: true,
+				RealIPConfig: &httpmw.RealIPConfig{
+					TrustedOrigins: []*net.IPNet{{
+						IP:   net.ParseIP("127.0.0.1"),
+						Mask: net.CIDRMask(8, 32),
+					}},
+					TrustedHeaders: []string{
+						"CF-Connecting-IP",
+					},
+				},
+			},
+			LicenseOptions: &coderdenttest.LicenseOptions{
+				Features: license.Features{
+					codersdk.FeatureWorkspaceProxy: 1,
+				},
+			},
+		})
+		t.Cleanup(func() {
+			_ = closer.Close()
+		})
+
 		// Register but don't start a proxy in a different region. This
 		// shouldn't affect the mesh since it's in a different region.
-		fakeProxyRes := createProxyRegion(t, "fake-proxy")
-		registerBrokenProxy(t, "https://fake-proxy.test.coder.com", fakeProxyRes.ProxyToken)
+		ctx := testutil.Context(t, testutil.WaitLong)
+		fakeProxyRes := createProxyRegion(ctx, t, client, "fake-proxy")
+		registerBrokenProxy(ctx, t, api.AccessURL, "https://fake-proxy.test.coder.com", fakeProxyRes.ProxyToken)
 
 		proxyURL, err := url.Parse("https://proxy1.test.coder.com")
 		require.NoError(t, err)
@@ -688,7 +684,6 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) {
 		// Check they're all healthy according to /healthz-report.
 		for _, proxy := range proxies {
 			// GET /healthz-report
-			ctx := testutil.Context(t, testutil.WaitLong)
 			u := proxy.ServerURL.ResolveReference(&url.URL{Path: "/healthz-report"})
 			req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
 			require.NoError(t, err)
@@ -700,7 +695,7 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) {
 			resp.Body.Close()
 			require.NoError(t, err)
 
-			assert.Empty(t, respJSON.Errors, "proxy is not healthy")
+			require.Empty(t, respJSON.Errors, "proxy is not healthy")
 		}
 	})
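Both subtests finish by fetching /healthz-report and asserting that the errors list is empty. A sketch of that check as plain client code; the healthzReport struct here is a stand-in for whatever type the real test decodes into, assuming only that it exposes a slice of error strings:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// healthzReport is a hypothetical stand-in for the real report type.
type healthzReport struct {
	Errors []string `json:"errors"`
}

func fetchHealth(ctx context.Context, baseURL string) (healthzReport, error) {
	var report healthzReport
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, baseURL+"/healthz-report", nil)
	if err != nil {
		return report, err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return report, err
	}
	defer resp.Body.Close()
	err = json.NewDecoder(resp.Body).Decode(&report)
	return report, err
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	report, err := fetchHealth(ctx, "http://127.0.0.1:3001")
	if err != nil {
		panic(err)
	}
	fmt.Println(len(report.Errors) == 0) // healthy iff no errors reported
}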
@@ -709,6 +704,36 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) {
 	t.Run("ProbeFail", func(t *testing.T) {
 		t.Parallel()
 
+		deploymentValues := coderdtest.DeploymentValues(t)
+		deploymentValues.Experiments = []string{
+			"*",
+		}
+
+		client, closer, api, _ := coderdenttest.NewWithAPI(t, &coderdenttest.Options{
+			Options: &coderdtest.Options{
+				DeploymentValues:         deploymentValues,
+				AppHostname:              "*.primary.test.coder.com",
+				IncludeProvisionerDaemon: true,
+				RealIPConfig: &httpmw.RealIPConfig{
+					TrustedOrigins: []*net.IPNet{{
+						IP:   net.ParseIP("127.0.0.1"),
+						Mask: net.CIDRMask(8, 32),
+					}},
+					TrustedHeaders: []string{
+						"CF-Connecting-IP",
+					},
+				},
+			},
+			LicenseOptions: &coderdenttest.LicenseOptions{
+				Features: license.Features{
+					codersdk.FeatureWorkspaceProxy: 1,
+				},
+			},
+		})
+		t.Cleanup(func() {
+			_ = closer.Close()
+		})
+
 		proxyURL, err := url.Parse("https://proxy2.test.coder.com")
 		require.NoError(t, err)
@@ -727,11 +752,13 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) {
 			},
 		})
 
-		// Register (but don't start) 5 other proxies in the same region. Since
-		// they registered recently they should be included in the mesh (but
-		// will fail as the server always responds with 500).
+		// Register (but don't start wsproxy.Server) 5 other proxies in the same
+		// region. Since they registered recently they should be included in the
+		// mesh. We create a HTTP server on the relay address that always
+		// responds with 500 so probes fail.
+		ctx := testutil.Context(t, testutil.WaitLong)
 		for i := 0; i < fakeCount; i++ {
-			registerBrokenProxy(t, proxyURL.String(), proxy.Options.ProxySessionToken)
+			registerBrokenProxy(ctx, t, api.AccessURL, proxyURL.String(), proxy.Options.ProxySessionToken)
 		}
 
 		// Force the proxy to re-register immediately.
 		err = proxy.RegisterNow(ctx)
 		require.NoError(t, err, "failed to force proxy to re-register")
 
 		// Wait for the ping to fail.
-		select {
-		case err := <-replicaPingErr:
-			require.NotEmpty(t, err, "replica ping error")
-		case <-testutil.Context(t, testutil.WaitLong).Done():
-			t.Fatal("timed out waiting for replica ping error")
-		}
+		replicaErr := testutil.RequireRecvCtx(ctx, t, replicaPingErr)
+		require.NotEmpty(t, replicaErr, "replica ping error")
 
 		// GET /healthz-report
-		ctx := testutil.Context(t, testutil.WaitLong)
 		u := proxy.ServerURL.ResolveReference(&url.URL{Path: "/healthz-report"})
 		req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
 		require.NoError(t, err)
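The final hunk replaces the hand-rolled select/timeout with testutil.RequireRecvCtx, a context-bounded receive. A rough generic equivalent of that helper, assuming Go 1.18+; recvCtx is an illustrative name, not the real testutil function:

package main

import (
	"context"
	"fmt"
	"time"
)

// recvCtx receives one value from ch, or gives up when ctx expires.
func recvCtx[T any](ctx context.Context, ch <-chan T) (T, error) {
	var zero T
	select {
	case v := <-ch:
		return v, nil
	case <-ctx.Done():
		return zero, ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	ch := make(chan string, 1)
	ch <- "ping failed: 500"
	v, err := recvCtx(ctx, ch)
	fmt.Println(v, err)
}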