Skip to content

feat: add derp mesh health checking in workspace proxies #12222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
chore: add test for workspace proxy derp meshing
  • Loading branch information
deansheather committed Feb 20, 2024
commit 9cd470a0840b37db2a6282d6adee0d2187bc2d85
34 changes: 25 additions & 9 deletions enterprise/coderd/coderdenttest/proxytest.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,24 @@ type ProxyOptions struct {
// ProxyURL is optional
ProxyURL *url.URL

// Token is optional. If specified, a new proxy won't be created.
Token string

// FlushStats is optional
FlushStats chan chan<- struct{}
}

type WorkspaceProxy struct {
*wsproxy.Server

ServerURL *url.URL
}

// NewWorkspaceProxy will configure a wsproxy.Server with the given options.
// The new wsproxy will register itself with the given coderd.API instance.
// The first user owner client is required to create the wsproxy on the coderd
// api server.
func NewWorkspaceProxy(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Client, options *ProxyOptions) *wsproxy.Server {
func NewWorkspaceProxy(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Client, options *ProxyOptions) WorkspaceProxy {
ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)

Expand Down Expand Up @@ -107,11 +116,15 @@ func NewWorkspaceProxy(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Clie
options.Name = namesgenerator.GetRandomName(1)
}

proxyRes, err := owner.CreateWorkspaceProxy(ctx, codersdk.CreateWorkspaceProxyRequest{
Name: options.Name,
Icon: "/emojis/flag.png",
})
require.NoError(t, err, "failed to create workspace proxy")
token := options.Token
if token == "" {
proxyRes, err := owner.CreateWorkspaceProxy(ctx, codersdk.CreateWorkspaceProxyRequest{
Name: options.Name,
Icon: "/emojis/flag.png",
})
require.NoError(t, err, "failed to create workspace proxy")
token = proxyRes.ProxyToken
}

// Inherit collector options from coderd, but keep the wsproxy reporter.
statsCollectorOptions := coderdAPI.Options.WorkspaceAppsStatsCollectorOptions
Expand All @@ -131,14 +144,14 @@ func NewWorkspaceProxy(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Clie
Tracing: coderdAPI.TracerProvider,
APIRateLimit: coderdAPI.APIRateLimit,
SecureAuthCookie: coderdAPI.SecureAuthCookie,
ProxySessionToken: proxyRes.ProxyToken,
ProxySessionToken: token,
DisablePathApps: options.DisablePathApps,
// We need a new registry to not conflict with the coderd internal
// proxy metrics.
PrometheusRegistry: prometheus.NewRegistry(),
DERPEnabled: !options.DerpDisabled,
DERPOnly: options.DerpOnly,
DERPServerRelayAddress: accessURL.String(),
DERPServerRelayAddress: serverURL.String(),
StatsCollectorOptions: statsCollectorOptions,
})
require.NoError(t, err)
Expand All @@ -151,5 +164,8 @@ func NewWorkspaceProxy(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Clie
handler = wssrv.Handler
mutex.Unlock()

return wssrv
return WorkspaceProxy{
Server: wssrv,
ServerURL: serverURL,
}
}
26 changes: 14 additions & 12 deletions enterprise/wsproxy/wsproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ type Server struct {
ctx context.Context
cancel context.CancelFunc
derpCloseFunc func()
registerDone <-chan struct{}
registerLoop *wsproxysdk.RegisterWorkspaceProxyLoop
}

// New creates a new workspace proxy server. This requires a primary coderd
Expand Down Expand Up @@ -210,7 +210,7 @@ func New(ctx context.Context, opts *Options) (*Server, error) {
// goroutine to periodically re-register.
replicaID := uuid.New()
osHostname := cliutil.Hostname()
regResp, registerDone, err := client.RegisterWorkspaceProxyLoop(ctx, wsproxysdk.RegisterWorkspaceProxyLoopOpts{
registerLoop, regResp, err := client.RegisterWorkspaceProxyLoop(ctx, wsproxysdk.RegisterWorkspaceProxyLoopOpts{
Logger: opts.Logger,
Request: wsproxysdk.RegisterWorkspaceProxyRequest{
AccessURL: opts.AccessURL.String(),
Expand All @@ -230,12 +230,13 @@ func New(ctx context.Context, opts *Options) (*Server, error) {
if err != nil {
return nil, xerrors.Errorf("register proxy: %w", err)
}
s.registerDone = registerDone
err = s.handleRegister(ctx, regResp)
s.registerLoop = registerLoop

derpServer.SetMeshKey(regResp.DERPMeshKey)
err = s.handleRegister(regResp)
if err != nil {
return nil, xerrors.Errorf("handle register: %w", err)
}
derpServer.SetMeshKey(regResp.DERPMeshKey)

secKey, err := workspaceapps.KeyFromString(regResp.AppSecurityKey)
if err != nil {
Expand Down Expand Up @@ -409,16 +410,16 @@ func New(ctx context.Context, opts *Options) (*Server, error) {
return s, nil
}

func (s *Server) RegisterNow(ctx context.Context) error {
_, err := s.registerLoop.RegisterNow(ctx)
return err
}

func (s *Server) Close() error {
s.cancel()

var err error
registerDoneWaitTicker := time.NewTicker(11 * time.Second) // the attempt timeout is 10s
select {
case <-registerDoneWaitTicker.C:
err = multierror.Append(err, xerrors.New("timed out waiting for registerDone"))
case <-s.registerDone:
}
s.registerLoop.Close()
s.derpCloseFunc()
appServerErr := s.AppServer.Close()
if appServerErr != nil {
Expand All @@ -437,11 +438,12 @@ func (*Server) mutateRegister(_ *wsproxysdk.RegisterWorkspaceProxyRequest) {
// package in the primary and update req.ReplicaError accordingly.
}

func (s *Server) handleRegister(_ context.Context, res wsproxysdk.RegisterWorkspaceProxyResponse) error {
func (s *Server) handleRegister(res wsproxysdk.RegisterWorkspaceProxyResponse) error {
addresses := make([]string, len(res.SiblingReplicas))
for i, replica := range res.SiblingReplicas {
addresses[i] = replica.RelayAddress
}
s.Logger.Debug(s.ctx, "setting DERP mesh sibling addresses", slog.F("addresses", addresses))
s.derpMesh.SetAddresses(addresses, false)

s.latestDERPMap.Store(res.DERPMap)
Expand Down
165 changes: 165 additions & 0 deletions enterprise/wsproxy/wsproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package wsproxy_test
import (
"fmt"
"net"
"net/url"
"testing"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"tailscale.com/derp"
"tailscale.com/derp/derphttp"
"tailscale.com/tailcfg"
"tailscale.com/types/key"
Expand All @@ -22,6 +25,7 @@ import (
"github.com/coder/coder/v2/coderd/httpmw"
"github.com/coder/coder/v2/coderd/workspaceapps/apptest"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/cryptorand"
"github.com/coder/coder/v2/enterprise/coderd/coderdenttest"
"github.com/coder/coder/v2/enterprise/coderd/license"
"github.com/coder/coder/v2/provisioner/echo"
Expand Down Expand Up @@ -430,6 +434,167 @@ resourceLoop:
require.False(t, p2p)
}

// TestDERPMesh spawns 10 workspace proxy replicas and tries to connect to a
// single DERP peer via every single one.
func TestDERPMesh(t *testing.T) {
t.Parallel()

deploymentValues := coderdtest.DeploymentValues(t)
deploymentValues.Experiments = []string{
"*",
}

client, closer, api, _ := coderdenttest.NewWithAPI(t, &coderdenttest.Options{
Options: &coderdtest.Options{
DeploymentValues: deploymentValues,
AppHostname: "*.primary.test.coder.com",
IncludeProvisionerDaemon: true,
RealIPConfig: &httpmw.RealIPConfig{
TrustedOrigins: []*net.IPNet{{
IP: net.ParseIP("127.0.0.1"),
Mask: net.CIDRMask(8, 32),
}},
TrustedHeaders: []string{
"CF-Connecting-IP",
},
},
},
LicenseOptions: &coderdenttest.LicenseOptions{
Features: license.Features{
codersdk.FeatureWorkspaceProxy: 1,
},
},
})
t.Cleanup(func() {
_ = closer.Close()
})

proxyURL, err := url.Parse("https://proxy.test.coder.com")
require.NoError(t, err)

// Create 10 proxy replicas.
const count = 10
var (
sessionToken = ""
proxies = [count]coderdenttest.WorkspaceProxy{}
derpURLs = [count]string{}
)
for i := range proxies {
proxies[i] = coderdenttest.NewWorkspaceProxy(t, api, client, &coderdenttest.ProxyOptions{
Name: "best-proxy",
Token: sessionToken,
ProxyURL: proxyURL,
})
if i == 0 {
sessionToken = proxies[i].Options.ProxySessionToken
}

derpURL := *proxies[i].ServerURL
derpURL.Path = "/derp"
derpURLs[i] = derpURL.String()
}

// Force all proxies to re-register immediately. This ensures the DERP mesh
// is up-to-date. In production this will happen automatically after about
// 15 seconds.
for i, proxy := range proxies {
err := proxy.RegisterNow(testutil.Context(t, testutil.WaitLong))
require.NoErrorf(t, err, "failed to force proxy %d to re-register", i)
}

createClient := func(t *testing.T, name string, derpURL string) (*derphttp.Client, <-chan derp.ReceivedPacket) {
t.Helper()

client, err := derphttp.NewClient(key.NewNode(), derpURLs[0], func(format string, args ...any) {
t.Logf(name+": "+format, args...)
})
require.NoError(t, err, "create client")
err = client.Connect(testutil.Context(t, testutil.WaitLong))
require.NoError(t, err, "connect to DERP server")

ch := make(chan derp.ReceivedPacket, 64)
go func() {
defer close(ch)
for {
msg, err := client.Recv()
if err != nil {
t.Logf("Recv error: %v", err)
return
}
switch msg := msg.(type) {
case derp.ReceivedPacket:
ch <- msg
return
default:
// We don't care about other messages.
}
}
}()

return client, ch
}

sendTest := func(t *testing.T, dstKey key.NodePublic, dstCh <-chan derp.ReceivedPacket, src *derphttp.Client) {
t.Helper()

const msgStrPrefix = "test_packet_"
msgStr, err := cryptorand.String(64 - len(msgStrPrefix))
require.NoError(t, err, "generate random msg string")
msg := []byte(msgStrPrefix + msgStr)

err = src.Send(dstKey, msg)
require.NoError(t, err, "send message via DERP")

waitCtx := testutil.Context(t, testutil.WaitLong)
ticker := time.NewTicker(time.Millisecond * 500)
defer ticker.Stop()
for {
select {
case pkt := <-dstCh:
assert.Equal(t, src.SelfPublicKey(), pkt.Source, "packet came from wrong source")
assert.Equal(t, msg, pkt.Data, "packet data is wrong")
return
case <-waitCtx.Done():
t.Fatal("timed out waiting for packet")
return
case <-ticker.C:
}

// Send another packet. Since we're sending packets immediately
// after opening the clients, they might not be meshed together
// properly yet.
err = src.Send(dstKey, msg)
require.NoError(t, err, "send message via DERP")
}
}

for i, derpURL := range derpURLs {
i, derpURL := i, derpURL
t.Run(fmt.Sprintf("Proxy%d", i), func(t *testing.T) {
t.Parallel()
t.Logf("derp1=%s, derp2=%s", derpURLs[0], derpURL)

// Client 1 is always on the first proxy as a "control".
client1, client1Recv := createClient(t, "client1", derpURLs[0])
t.Cleanup(func() {
_ = client1.Close()
})

// Client 2 is on the current proxy.
client2, client2Recv := createClient(t, "client2", derpURL)
t.Cleanup(func() {
_ = client2.Close()
})

// Send a packet from client 1 to client 2.
sendTest(t, client2.SelfPublicKey(), client2Recv, client1)

// Send a packet from client 2 to client 1.
sendTest(t, client1.SelfPublicKey(), client1Recv, client2)
})
}
}

func TestWorkspaceProxyWorkspaceApps(t *testing.T) {
t.Parallel()

Expand Down
Loading