Skip to content

Commit 2f0a999

Browse files
authored
chore: add derpserver to wsproxy, add proxies to derpmap (#7311)
1 parent 70692c2 commit 2f0a999

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+3001
-386
lines changed

agent/agent.go

+50-6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/spf13/afero"
2828
"go.uber.org/atomic"
2929
"golang.org/x/exp/slices"
30+
"golang.org/x/sync/errgroup"
3031
"golang.org/x/xerrors"
3132
"tailscale.com/net/speedtest"
3233
"tailscale.com/tailcfg"
@@ -72,6 +73,7 @@ type Options struct {
7273
type Client interface {
7374
Manifest(ctx context.Context) (agentsdk.Manifest, error)
7475
Listen(ctx context.Context) (net.Conn, error)
76+
DERPMapUpdates(ctx context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer, error)
7577
ReportStats(ctx context.Context, log slog.Logger, statsChan <-chan *agentsdk.Stats, setInterval func(time.Duration)) (io.Closer, error)
7678
PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error
7779
PostAppHealth(ctx context.Context, req agentsdk.PostAppHealthsRequest) error
@@ -699,12 +701,26 @@ func (a *agent) run(ctx context.Context) error {
699701
network.SetBlockEndpoints(manifest.DisableDirectConnections)
700702
}
701703

702-
a.logger.Debug(ctx, "running tailnet connection coordinator")
703-
err = a.runCoordinator(ctx, network)
704-
if err != nil {
705-
return xerrors.Errorf("run coordinator: %w", err)
706-
}
707-
return nil
704+
eg, egCtx := errgroup.WithContext(ctx)
705+
eg.Go(func() error {
706+
a.logger.Debug(egCtx, "running tailnet connection coordinator")
707+
err := a.runCoordinator(egCtx, network)
708+
if err != nil {
709+
return xerrors.Errorf("run coordinator: %w", err)
710+
}
711+
return nil
712+
})
713+
714+
eg.Go(func() error {
715+
a.logger.Debug(egCtx, "running derp map subscriber")
716+
err := a.runDERPMapSubscriber(egCtx, network)
717+
if err != nil {
718+
return xerrors.Errorf("run derp map subscriber: %w", err)
719+
}
720+
return nil
721+
})
722+
723+
return eg.Wait()
708724
}
709725

710726
func (a *agent) wireguardAddresses(agentID uuid.UUID) []netip.Prefix {
@@ -927,6 +943,34 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
927943
}
928944
}
929945

946+
// runDERPMapSubscriber runs a coordinator and returns if a reconnect should occur.
947+
func (a *agent) runDERPMapSubscriber(ctx context.Context, network *tailnet.Conn) error {
948+
ctx, cancel := context.WithCancel(ctx)
949+
defer cancel()
950+
951+
updates, closer, err := a.client.DERPMapUpdates(ctx)
952+
if err != nil {
953+
return err
954+
}
955+
defer closer.Close()
956+
957+
a.logger.Info(ctx, "connected to derp map endpoint")
958+
for {
959+
select {
960+
case <-ctx.Done():
961+
return ctx.Err()
962+
case update := <-updates:
963+
if update.Err != nil {
964+
return update.Err
965+
}
966+
if update.DERPMap != nil && !tailnet.CompareDERPMaps(network.DERPMap(), update.DERPMap) {
967+
a.logger.Info(ctx, "updating derp map due to detected changes")
968+
network.SetDERPMap(update.DERPMap)
969+
}
970+
}
971+
}
972+
}
973+
930974
func (a *agent) runStartupScript(ctx context.Context, script string) error {
931975
return a.runScript(ctx, "startup", script)
932976
}

agent/agent_test.go

+116-2
Original file line numberDiff line numberDiff line change
@@ -1717,6 +1717,120 @@ func TestAgent_Dial(t *testing.T) {
17171717
}
17181718
}
17191719

1720+
// TestAgent_UpdatedDERP checks that agents can handle their DERP map being
1721+
// updated, and that clients can also handle it.
1722+
func TestAgent_UpdatedDERP(t *testing.T) {
1723+
t.Parallel()
1724+
1725+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
1726+
1727+
originalDerpMap, _ := tailnettest.RunDERPAndSTUN(t)
1728+
require.NotNil(t, originalDerpMap)
1729+
1730+
coordinator := tailnet.NewCoordinator(logger)
1731+
defer func() {
1732+
_ = coordinator.Close()
1733+
}()
1734+
agentID := uuid.New()
1735+
statsCh := make(chan *agentsdk.Stats, 50)
1736+
fs := afero.NewMemMapFs()
1737+
client := agenttest.NewClient(t,
1738+
logger.Named("agent"),
1739+
agentID,
1740+
agentsdk.Manifest{
1741+
DERPMap: originalDerpMap,
1742+
// Force DERP.
1743+
DisableDirectConnections: true,
1744+
},
1745+
statsCh,
1746+
coordinator,
1747+
)
1748+
closer := agent.New(agent.Options{
1749+
Client: client,
1750+
Filesystem: fs,
1751+
Logger: logger.Named("agent"),
1752+
ReconnectingPTYTimeout: time.Minute,
1753+
})
1754+
defer func() {
1755+
_ = closer.Close()
1756+
}()
1757+
1758+
// Setup a client connection.
1759+
newClientConn := func(derpMap *tailcfg.DERPMap) *codersdk.WorkspaceAgentConn {
1760+
conn, err := tailnet.NewConn(&tailnet.Options{
1761+
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(), 128)},
1762+
DERPMap: derpMap,
1763+
Logger: logger.Named("client"),
1764+
})
1765+
require.NoError(t, err)
1766+
clientConn, serverConn := net.Pipe()
1767+
serveClientDone := make(chan struct{})
1768+
t.Cleanup(func() {
1769+
_ = clientConn.Close()
1770+
_ = serverConn.Close()
1771+
_ = conn.Close()
1772+
<-serveClientDone
1773+
})
1774+
go func() {
1775+
defer close(serveClientDone)
1776+
err := coordinator.ServeClient(serverConn, uuid.New(), agentID)
1777+
assert.NoError(t, err)
1778+
}()
1779+
sendNode, _ := tailnet.ServeCoordinator(clientConn, func(nodes []*tailnet.Node) error {
1780+
return conn.UpdateNodes(nodes, false)
1781+
})
1782+
conn.SetNodeCallback(sendNode)
1783+
// Force DERP.
1784+
conn.SetBlockEndpoints(true)
1785+
1786+
sdkConn := codersdk.NewWorkspaceAgentConn(conn, codersdk.WorkspaceAgentConnOptions{
1787+
AgentID: agentID,
1788+
CloseFunc: func() error { return codersdk.ErrSkipClose },
1789+
})
1790+
t.Cleanup(func() {
1791+
_ = sdkConn.Close()
1792+
})
1793+
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
1794+
defer cancel()
1795+
if !sdkConn.AwaitReachable(ctx) {
1796+
t.Fatal("agent not reachable")
1797+
}
1798+
1799+
return sdkConn
1800+
}
1801+
conn1 := newClientConn(originalDerpMap)
1802+
1803+
// Change the DERP map.
1804+
newDerpMap, _ := tailnettest.RunDERPAndSTUN(t)
1805+
require.NotNil(t, newDerpMap)
1806+
1807+
// Change the region ID.
1808+
newDerpMap.Regions[2] = newDerpMap.Regions[1]
1809+
delete(newDerpMap.Regions, 1)
1810+
newDerpMap.Regions[2].RegionID = 2
1811+
for _, node := range newDerpMap.Regions[2].Nodes {
1812+
node.RegionID = 2
1813+
}
1814+
1815+
// Push a new DERP map to the agent.
1816+
err := client.PushDERPMapUpdate(agentsdk.DERPMapUpdate{
1817+
DERPMap: newDerpMap,
1818+
})
1819+
require.NoError(t, err)
1820+
1821+
// Connect from a second client and make sure it uses the new DERP map.
1822+
conn2 := newClientConn(newDerpMap)
1823+
require.Equal(t, []int{2}, conn2.DERPMap().RegionIDs())
1824+
1825+
// If the first client gets a DERP map update, it should be able to
1826+
// reconnect just fine.
1827+
conn1.SetDERPMap(newDerpMap)
1828+
require.Equal(t, []int{2}, conn1.DERPMap().RegionIDs())
1829+
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
1830+
defer cancel()
1831+
require.True(t, conn1.AwaitReachable(ctx))
1832+
}
1833+
17201834
func TestAgent_Speedtest(t *testing.T) {
17211835
t.Parallel()
17221836
t.Skip("This test is relatively flakey because of Tailscale's speedtest code...")
@@ -1940,8 +2054,8 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
19402054
defer close(serveClientDone)
19412055
coordinator.ServeClient(serverConn, uuid.New(), metadata.AgentID)
19422056
}()
1943-
sendNode, _ := tailnet.ServeCoordinator(clientConn, func(node []*tailnet.Node) error {
1944-
return conn.UpdateNodes(node, false)
2057+
sendNode, _ := tailnet.ServeCoordinator(clientConn, func(nodes []*tailnet.Node) error {
2058+
return conn.UpdateNodes(nodes, false)
19452059
})
19462060
conn.SetNodeCallback(sendNode)
19472061
agentConn := codersdk.NewWorkspaceAgentConn(conn, codersdk.WorkspaceAgentConnOptions{

agent/agenttest/client.go

+30-6
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@ import (
1010

1111
"github.com/google/uuid"
1212
"golang.org/x/exp/maps"
13+
"golang.org/x/xerrors"
1314

1415
"cdr.dev/slog"
1516
"github.com/coder/coder/codersdk"
1617
"github.com/coder/coder/codersdk/agentsdk"
1718
"github.com/coder/coder/tailnet"
19+
"github.com/coder/coder/testutil"
1820
)
1921

2022
func NewClient(t testing.TB,
@@ -28,12 +30,13 @@ func NewClient(t testing.TB,
2830
manifest.AgentID = agentID
2931
}
3032
return &Client{
31-
t: t,
32-
logger: logger.Named("client"),
33-
agentID: agentID,
34-
manifest: manifest,
35-
statsChan: statsChan,
36-
coordinator: coordinator,
33+
t: t,
34+
logger: logger.Named("client"),
35+
agentID: agentID,
36+
manifest: manifest,
37+
statsChan: statsChan,
38+
coordinator: coordinator,
39+
derpMapUpdates: make(chan agentsdk.DERPMapUpdate),
3740
}
3841
}
3942

@@ -53,6 +56,7 @@ type Client struct {
5356
lifecycleStates []codersdk.WorkspaceAgentLifecycle
5457
startup agentsdk.PostStartupRequest
5558
logs []agentsdk.StartupLog
59+
derpMapUpdates chan agentsdk.DERPMapUpdate
5660
}
5761

5862
func (c *Client) Manifest(_ context.Context) (agentsdk.Manifest, error) {
@@ -191,6 +195,26 @@ func (c *Client) GetServiceBanner(ctx context.Context) (codersdk.ServiceBannerCo
191195
return codersdk.ServiceBannerConfig{}, nil
192196
}
193197

198+
func (c *Client) PushDERPMapUpdate(update agentsdk.DERPMapUpdate) error {
199+
timer := time.NewTimer(testutil.WaitShort)
200+
defer timer.Stop()
201+
select {
202+
case c.derpMapUpdates <- update:
203+
case <-timer.C:
204+
return xerrors.New("timeout waiting to push derp map update")
205+
}
206+
207+
return nil
208+
}
209+
210+
func (c *Client) DERPMapUpdates(_ context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer, error) {
211+
closed := make(chan struct{})
212+
return c.derpMapUpdates, closeFunc(func() error {
213+
close(closed)
214+
return nil
215+
}), nil
216+
}
217+
194218
type closeFunc func() error
195219

196220
func (c closeFunc) Close() error {

cli/netcheck.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func (r *RootCmd) netcheck() *clibase.Cmd {
2626
ctx, cancel := context.WithTimeout(inv.Context(), 30*time.Second)
2727
defer cancel()
2828

29-
connInfo, err := client.WorkspaceAgentConnectionInfo(ctx)
29+
connInfo, err := client.WorkspaceAgentConnectionInfoGeneric(ctx)
3030
if err != nil {
3131
return err
3232
}

cli/server.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
477477
AppHostnameRegex: appHostnameRegex,
478478
Logger: logger.Named("coderd"),
479479
Database: dbfake.New(),
480-
DERPMap: derpMap,
480+
BaseDERPMap: derpMap,
481481
Pubsub: pubsub.NewInMemory(),
482482
CacheDir: cacheDir,
483483
GoogleTokenValidator: googleTokenValidator,
@@ -822,7 +822,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
822822

823823
if cfg.Prometheus.Enable {
824824
// Agent metrics require reference to the tailnet coordinator, so must be initiated after Coder API.
825-
closeAgentsFunc, err := prometheusmetrics.Agents(ctx, logger, options.PrometheusRegistry, coderAPI.Database, &coderAPI.TailnetCoordinator, options.DERPMap, coderAPI.Options.AgentInactiveDisconnectTimeout, 0)
825+
closeAgentsFunc, err := prometheusmetrics.Agents(ctx, logger, options.PrometheusRegistry, coderAPI.Database, &coderAPI.TailnetCoordinator, coderAPI.DERPMap, coderAPI.Options.AgentInactiveDisconnectTimeout, 0)
826826
if err != nil {
827827
return xerrors.Errorf("register agents prometheus metric: %w", err)
828828
}

0 commit comments

Comments
 (0)