From 9cd470a0840b37db2a6282d6adee0d2187bc2d85 Mon Sep 17 00:00:00 2001 From: Dean Sheather Date: Tue, 20 Feb 2024 07:32:44 +0000 Subject: [PATCH 1/4] chore: add test for workspace proxy derp meshing --- enterprise/coderd/coderdenttest/proxytest.go | 34 ++- enterprise/wsproxy/wsproxy.go | 26 ++- enterprise/wsproxy/wsproxy_test.go | 165 +++++++++++++ enterprise/wsproxy/wsproxysdk/wsproxysdk.go | 234 ++++++++++++------- 4 files changed, 354 insertions(+), 105 deletions(-) diff --git a/enterprise/coderd/coderdenttest/proxytest.go b/enterprise/coderd/coderdenttest/proxytest.go index 9b43cbe6c316d..d3385f2c12034 100644 --- a/enterprise/coderd/coderdenttest/proxytest.go +++ b/enterprise/coderd/coderdenttest/proxytest.go @@ -38,15 +38,24 @@ type ProxyOptions struct { // ProxyURL is optional ProxyURL *url.URL + // Token is optional. If specified, a new proxy won't be created. + Token string + // FlushStats is optional FlushStats chan chan<- struct{} } +type WorkspaceProxy struct { + *wsproxy.Server + + ServerURL *url.URL +} + // NewWorkspaceProxy will configure a wsproxy.Server with the given options. // The new wsproxy will register itself with the given coderd.API instance. // The first user owner client is required to create the wsproxy on the coderd // api server. -func NewWorkspaceProxy(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Client, options *ProxyOptions) *wsproxy.Server { +func NewWorkspaceProxy(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Client, options *ProxyOptions) WorkspaceProxy { ctx, cancelFunc := context.WithCancel(context.Background()) t.Cleanup(cancelFunc) @@ -107,11 +116,15 @@ func NewWorkspaceProxy(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Clie options.Name = namesgenerator.GetRandomName(1) } - proxyRes, err := owner.CreateWorkspaceProxy(ctx, codersdk.CreateWorkspaceProxyRequest{ - Name: options.Name, - Icon: "/emojis/flag.png", - }) - require.NoError(t, err, "failed to create workspace proxy") + token := options.Token + if token == "" { + proxyRes, err := owner.CreateWorkspaceProxy(ctx, codersdk.CreateWorkspaceProxyRequest{ + Name: options.Name, + Icon: "/emojis/flag.png", + }) + require.NoError(t, err, "failed to create workspace proxy") + token = proxyRes.ProxyToken + } // Inherit collector options from coderd, but keep the wsproxy reporter. statsCollectorOptions := coderdAPI.Options.WorkspaceAppsStatsCollectorOptions @@ -131,14 +144,14 @@ func NewWorkspaceProxy(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Clie Tracing: coderdAPI.TracerProvider, APIRateLimit: coderdAPI.APIRateLimit, SecureAuthCookie: coderdAPI.SecureAuthCookie, - ProxySessionToken: proxyRes.ProxyToken, + ProxySessionToken: token, DisablePathApps: options.DisablePathApps, // We need a new registry to not conflict with the coderd internal // proxy metrics. PrometheusRegistry: prometheus.NewRegistry(), DERPEnabled: !options.DerpDisabled, DERPOnly: options.DerpOnly, - DERPServerRelayAddress: accessURL.String(), + DERPServerRelayAddress: serverURL.String(), StatsCollectorOptions: statsCollectorOptions, }) require.NoError(t, err) @@ -151,5 +164,8 @@ func NewWorkspaceProxy(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Clie handler = wssrv.Handler mutex.Unlock() - return wssrv + return WorkspaceProxy{ + Server: wssrv, + ServerURL: serverURL, + } } diff --git a/enterprise/wsproxy/wsproxy.go b/enterprise/wsproxy/wsproxy.go index 68693f4633871..5d4eca0244e5f 100644 --- a/enterprise/wsproxy/wsproxy.go +++ b/enterprise/wsproxy/wsproxy.go @@ -128,7 +128,7 @@ type Server struct { ctx context.Context cancel context.CancelFunc derpCloseFunc func() - registerDone <-chan struct{} + registerLoop *wsproxysdk.RegisterWorkspaceProxyLoop } // New creates a new workspace proxy server. This requires a primary coderd @@ -210,7 +210,7 @@ func New(ctx context.Context, opts *Options) (*Server, error) { // goroutine to periodically re-register. replicaID := uuid.New() osHostname := cliutil.Hostname() - regResp, registerDone, err := client.RegisterWorkspaceProxyLoop(ctx, wsproxysdk.RegisterWorkspaceProxyLoopOpts{ + registerLoop, regResp, err := client.RegisterWorkspaceProxyLoop(ctx, wsproxysdk.RegisterWorkspaceProxyLoopOpts{ Logger: opts.Logger, Request: wsproxysdk.RegisterWorkspaceProxyRequest{ AccessURL: opts.AccessURL.String(), @@ -230,12 +230,13 @@ func New(ctx context.Context, opts *Options) (*Server, error) { if err != nil { return nil, xerrors.Errorf("register proxy: %w", err) } - s.registerDone = registerDone - err = s.handleRegister(ctx, regResp) + s.registerLoop = registerLoop + + derpServer.SetMeshKey(regResp.DERPMeshKey) + err = s.handleRegister(regResp) if err != nil { return nil, xerrors.Errorf("handle register: %w", err) } - derpServer.SetMeshKey(regResp.DERPMeshKey) secKey, err := workspaceapps.KeyFromString(regResp.AppSecurityKey) if err != nil { @@ -409,16 +410,16 @@ func New(ctx context.Context, opts *Options) (*Server, error) { return s, nil } +func (s *Server) RegisterNow(ctx context.Context) error { + _, err := s.registerLoop.RegisterNow(ctx) + return err +} + func (s *Server) Close() error { s.cancel() var err error - registerDoneWaitTicker := time.NewTicker(11 * time.Second) // the attempt timeout is 10s - select { - case <-registerDoneWaitTicker.C: - err = multierror.Append(err, xerrors.New("timed out waiting for registerDone")) - case <-s.registerDone: - } + s.registerLoop.Close() s.derpCloseFunc() appServerErr := s.AppServer.Close() if appServerErr != nil { @@ -437,11 +438,12 @@ func (*Server) mutateRegister(_ *wsproxysdk.RegisterWorkspaceProxyRequest) { // package in the primary and update req.ReplicaError accordingly. } -func (s *Server) handleRegister(_ context.Context, res wsproxysdk.RegisterWorkspaceProxyResponse) error { +func (s *Server) handleRegister(res wsproxysdk.RegisterWorkspaceProxyResponse) error { addresses := make([]string, len(res.SiblingReplicas)) for i, replica := range res.SiblingReplicas { addresses[i] = replica.RelayAddress } + s.Logger.Debug(s.ctx, "setting DERP mesh sibling addresses", slog.F("addresses", addresses)) s.derpMesh.SetAddresses(addresses, false) s.latestDERPMap.Store(res.DERPMap) diff --git a/enterprise/wsproxy/wsproxy_test.go b/enterprise/wsproxy/wsproxy_test.go index 0d440165dfb16..24ebac15c208c 100644 --- a/enterprise/wsproxy/wsproxy_test.go +++ b/enterprise/wsproxy/wsproxy_test.go @@ -3,12 +3,15 @@ package wsproxy_test import ( "fmt" "net" + "net/url" "testing" + "time" "github.com/davecgh/go-spew/spew" "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "tailscale.com/derp" "tailscale.com/derp/derphttp" "tailscale.com/tailcfg" "tailscale.com/types/key" @@ -22,6 +25,7 @@ import ( "github.com/coder/coder/v2/coderd/httpmw" "github.com/coder/coder/v2/coderd/workspaceapps/apptest" "github.com/coder/coder/v2/codersdk" + "github.com/coder/coder/v2/cryptorand" "github.com/coder/coder/v2/enterprise/coderd/coderdenttest" "github.com/coder/coder/v2/enterprise/coderd/license" "github.com/coder/coder/v2/provisioner/echo" @@ -430,6 +434,167 @@ resourceLoop: require.False(t, p2p) } +// TestDERPMesh spawns 10 workspace proxy replicas and tries to connect to a +// single DERP peer via every single one. +func TestDERPMesh(t *testing.T) { + t.Parallel() + + deploymentValues := coderdtest.DeploymentValues(t) + deploymentValues.Experiments = []string{ + "*", + } + + client, closer, api, _ := coderdenttest.NewWithAPI(t, &coderdenttest.Options{ + Options: &coderdtest.Options{ + DeploymentValues: deploymentValues, + AppHostname: "*.primary.test.coder.com", + IncludeProvisionerDaemon: true, + RealIPConfig: &httpmw.RealIPConfig{ + TrustedOrigins: []*net.IPNet{{ + IP: net.ParseIP("127.0.0.1"), + Mask: net.CIDRMask(8, 32), + }}, + TrustedHeaders: []string{ + "CF-Connecting-IP", + }, + }, + }, + LicenseOptions: &coderdenttest.LicenseOptions{ + Features: license.Features{ + codersdk.FeatureWorkspaceProxy: 1, + }, + }, + }) + t.Cleanup(func() { + _ = closer.Close() + }) + + proxyURL, err := url.Parse("https://proxy.test.coder.com") + require.NoError(t, err) + + // Create 10 proxy replicas. + const count = 10 + var ( + sessionToken = "" + proxies = [count]coderdenttest.WorkspaceProxy{} + derpURLs = [count]string{} + ) + for i := range proxies { + proxies[i] = coderdenttest.NewWorkspaceProxy(t, api, client, &coderdenttest.ProxyOptions{ + Name: "best-proxy", + Token: sessionToken, + ProxyURL: proxyURL, + }) + if i == 0 { + sessionToken = proxies[i].Options.ProxySessionToken + } + + derpURL := *proxies[i].ServerURL + derpURL.Path = "/derp" + derpURLs[i] = derpURL.String() + } + + // Force all proxies to re-register immediately. This ensures the DERP mesh + // is up-to-date. In production this will happen automatically after about + // 15 seconds. + for i, proxy := range proxies { + err := proxy.RegisterNow(testutil.Context(t, testutil.WaitLong)) + require.NoErrorf(t, err, "failed to force proxy %d to re-register", i) + } + + createClient := func(t *testing.T, name string, derpURL string) (*derphttp.Client, <-chan derp.ReceivedPacket) { + t.Helper() + + client, err := derphttp.NewClient(key.NewNode(), derpURLs[0], func(format string, args ...any) { + t.Logf(name+": "+format, args...) + }) + require.NoError(t, err, "create client") + err = client.Connect(testutil.Context(t, testutil.WaitLong)) + require.NoError(t, err, "connect to DERP server") + + ch := make(chan derp.ReceivedPacket, 64) + go func() { + defer close(ch) + for { + msg, err := client.Recv() + if err != nil { + t.Logf("Recv error: %v", err) + return + } + switch msg := msg.(type) { + case derp.ReceivedPacket: + ch <- msg + return + default: + // We don't care about other messages. + } + } + }() + + return client, ch + } + + sendTest := func(t *testing.T, dstKey key.NodePublic, dstCh <-chan derp.ReceivedPacket, src *derphttp.Client) { + t.Helper() + + const msgStrPrefix = "test_packet_" + msgStr, err := cryptorand.String(64 - len(msgStrPrefix)) + require.NoError(t, err, "generate random msg string") + msg := []byte(msgStrPrefix + msgStr) + + err = src.Send(dstKey, msg) + require.NoError(t, err, "send message via DERP") + + waitCtx := testutil.Context(t, testutil.WaitLong) + ticker := time.NewTicker(time.Millisecond * 500) + defer ticker.Stop() + for { + select { + case pkt := <-dstCh: + assert.Equal(t, src.SelfPublicKey(), pkt.Source, "packet came from wrong source") + assert.Equal(t, msg, pkt.Data, "packet data is wrong") + return + case <-waitCtx.Done(): + t.Fatal("timed out waiting for packet") + return + case <-ticker.C: + } + + // Send another packet. Since we're sending packets immediately + // after opening the clients, they might not be meshed together + // properly yet. + err = src.Send(dstKey, msg) + require.NoError(t, err, "send message via DERP") + } + } + + for i, derpURL := range derpURLs { + i, derpURL := i, derpURL + t.Run(fmt.Sprintf("Proxy%d", i), func(t *testing.T) { + t.Parallel() + t.Logf("derp1=%s, derp2=%s", derpURLs[0], derpURL) + + // Client 1 is always on the first proxy as a "control". + client1, client1Recv := createClient(t, "client1", derpURLs[0]) + t.Cleanup(func() { + _ = client1.Close() + }) + + // Client 2 is on the current proxy. + client2, client2Recv := createClient(t, "client2", derpURL) + t.Cleanup(func() { + _ = client2.Close() + }) + + // Send a packet from client 1 to client 2. + sendTest(t, client2.SelfPublicKey(), client2Recv, client1) + + // Send a packet from client 2 to client 1. + sendTest(t, client1.SelfPublicKey(), client1Recv, client2) + }) + } +} + func TestWorkspaceProxyWorkspaceApps(t *testing.T) { t.Parallel() diff --git a/enterprise/wsproxy/wsproxysdk/wsproxysdk.go b/enterprise/wsproxy/wsproxysdk/wsproxysdk.go index 1163f7c435001..448c493307d2e 100644 --- a/enterprise/wsproxy/wsproxysdk/wsproxysdk.go +++ b/enterprise/wsproxy/wsproxysdk/wsproxysdk.go @@ -277,135 +277,201 @@ type RegisterWorkspaceProxyLoopOpts struct { // called in a blocking manner, so it should avoid blocking for too long. If // the callback returns an error, the loop will stop immediately and the // error will be returned to the FailureFn. - CallbackFn func(ctx context.Context, res RegisterWorkspaceProxyResponse) error + CallbackFn func(res RegisterWorkspaceProxyResponse) error // FailureFn is called with the last error returned from the server if the // context is canceled, registration fails for more than MaxFailureCount, // or if any permanent values in the response change. FailureFn func(err error) } -// RegisterWorkspaceProxyLoop will register the workspace proxy and then start a -// goroutine to keep registering periodically in the background. -// -// The first response is returned immediately, and subsequent responses will be -// notified to the given CallbackFn. When the context is canceled the loop will -// stop immediately and the context error will be returned to the FailureFn. -// -// The returned channel will be closed when the loop stops and can be used to -// ensure the loop is dead before continuing. When a fatal error is encountered, -// the proxy will be deregistered (with the same ReplicaID and AttemptTimeout) -// before calling the FailureFn. -func (c *Client) RegisterWorkspaceProxyLoop(ctx context.Context, opts RegisterWorkspaceProxyLoopOpts) (RegisterWorkspaceProxyResponse, <-chan struct{}, error) { - if opts.Interval == 0 { - opts.Interval = 30 * time.Second - } - if opts.MaxFailureCount == 0 { - opts.MaxFailureCount = 10 - } - if opts.AttemptTimeout == 0 { - opts.AttemptTimeout = 10 * time.Second - } - if opts.MutateFn == nil { - opts.MutateFn = func(_ *RegisterWorkspaceProxyRequest) {} - } - if opts.CallbackFn == nil { - opts.CallbackFn = func(_ context.Context, _ RegisterWorkspaceProxyResponse) error { - return nil - } +type RegisterWorkspaceProxyLoop struct { + opts RegisterWorkspaceProxyLoopOpts + c *Client + originalRes *RegisterWorkspaceProxyResponse + + closedCtx context.Context + close context.CancelFunc + done chan struct{} +} + +// register registers once. It returns the response, whether the loop should +// exit immediately, and any error that occurred. +func (l *RegisterWorkspaceProxyLoop) register(ctx context.Context) (RegisterWorkspaceProxyResponse, bool, error) { + // If it's not the first registration, call the mutate function. + if l.originalRes != nil { + l.mutateFn(&l.opts.Request) } - failureFn := func(err error) { - // We have to use background context here because the original context - // may be canceled. - deregisterCtx, cancel := context.WithTimeout(context.Background(), opts.AttemptTimeout) - defer cancel() - deregisterErr := c.DeregisterWorkspaceProxy(deregisterCtx, DeregisterWorkspaceProxyRequest{ - ReplicaID: opts.Request.ReplicaID, - }) - if deregisterErr != nil { - opts.Logger.Error(ctx, - "failed to deregister workspace proxy with Coder primary (it will be automatically deregistered shortly)", - slog.Error(deregisterErr), - ) - } + registerCtx, registerCancel := context.WithTimeout(ctx, l.opts.AttemptTimeout) + res, err := l.c.RegisterWorkspaceProxy(registerCtx, l.opts.Request) + registerCancel() + if err != nil { + return RegisterWorkspaceProxyResponse{}, false, xerrors.Errorf("register workspace proxy: %w", err) + } + + // Catastrophic failures if any of the "permanent" values change. + if l.originalRes != nil && res.AppSecurityKey != l.originalRes.AppSecurityKey { + return RegisterWorkspaceProxyResponse{}, true, xerrors.New("app security key has changed, proxy must be restarted") + } + if l.originalRes != nil && res.DERPMeshKey != l.originalRes.DERPMeshKey { + return RegisterWorkspaceProxyResponse{}, true, xerrors.New("DERP mesh key has changed, proxy must be restarted") + } + if l.originalRes != nil && res.DERPRegionID != l.originalRes.DERPRegionID { + return RegisterWorkspaceProxyResponse{}, true, xerrors.New("DERP region ID has changed, proxy must be restarted") + } - if opts.FailureFn != nil { - opts.FailureFn(err) + // If it's not the first registration, call the callback function. + if l.originalRes != nil { + err = l.callbackFn(res) + if err != nil { + return RegisterWorkspaceProxyResponse{}, true, xerrors.Errorf("callback err: %w", err) } + } else { + l.originalRes = &res + } + + return res, false, nil +} + +// Start starts the proxy registration loop. The provided context is only used +// for the initial registration. Use Close() to stop. +func (l *RegisterWorkspaceProxyLoop) Start(ctx context.Context) (RegisterWorkspaceProxyResponse, error) { + if l.opts.Interval == 0 { + l.opts.Interval = 15 * time.Second + } + if l.opts.MaxFailureCount == 0 { + l.opts.MaxFailureCount = 10 + } + if l.opts.AttemptTimeout == 0 { + l.opts.AttemptTimeout = 10 * time.Second } - originalRes, err := c.RegisterWorkspaceProxy(ctx, opts.Request) + originalRes, _, err := l.register(ctx) if err != nil { - return RegisterWorkspaceProxyResponse{}, nil, xerrors.Errorf("register workspace proxy: %w", err) + return RegisterWorkspaceProxyResponse{}, xerrors.Errorf("initial registration: %w", err) } - done := make(chan struct{}) go func() { - defer close(done) + defer close(l.done) var ( failedAttempts = 0 - ticker = time.NewTicker(opts.Interval) + ticker = time.NewTicker(l.opts.Interval) ) for { select { - case <-ctx.Done(): - failureFn(ctx.Err()) + case <-l.closedCtx.Done(): + l.failureFn(xerrors.Errorf("proxy registration loop closed")) return case <-ticker.C: } - opts.Logger.Debug(ctx, + l.opts.Logger.Debug(context.Background(), "re-registering workspace proxy with Coder primary", - slog.F("req", opts.Request), - slog.F("timeout", opts.AttemptTimeout), + slog.F("req", l.opts.Request), + slog.F("timeout", l.opts.AttemptTimeout), slog.F("failed_attempts", failedAttempts), ) - opts.MutateFn(&opts.Request) - registerCtx, cancel := context.WithTimeout(ctx, opts.AttemptTimeout) - res, err := c.RegisterWorkspaceProxy(registerCtx, opts.Request) - cancel() + + _, catastrophicErr, err := l.register(l.closedCtx) if err != nil { + if catastrophicErr { + l.failureFn(err) + return + } failedAttempts++ - opts.Logger.Warn(ctx, + l.opts.Logger.Warn(context.Background(), "failed to re-register workspace proxy with Coder primary", - slog.F("req", opts.Request), - slog.F("timeout", opts.AttemptTimeout), + slog.F("req", l.opts.Request), + slog.F("timeout", l.opts.AttemptTimeout), slog.F("failed_attempts", failedAttempts), slog.Error(err), ) - if failedAttempts > opts.MaxFailureCount { - failureFn(xerrors.Errorf("exceeded re-registration failure count of %d: last error: %w", opts.MaxFailureCount, err)) + if failedAttempts > l.opts.MaxFailureCount { + l.failureFn(xerrors.Errorf("exceeded re-registration failure count of %d: last error: %w", l.opts.MaxFailureCount, err)) return } continue } failedAttempts = 0 - if res.AppSecurityKey != originalRes.AppSecurityKey { - failureFn(xerrors.New("app security key has changed, proxy must be restarted")) - return - } - if res.DERPMeshKey != originalRes.DERPMeshKey { - failureFn(xerrors.New("DERP mesh key has changed, proxy must be restarted")) - return - } - if res.DERPRegionID != originalRes.DERPRegionID { - failureFn(xerrors.New("DERP region ID has changed, proxy must be restarted")) - } - - err = opts.CallbackFn(ctx, res) - if err != nil { - failureFn(xerrors.Errorf("callback fn returned error: %w", err)) - return - } - - ticker.Reset(opts.Interval) + ticker.Reset(l.opts.Interval) } }() - return originalRes, done, nil + return originalRes, nil +} + +func (l *RegisterWorkspaceProxyLoop) RegisterNow(ctx context.Context) (RegisterWorkspaceProxyResponse, error) { + // Catastrophic failures don't affect the loop when manually re-registering. + res, _, err := l.register(ctx) + return res, err +} + +func (l *RegisterWorkspaceProxyLoop) Close() { + l.close() + <-l.done +} + +func (l *RegisterWorkspaceProxyLoop) mutateFn(req *RegisterWorkspaceProxyRequest) { + if l.opts.MutateFn != nil { + l.opts.MutateFn(req) + } +} + +func (l *RegisterWorkspaceProxyLoop) callbackFn(res RegisterWorkspaceProxyResponse) error { + if l.opts.CallbackFn != nil { + return l.opts.CallbackFn(res) + } + return nil +} + +func (l *RegisterWorkspaceProxyLoop) failureFn(err error) { + // We have to use background context here because the original context may + // be canceled. + deregisterCtx, cancel := context.WithTimeout(context.Background(), l.opts.AttemptTimeout) + defer cancel() + deregisterErr := l.c.DeregisterWorkspaceProxy(deregisterCtx, DeregisterWorkspaceProxyRequest{ + ReplicaID: l.opts.Request.ReplicaID, + }) + if deregisterErr != nil { + l.opts.Logger.Error(context.Background(), + "failed to deregister workspace proxy with Coder primary (it will be automatically deregistered shortly)", + slog.Error(deregisterErr), + ) + } + + if l.opts.FailureFn != nil { + l.opts.FailureFn(err) + } +} + +// RegisterWorkspaceProxyLoop will register the workspace proxy and then start a +// goroutine to keep registering periodically in the background. +// +// The first response is returned immediately, and subsequent responses will be +// notified to the given CallbackFn. When the loop is Close()d it will stop +// immediately and an error will be returned to the FailureFn. +// +// When a fatal error is encountered (or the proxy is closed), the proxy will be +// deregistered (with the same ReplicaID and AttemptTimeout) before calling the +// FailureFn. +func (c *Client) RegisterWorkspaceProxyLoop(ctx context.Context, opts RegisterWorkspaceProxyLoopOpts) (*RegisterWorkspaceProxyLoop, RegisterWorkspaceProxyResponse, error) { + closedCtx, closeFn := context.WithCancel(context.Background()) + loop := &RegisterWorkspaceProxyLoop{ + opts: opts, + c: c, + closedCtx: closedCtx, + close: closeFn, + done: make(chan struct{}), + } + + regResp, err := loop.Start(ctx) + if err != nil { + return nil, RegisterWorkspaceProxyResponse{}, xerrors.Errorf("start loop: %w", err) + } + return loop, regResp, nil } type CoordinateMessageType int From a754acfe8b9c8d1ca79c5793859d5637d2d11a85 Mon Sep 17 00:00:00 2001 From: Dean Sheather Date: Tue, 20 Feb 2024 07:46:06 +0000 Subject: [PATCH 2/4] Reduce replicas to 6 in test, test all paths --- enterprise/wsproxy/wsproxy_test.go | 38 +++++++++++++++++------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/enterprise/wsproxy/wsproxy_test.go b/enterprise/wsproxy/wsproxy_test.go index 24ebac15c208c..77ebf621575aa 100644 --- a/enterprise/wsproxy/wsproxy_test.go +++ b/enterprise/wsproxy/wsproxy_test.go @@ -434,7 +434,7 @@ resourceLoop: require.False(t, p2p) } -// TestDERPMesh spawns 10 workspace proxy replicas and tries to connect to a +// TestDERPMesh spawns 6 workspace proxy replicas and tries to connect to a // single DERP peer via every single one. func TestDERPMesh(t *testing.T) { t.Parallel() @@ -472,8 +472,8 @@ func TestDERPMesh(t *testing.T) { proxyURL, err := url.Parse("https://proxy.test.coder.com") require.NoError(t, err) - // Create 10 proxy replicas. - const count = 10 + // Create 6 proxy replicas. + const count = 6 var ( sessionToken = "" proxies = [count]coderdenttest.WorkspaceProxy{} @@ -509,6 +509,9 @@ func TestDERPMesh(t *testing.T) { t.Logf(name+": "+format, args...) }) require.NoError(t, err, "create client") + t.Cleanup(func() { + _ = client.Close() + }) err = client.Connect(testutil.Context(t, testutil.WaitLong)) require.NoError(t, err, "connect to DERP server") @@ -568,23 +571,26 @@ func TestDERPMesh(t *testing.T) { } } + // Generate cases. We have a case for: + // - Each proxy to itself. + // - Each proxy to each other proxy (one way, no duplicates). + cases := [][2]string{} for i, derpURL := range derpURLs { - i, derpURL := i, derpURL + cases = append(cases, [2]string{derpURL, derpURL}) + for j := i + 1; j < len(derpURLs); j++ { + cases = append(cases, [2]string{derpURL, derpURLs[j]}) + } + } + require.Len(t, cases, (count*(count+1))/2) // triangle number + + for i, c := range cases { + i, c := i, c t.Run(fmt.Sprintf("Proxy%d", i), func(t *testing.T) { t.Parallel() - t.Logf("derp1=%s, derp2=%s", derpURLs[0], derpURL) - - // Client 1 is always on the first proxy as a "control". - client1, client1Recv := createClient(t, "client1", derpURLs[0]) - t.Cleanup(func() { - _ = client1.Close() - }) - // Client 2 is on the current proxy. - client2, client2Recv := createClient(t, "client2", derpURL) - t.Cleanup(func() { - _ = client2.Close() - }) + t.Logf("derp1=%s, derp2=%s", c[0], c[1]) + client1, client1Recv := createClient(t, "client1", c[0]) + client2, client2Recv := createClient(t, "client2", c[1]) // Send a packet from client 1 to client 2. sendTest(t, client2.SelfPublicKey(), client2Recv, client1) From 4c22aaab628139aa21a9ef6d98f4773560203154 Mon Sep 17 00:00:00 2001 From: Dean Sheather Date: Fri, 23 Feb 2024 11:24:35 +0000 Subject: [PATCH 3/4] PR comments --- enterprise/coderd/coderdenttest/proxytest.go | 19 +- enterprise/coderd/workspaceproxy_test.go | 4 +- enterprise/wsproxy/wsproxy.go | 4 +- enterprise/wsproxy/wsproxy_test.go | 176 ++++++++++--------- enterprise/wsproxy/wsproxysdk/wsproxysdk.go | 136 ++++++++------ 5 files changed, 195 insertions(+), 144 deletions(-) diff --git a/enterprise/coderd/coderdenttest/proxytest.go b/enterprise/coderd/coderdenttest/proxytest.go index d3385f2c12034..831c4be86f640 100644 --- a/enterprise/coderd/coderdenttest/proxytest.go +++ b/enterprise/coderd/coderdenttest/proxytest.go @@ -38,7 +38,9 @@ type ProxyOptions struct { // ProxyURL is optional ProxyURL *url.URL - // Token is optional. If specified, a new proxy won't be created. + // Token is optional. If specified, a new workspace proxy region will not be + // created, and the proxy will become a replica of the existing proxy + // region. Token string // FlushStats is optional @@ -51,11 +53,14 @@ type WorkspaceProxy struct { ServerURL *url.URL } -// NewWorkspaceProxy will configure a wsproxy.Server with the given options. -// The new wsproxy will register itself with the given coderd.API instance. -// The first user owner client is required to create the wsproxy on the coderd -// api server. -func NewWorkspaceProxy(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Client, options *ProxyOptions) WorkspaceProxy { +// NewWorkspaceProxyReplica will configure a wsproxy.Server with the given +// options. The new wsproxy replica will register itself with the given +// coderd.API instance. +// +// If a token is not provided, a new workspace proxy region is created using the +// owner client. If a token is provided, the proxy will become a replica of the +// existing proxy region. +func NewWorkspaceProxyReplica(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Client, options *ProxyOptions) WorkspaceProxy { ctx, cancelFunc := context.WithCancel(context.Background()) t.Cleanup(cancelFunc) @@ -134,7 +139,7 @@ func NewWorkspaceProxy(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Clie } wssrv, err := wsproxy.New(ctx, &wsproxy.Options{ - Logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug), + Logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug).With(slog.F("server_url", serverURL.String())), Experiments: options.Experiments, DashboardURL: coderdAPI.AccessURL, AccessURL: accessURL, diff --git a/enterprise/coderd/workspaceproxy_test.go b/enterprise/coderd/workspaceproxy_test.go index 17e17240dcace..b7d4e8cf2f8f9 100644 --- a/enterprise/coderd/workspaceproxy_test.go +++ b/enterprise/coderd/workspaceproxy_test.go @@ -99,7 +99,7 @@ func TestRegions(t *testing.T) { require.NoError(t, err) const proxyName = "hello" - _ = coderdenttest.NewWorkspaceProxy(t, api, client, &coderdenttest.ProxyOptions{ + _ = coderdenttest.NewWorkspaceProxyReplica(t, api, client, &coderdenttest.ProxyOptions{ Name: proxyName, AppHostname: appHostname + ".proxy", }) @@ -734,7 +734,7 @@ func TestReconnectingPTYSignedToken(t *testing.T) { proxyURL, err := url.Parse(fmt.Sprintf("https://%s.com", namesgenerator.GetRandomName(1))) require.NoError(t, err) - _ = coderdenttest.NewWorkspaceProxy(t, api, client, &coderdenttest.ProxyOptions{ + _ = coderdenttest.NewWorkspaceProxyReplica(t, api, client, &coderdenttest.ProxyOptions{ Name: namesgenerator.GetRandomName(1), ProxyURL: proxyURL, AppHostname: "*.sub.example.com", diff --git a/enterprise/wsproxy/wsproxy.go b/enterprise/wsproxy/wsproxy.go index 5d4eca0244e5f..17fae2b791695 100644 --- a/enterprise/wsproxy/wsproxy.go +++ b/enterprise/wsproxy/wsproxy.go @@ -410,8 +410,8 @@ func New(ctx context.Context, opts *Options) (*Server, error) { return s, nil } -func (s *Server) RegisterNow(ctx context.Context) error { - _, err := s.registerLoop.RegisterNow(ctx) +func (s *Server) RegisterNow() error { + _, err := s.registerLoop.RegisterNow() return err } diff --git a/enterprise/wsproxy/wsproxy_test.go b/enterprise/wsproxy/wsproxy_test.go index 77ebf621575aa..44790b34d82ac 100644 --- a/enterprise/wsproxy/wsproxy_test.go +++ b/enterprise/wsproxy/wsproxy_test.go @@ -66,7 +66,7 @@ func TestDERPOnly(t *testing.T) { }) // Create an external proxy. - _ = coderdenttest.NewWorkspaceProxy(t, api, client, &coderdenttest.ProxyOptions{ + _ = coderdenttest.NewWorkspaceProxyReplica(t, api, client, &coderdenttest.ProxyOptions{ Name: "best-proxy", DerpOnly: true, }) @@ -113,15 +113,15 @@ func TestDERP(t *testing.T) { }) // Create two running external proxies. - proxyAPI1 := coderdenttest.NewWorkspaceProxy(t, api, client, &coderdenttest.ProxyOptions{ + proxyAPI1 := coderdenttest.NewWorkspaceProxyReplica(t, api, client, &coderdenttest.ProxyOptions{ Name: "best-proxy", }) - proxyAPI2 := coderdenttest.NewWorkspaceProxy(t, api, client, &coderdenttest.ProxyOptions{ + proxyAPI2 := coderdenttest.NewWorkspaceProxyReplica(t, api, client, &coderdenttest.ProxyOptions{ Name: "worst-proxy", }) // Create a running external proxy with DERP disabled. - proxyAPI3 := coderdenttest.NewWorkspaceProxy(t, api, client, &coderdenttest.ProxyOptions{ + proxyAPI3 := coderdenttest.NewWorkspaceProxyReplica(t, api, client, &coderdenttest.ProxyOptions{ Name: "no-derp-proxy", DerpDisabled: true, }) @@ -344,7 +344,7 @@ func TestDERPEndToEnd(t *testing.T) { _ = closer.Close() }) - coderdenttest.NewWorkspaceProxy(t, api, client, &coderdenttest.ProxyOptions{ + coderdenttest.NewWorkspaceProxyReplica(t, api, client, &coderdenttest.ProxyOptions{ Name: "best-proxy", }) @@ -480,7 +480,7 @@ func TestDERPMesh(t *testing.T) { derpURLs = [count]string{} ) for i := range proxies { - proxies[i] = coderdenttest.NewWorkspaceProxy(t, api, client, &coderdenttest.ProxyOptions{ + proxies[i] = coderdenttest.NewWorkspaceProxyReplica(t, api, client, &coderdenttest.ProxyOptions{ Name: "best-proxy", Token: sessionToken, ProxyURL: proxyURL, @@ -498,79 +498,10 @@ func TestDERPMesh(t *testing.T) { // is up-to-date. In production this will happen automatically after about // 15 seconds. for i, proxy := range proxies { - err := proxy.RegisterNow(testutil.Context(t, testutil.WaitLong)) + err := proxy.RegisterNow() require.NoErrorf(t, err, "failed to force proxy %d to re-register", i) } - createClient := func(t *testing.T, name string, derpURL string) (*derphttp.Client, <-chan derp.ReceivedPacket) { - t.Helper() - - client, err := derphttp.NewClient(key.NewNode(), derpURLs[0], func(format string, args ...any) { - t.Logf(name+": "+format, args...) - }) - require.NoError(t, err, "create client") - t.Cleanup(func() { - _ = client.Close() - }) - err = client.Connect(testutil.Context(t, testutil.WaitLong)) - require.NoError(t, err, "connect to DERP server") - - ch := make(chan derp.ReceivedPacket, 64) - go func() { - defer close(ch) - for { - msg, err := client.Recv() - if err != nil { - t.Logf("Recv error: %v", err) - return - } - switch msg := msg.(type) { - case derp.ReceivedPacket: - ch <- msg - return - default: - // We don't care about other messages. - } - } - }() - - return client, ch - } - - sendTest := func(t *testing.T, dstKey key.NodePublic, dstCh <-chan derp.ReceivedPacket, src *derphttp.Client) { - t.Helper() - - const msgStrPrefix = "test_packet_" - msgStr, err := cryptorand.String(64 - len(msgStrPrefix)) - require.NoError(t, err, "generate random msg string") - msg := []byte(msgStrPrefix + msgStr) - - err = src.Send(dstKey, msg) - require.NoError(t, err, "send message via DERP") - - waitCtx := testutil.Context(t, testutil.WaitLong) - ticker := time.NewTicker(time.Millisecond * 500) - defer ticker.Stop() - for { - select { - case pkt := <-dstCh: - assert.Equal(t, src.SelfPublicKey(), pkt.Source, "packet came from wrong source") - assert.Equal(t, msg, pkt.Data, "packet data is wrong") - return - case <-waitCtx.Done(): - t.Fatal("timed out waiting for packet") - return - case <-ticker.C: - } - - // Send another packet. Since we're sending packets immediately - // after opening the clients, they might not be meshed together - // properly yet. - err = src.Send(dstKey, msg) - require.NoError(t, err, "send message via DERP") - } - } - // Generate cases. We have a case for: // - Each proxy to itself. // - Each proxy to each other proxy (one way, no duplicates). @@ -589,14 +520,14 @@ func TestDERPMesh(t *testing.T) { t.Parallel() t.Logf("derp1=%s, derp2=%s", c[0], c[1]) - client1, client1Recv := createClient(t, "client1", c[0]) - client2, client2Recv := createClient(t, "client2", c[1]) + client1, client1Recv := createDERPClient(t, "client1", c[0]) + client2, client2Recv := createDERPClient(t, "client2", c[1]) // Send a packet from client 1 to client 2. - sendTest(t, client2.SelfPublicKey(), client2Recv, client1) + testDERPSend(t, client2.SelfPublicKey(), client2Recv, client1) // Send a packet from client 2 to client 1. - sendTest(t, client1.SelfPublicKey(), client1Recv, client2) + testDERPSend(t, client1.SelfPublicKey(), client1Recv, client2) }) } } @@ -653,7 +584,7 @@ func TestWorkspaceProxyWorkspaceApps(t *testing.T) { if opts.DisableSubdomainApps { opts.AppHost = "" } - proxyAPI := coderdenttest.NewWorkspaceProxy(t, api, client, &coderdenttest.ProxyOptions{ + proxyAPI := coderdenttest.NewWorkspaceProxyReplica(t, api, client, &coderdenttest.ProxyOptions{ Name: "best-proxy", AppHostname: opts.AppHost, DisablePathApps: opts.DisablePathApps, @@ -669,3 +600,86 @@ func TestWorkspaceProxyWorkspaceApps(t *testing.T) { } }) } + +// createDERPClient creates a DERP client and spawns a goroutine that reads from +// the client and sends the received packets to a channel. +func createDERPClient(t *testing.T, name string, derpURL string) (*derphttp.Client, <-chan derp.ReceivedPacket) { + t.Helper() + + client, err := derphttp.NewClient(key.NewNode(), derpURL, func(format string, args ...any) { + t.Logf(name+": "+format, args...) + }) + require.NoError(t, err, "create client") + t.Cleanup(func() { + _ = client.Close() + }) + err = client.Connect(testutil.Context(t, testutil.WaitLong)) + require.NoError(t, err, "connect to DERP server") + + ch := make(chan derp.ReceivedPacket, 1) + done := make(chan struct{}) + go func() { + defer close(ch) + defer close(done) + for { + msg, err := client.Recv() + if err != nil { + t.Logf("Recv error: %v", err) + return + } + switch msg := msg.(type) { + case derp.ReceivedPacket: + ch <- msg + return + default: + // We don't care about other messages. + } + } + }() + t.Cleanup(func() { + <-done + }) + + return client, ch +} + +// testDERPSend sends a message from src to dstKey and waits for it to be +// received on dstCh. +// +// If the packet doesn't arrive within 500ms, it will try to send it again until +// testutil.WaitLong is reached. +func testDERPSend(t *testing.T, dstKey key.NodePublic, dstCh <-chan derp.ReceivedPacket, src *derphttp.Client) { + t.Helper() + + // The prefix helps identify where the packet starts if you get garbled data + // in logs. + const msgStrPrefix = "test_packet_" + msgStr, err := cryptorand.String(64 - len(msgStrPrefix)) + require.NoError(t, err, "generate random msg string") + msg := []byte(msgStrPrefix + msgStr) + + err = src.Send(dstKey, msg) + require.NoError(t, err, "send message via DERP") + + waitCtx := testutil.Context(t, testutil.WaitLong) + ticker := time.NewTicker(time.Millisecond * 500) + defer ticker.Stop() + for { + select { + case pkt := <-dstCh: + require.Equal(t, src.SelfPublicKey(), pkt.Source, "packet came from wrong source") + require.Equal(t, msg, pkt.Data, "packet data is wrong") + return + case <-waitCtx.Done(): + t.Fatal("timed out waiting for packet") + return + case <-ticker.C: + } + + // Send another packet. Since we're sending packets immediately + // after opening the clients, they might not be meshed together + // properly yet. + err = src.Send(dstKey, msg) + require.NoError(t, err, "send message via DERP") + } +} diff --git a/enterprise/wsproxy/wsproxysdk/wsproxysdk.go b/enterprise/wsproxy/wsproxysdk/wsproxysdk.go index 448c493307d2e..2b751f11416f2 100644 --- a/enterprise/wsproxy/wsproxysdk/wsproxysdk.go +++ b/enterprise/wsproxy/wsproxysdk/wsproxysdk.go @@ -285,52 +285,26 @@ type RegisterWorkspaceProxyLoopOpts struct { } type RegisterWorkspaceProxyLoop struct { - opts RegisterWorkspaceProxyLoopOpts - c *Client - originalRes *RegisterWorkspaceProxyResponse + opts RegisterWorkspaceProxyLoopOpts + c *Client - closedCtx context.Context - close context.CancelFunc - done chan struct{} + // runLoopNow takes a response channel to send the response to and triggers + // the loop to run immediately if it's waiting. + runLoopNow chan chan RegisterWorkspaceProxyResponse + closedCtx context.Context + close context.CancelFunc + done chan struct{} } -// register registers once. It returns the response, whether the loop should -// exit immediately, and any error that occurred. -func (l *RegisterWorkspaceProxyLoop) register(ctx context.Context) (RegisterWorkspaceProxyResponse, bool, error) { - // If it's not the first registration, call the mutate function. - if l.originalRes != nil { - l.mutateFn(&l.opts.Request) - } - +func (l *RegisterWorkspaceProxyLoop) register(ctx context.Context) (RegisterWorkspaceProxyResponse, error) { registerCtx, registerCancel := context.WithTimeout(ctx, l.opts.AttemptTimeout) res, err := l.c.RegisterWorkspaceProxy(registerCtx, l.opts.Request) registerCancel() if err != nil { - return RegisterWorkspaceProxyResponse{}, false, xerrors.Errorf("register workspace proxy: %w", err) - } - - // Catastrophic failures if any of the "permanent" values change. - if l.originalRes != nil && res.AppSecurityKey != l.originalRes.AppSecurityKey { - return RegisterWorkspaceProxyResponse{}, true, xerrors.New("app security key has changed, proxy must be restarted") - } - if l.originalRes != nil && res.DERPMeshKey != l.originalRes.DERPMeshKey { - return RegisterWorkspaceProxyResponse{}, true, xerrors.New("DERP mesh key has changed, proxy must be restarted") - } - if l.originalRes != nil && res.DERPRegionID != l.originalRes.DERPRegionID { - return RegisterWorkspaceProxyResponse{}, true, xerrors.New("DERP region ID has changed, proxy must be restarted") + return RegisterWorkspaceProxyResponse{}, xerrors.Errorf("register workspace proxy: %w", err) } - // If it's not the first registration, call the callback function. - if l.originalRes != nil { - err = l.callbackFn(res) - if err != nil { - return RegisterWorkspaceProxyResponse{}, true, xerrors.Errorf("callback err: %w", err) - } - } else { - l.originalRes = &res - } - - return res, false, nil + return res, nil } // Start starts the proxy registration loop. The provided context is only used @@ -346,7 +320,8 @@ func (l *RegisterWorkspaceProxyLoop) Start(ctx context.Context) (RegisterWorkspa l.opts.AttemptTimeout = 10 * time.Second } - originalRes, _, err := l.register(ctx) + var err error + originalRes, err := l.register(ctx) if err != nil { return RegisterWorkspaceProxyResponse{}, xerrors.Errorf("initial registration: %w", err) } @@ -359,10 +334,12 @@ func (l *RegisterWorkspaceProxyLoop) Start(ctx context.Context) (RegisterWorkspa ticker = time.NewTicker(l.opts.Interval) ) for { + var respCh chan RegisterWorkspaceProxyResponse select { case <-l.closedCtx.Done(): l.failureFn(xerrors.Errorf("proxy registration loop closed")) return + case respCh = <-l.runLoopNow: case <-ticker.C: } @@ -373,12 +350,9 @@ func (l *RegisterWorkspaceProxyLoop) Start(ctx context.Context) (RegisterWorkspa slog.F("failed_attempts", failedAttempts), ) - _, catastrophicErr, err := l.register(l.closedCtx) + l.mutateFn(&l.opts.Request) + resp, err := l.register(l.closedCtx) if err != nil { - if catastrophicErr { - l.failureFn(err) - return - } failedAttempts++ l.opts.Logger.Warn(context.Background(), "failed to re-register workspace proxy with Coder primary", @@ -396,6 +370,44 @@ func (l *RegisterWorkspaceProxyLoop) Start(ctx context.Context) (RegisterWorkspa } failedAttempts = 0 + // Check for consistency. + if originalRes.AppSecurityKey != resp.AppSecurityKey { + l.failureFn(xerrors.New("app security key has changed, proxy must be restarted")) + return + } + if originalRes.DERPMeshKey != resp.DERPMeshKey { + l.failureFn(xerrors.New("DERP mesh key has changed, proxy must be restarted")) + return + } + if originalRes.DERPRegionID != resp.DERPRegionID { + l.failureFn(xerrors.New("DERP region ID has changed, proxy must be restarted")) + return + } + + err = l.callbackFn(resp) + if err != nil { + l.failureFn(xerrors.Errorf("callback function returned an error: %w", err)) + return + } + + // If we were triggered by RegisterNow(), send the response back. + // Use a blocking send with 1s timeout in case the caller somehow + // exited. + // + // If this does block then the interval will be off by 1s, but since + // we only call RegisterNow() in tests it's fine. + if respCh != nil { + timeout := time.NewTimer(time.Second) + select { + case respCh <- resp: + close(respCh) + case <-timeout.C: + l.opts.Logger.Warn(ctx, "timeout sending response to RegisterNow from registration loop") + close(respCh) + } + timeout.Stop() + } + ticker.Reset(l.opts.Interval) } }() @@ -403,10 +415,29 @@ func (l *RegisterWorkspaceProxyLoop) Start(ctx context.Context) (RegisterWorkspa return originalRes, nil } -func (l *RegisterWorkspaceProxyLoop) RegisterNow(ctx context.Context) (RegisterWorkspaceProxyResponse, error) { - // Catastrophic failures don't affect the loop when manually re-registering. - res, _, err := l.register(ctx) - return res, err +// RegisterNow asks the registration loop to register immediately. A timeout of +// 2x the attempt timeout is used to wait for the response. +func (l *RegisterWorkspaceProxyLoop) RegisterNow() (RegisterWorkspaceProxyResponse, error) { + // Pre-check before we do anything. + select { + case <-l.done: + return RegisterWorkspaceProxyResponse{}, xerrors.New("proxy registration loop closed") + default: + } + + // The channel is closed by the loop after sending the response. + respCh := make(chan RegisterWorkspaceProxyResponse) + select { + case <-l.done: + return RegisterWorkspaceProxyResponse{}, xerrors.New("proxy registration loop closed") + case l.runLoopNow <- respCh: + } + select { + case <-l.done: + return RegisterWorkspaceProxyResponse{}, xerrors.New("proxy registration loop closed") + case resp := <-respCh: + return resp, nil + } } func (l *RegisterWorkspaceProxyLoop) Close() { @@ -460,11 +491,12 @@ func (l *RegisterWorkspaceProxyLoop) failureFn(err error) { func (c *Client) RegisterWorkspaceProxyLoop(ctx context.Context, opts RegisterWorkspaceProxyLoopOpts) (*RegisterWorkspaceProxyLoop, RegisterWorkspaceProxyResponse, error) { closedCtx, closeFn := context.WithCancel(context.Background()) loop := &RegisterWorkspaceProxyLoop{ - opts: opts, - c: c, - closedCtx: closedCtx, - close: closeFn, - done: make(chan struct{}), + opts: opts, + c: c, + runLoopNow: make(chan chan RegisterWorkspaceProxyResponse), + closedCtx: closedCtx, + close: closeFn, + done: make(chan struct{}), } regResp, err := loop.Start(ctx) From 99ab1cc72f8504cd21220093e9bde91ffcad7d68 Mon Sep 17 00:00:00 2001 From: Dean Sheather Date: Fri, 1 Mar 2024 14:14:09 +0000 Subject: [PATCH 4/4] PR comments --- enterprise/wsproxy/wsproxy_test.go | 28 ++++++++++----------- enterprise/wsproxy/wsproxysdk/wsproxysdk.go | 25 +++--------------- 2 files changed, 17 insertions(+), 36 deletions(-) diff --git a/enterprise/wsproxy/wsproxy_test.go b/enterprise/wsproxy/wsproxy_test.go index 44790b34d82ac..e8fed4f35c594 100644 --- a/enterprise/wsproxy/wsproxy_test.go +++ b/enterprise/wsproxy/wsproxy_test.go @@ -1,6 +1,7 @@ package wsproxy_test import ( + "context" "fmt" "net" "net/url" @@ -520,14 +521,15 @@ func TestDERPMesh(t *testing.T) { t.Parallel() t.Logf("derp1=%s, derp2=%s", c[0], c[1]) - client1, client1Recv := createDERPClient(t, "client1", c[0]) - client2, client2Recv := createDERPClient(t, "client2", c[1]) + ctx := testutil.Context(t, testutil.WaitLong) + client1, client1Recv := createDERPClient(t, ctx, "client1", c[0]) + client2, client2Recv := createDERPClient(t, ctx, "client2", c[1]) // Send a packet from client 1 to client 2. - testDERPSend(t, client2.SelfPublicKey(), client2Recv, client1) + testDERPSend(t, ctx, client2.SelfPublicKey(), client2Recv, client1) // Send a packet from client 2 to client 1. - testDERPSend(t, client1.SelfPublicKey(), client1Recv, client2) + testDERPSend(t, ctx, client1.SelfPublicKey(), client1Recv, client2) }) } } @@ -603,7 +605,9 @@ func TestWorkspaceProxyWorkspaceApps(t *testing.T) { // createDERPClient creates a DERP client and spawns a goroutine that reads from // the client and sends the received packets to a channel. -func createDERPClient(t *testing.T, name string, derpURL string) (*derphttp.Client, <-chan derp.ReceivedPacket) { +// +//nolint:revive +func createDERPClient(t *testing.T, ctx context.Context, name string, derpURL string) (*derphttp.Client, <-chan derp.ReceivedPacket) { t.Helper() client, err := derphttp.NewClient(key.NewNode(), derpURL, func(format string, args ...any) { @@ -613,14 +617,12 @@ func createDERPClient(t *testing.T, name string, derpURL string) (*derphttp.Clie t.Cleanup(func() { _ = client.Close() }) - err = client.Connect(testutil.Context(t, testutil.WaitLong)) + err = client.Connect(ctx) require.NoError(t, err, "connect to DERP server") ch := make(chan derp.ReceivedPacket, 1) - done := make(chan struct{}) go func() { defer close(ch) - defer close(done) for { msg, err := client.Recv() if err != nil { @@ -636,9 +638,6 @@ func createDERPClient(t *testing.T, name string, derpURL string) (*derphttp.Clie } } }() - t.Cleanup(func() { - <-done - }) return client, ch } @@ -648,7 +647,9 @@ func createDERPClient(t *testing.T, name string, derpURL string) (*derphttp.Clie // // If the packet doesn't arrive within 500ms, it will try to send it again until // testutil.WaitLong is reached. -func testDERPSend(t *testing.T, dstKey key.NodePublic, dstCh <-chan derp.ReceivedPacket, src *derphttp.Client) { +// +//nolint:revive +func testDERPSend(t *testing.T, ctx context.Context, dstKey key.NodePublic, dstCh <-chan derp.ReceivedPacket, src *derphttp.Client) { t.Helper() // The prefix helps identify where the packet starts if you get garbled data @@ -661,7 +662,6 @@ func testDERPSend(t *testing.T, dstKey key.NodePublic, dstCh <-chan derp.Receive err = src.Send(dstKey, msg) require.NoError(t, err, "send message via DERP") - waitCtx := testutil.Context(t, testutil.WaitLong) ticker := time.NewTicker(time.Millisecond * 500) defer ticker.Stop() for { @@ -670,7 +670,7 @@ func testDERPSend(t *testing.T, dstKey key.NodePublic, dstCh <-chan derp.Receive require.Equal(t, src.SelfPublicKey(), pkt.Source, "packet came from wrong source") require.Equal(t, msg, pkt.Data, "packet data is wrong") return - case <-waitCtx.Done(): + case <-ctx.Done(): t.Fatal("timed out waiting for packet") return case <-ticker.C: diff --git a/enterprise/wsproxy/wsproxysdk/wsproxysdk.go b/enterprise/wsproxy/wsproxysdk/wsproxysdk.go index 2b751f11416f2..37636102bb413 100644 --- a/enterprise/wsproxy/wsproxysdk/wsproxysdk.go +++ b/enterprise/wsproxy/wsproxysdk/wsproxysdk.go @@ -391,21 +391,9 @@ func (l *RegisterWorkspaceProxyLoop) Start(ctx context.Context) (RegisterWorkspa } // If we were triggered by RegisterNow(), send the response back. - // Use a blocking send with 1s timeout in case the caller somehow - // exited. - // - // If this does block then the interval will be off by 1s, but since - // we only call RegisterNow() in tests it's fine. if respCh != nil { - timeout := time.NewTimer(time.Second) - select { - case respCh <- resp: - close(respCh) - case <-timeout.C: - l.opts.Logger.Warn(ctx, "timeout sending response to RegisterNow from registration loop") - close(respCh) - } - timeout.Stop() + respCh <- resp + close(respCh) } ticker.Reset(l.opts.Interval) @@ -418,15 +406,8 @@ func (l *RegisterWorkspaceProxyLoop) Start(ctx context.Context) (RegisterWorkspa // RegisterNow asks the registration loop to register immediately. A timeout of // 2x the attempt timeout is used to wait for the response. func (l *RegisterWorkspaceProxyLoop) RegisterNow() (RegisterWorkspaceProxyResponse, error) { - // Pre-check before we do anything. - select { - case <-l.done: - return RegisterWorkspaceProxyResponse{}, xerrors.New("proxy registration loop closed") - default: - } - // The channel is closed by the loop after sending the response. - respCh := make(chan RegisterWorkspaceProxyResponse) + respCh := make(chan RegisterWorkspaceProxyResponse, 1) select { case <-l.done: return RegisterWorkspaceProxyResponse{}, xerrors.New("proxy registration loop closed")