Skip to content

Commit 404c3e4

Browse files
committed
update derp map on new connection
1 parent 3caa692 commit 404c3e4

File tree

13 files changed

+481
-103
lines changed

13 files changed

+481
-103
lines changed

agent/agent.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -810,8 +810,14 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
810810
}
811811
defer coordinator.Close()
812812
a.logger.Info(ctx, "connected to coordination endpoint")
813-
sendNodes, errChan := tailnet.ServeCoordinator(coordinator, func(nodes []*tailnet.Node) error {
814-
return network.UpdateNodes(nodes, false)
813+
sendNodes, errChan := tailnet.ServeCoordinator(coordinator, func(update tailnet.CoordinatorNodeUpdate) error {
814+
// Check if we need to update our DERP map.
815+
if !tailnet.CompareDERPMaps(network.DERPMap(), update.DERPMap) {
816+
a.logger.Info(ctx, "updating DERP map on connection request due to changes", slog.F("old", network.DERPMap()), slog.F("new", update.DERPMap))
817+
network.SetDERPMap(update.DERPMap)
818+
}
819+
820+
return network.UpdateNodes(update.Nodes, false)
815821
})
816822
network.SetNodeCallback(sendNodes)
817823
select {

agent/agent_test.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -889,7 +889,7 @@ func TestAgent_StartupScript(t *testing.T) {
889889
DERPMap: &tailcfg.DERPMap{},
890890
},
891891
statsChan: make(chan *agentsdk.Stats),
892-
coordinator: tailnet.NewCoordinator(logger),
892+
coordinator: tailnet.NewCoordinator(logger, emptyDerpMapFn),
893893
}
894894
closer := agent.New(agent.Options{
895895
Client: client,
@@ -930,7 +930,7 @@ func TestAgent_StartupScript(t *testing.T) {
930930
return codersdk.ReadBodyAsError(res)
931931
},
932932
statsChan: make(chan *agentsdk.Stats),
933-
coordinator: tailnet.NewCoordinator(logger),
933+
coordinator: tailnet.NewCoordinator(logger, emptyDerpMapFn),
934934
}
935935
closer := agent.New(agent.Options{
936936
Client: client,
@@ -1327,7 +1327,7 @@ func TestAgent_Lifecycle(t *testing.T) {
13271327
ShutdownScript: "echo " + expected,
13281328
},
13291329
statsChan: make(chan *agentsdk.Stats),
1330-
coordinator: tailnet.NewCoordinator(logger),
1330+
coordinator: tailnet.NewCoordinator(logger, emptyDerpMapFn),
13311331
}
13321332

13331333
fs := afero.NewMemMapFs()
@@ -1585,7 +1585,7 @@ func TestAgent_Reconnect(t *testing.T) {
15851585
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
15861586
// After the agent is disconnected from a coordinator, it's supposed
15871587
// to reconnect!
1588-
coordinator := tailnet.NewCoordinator(logger)
1588+
coordinator := tailnet.NewCoordinator(logger, emptyDerpMapFn)
15891589
defer coordinator.Close()
15901590

15911591
agentID := uuid.New()
@@ -1623,7 +1623,7 @@ func TestAgent_Reconnect(t *testing.T) {
16231623
func TestAgent_WriteVSCodeConfigs(t *testing.T) {
16241624
t.Parallel()
16251625
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
1626-
coordinator := tailnet.NewCoordinator(logger)
1626+
coordinator := tailnet.NewCoordinator(logger, emptyDerpMapFn)
16271627
defer coordinator.Close()
16281628

16291629
client := &client{
@@ -1737,7 +1737,9 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
17371737
if metadata.DERPMap == nil {
17381738
metadata.DERPMap = tailnettest.RunDERPAndSTUN(t)
17391739
}
1740-
coordinator := tailnet.NewCoordinator(logger)
1740+
coordinator := tailnet.NewCoordinator(logger, func() *tailcfg.DERPMap {
1741+
return metadata.DERPMap
1742+
})
17411743
t.Cleanup(func() {
17421744
_ = coordinator.Close()
17431745
})
@@ -1785,8 +1787,10 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
17851787
defer close(serveClientDone)
17861788
coordinator.ServeClient(serverConn, uuid.New(), agentID)
17871789
}()
1788-
sendNode, _ := tailnet.ServeCoordinator(clientConn, func(node []*tailnet.Node) error {
1789-
return conn.UpdateNodes(node, false)
1790+
sendNode, _ := tailnet.ServeCoordinator(clientConn, func(update tailnet.CoordinatorNodeUpdate) error {
1791+
// Don't need to worry about updating the DERP map since it'll never
1792+
// change in this test (as we aren't dealing with proxies etc.)
1793+
return conn.UpdateNodes(update.Nodes, false)
17901794
})
17911795
conn.SetNodeCallback(sendNode)
17921796
agentConn := &codersdk.WorkspaceAgentConn{
@@ -2095,3 +2099,7 @@ func verifyCollectedMetrics(t *testing.T, expected []agentsdk.AgentMetric, actua
20952099
}
20962100
return true
20972101
}
2102+
2103+
func emptyDerpMapFn() *tailcfg.DERPMap {
2104+
return &tailcfg.DERPMap{}
2105+
}

coderd/coderd.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,6 @@ func New(options *Options) *API {
224224
if options.PrometheusRegistry == nil {
225225
options.PrometheusRegistry = prometheus.NewRegistry()
226226
}
227-
if options.TailnetCoordinator == nil {
228-
options.TailnetCoordinator = tailnet.NewCoordinator(options.Logger)
229-
}
230227
if options.DERPServer == nil {
231228
options.DERPServer = derp.NewServer(key.NewNode(), tailnet.Logger(options.Logger.Named("derp")))
232229
}
@@ -314,6 +311,9 @@ func New(options *Options) *API {
314311
Experiments: experiments,
315312
healthCheckGroup: &singleflight.Group[string, *healthcheck.Report]{},
316313
}
314+
if options.TailnetCoordinator == nil {
315+
options.TailnetCoordinator = tailnet.NewCoordinator(options.Logger, api.DERPMap)
316+
}
317317
if options.HealthcheckFunc == nil {
318318
options.HealthcheckFunc = func(ctx context.Context, apiKey string) (*healthcheck.Report, error) {
319319
return healthcheck.Run(ctx, &healthcheck.ReportOptions{

coderd/prometheusmetrics/prometheusmetrics_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -300,13 +300,13 @@ func TestAgents(t *testing.T) {
300300
coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)
301301

302302
// given
303-
coordinator := tailnet.NewCoordinator(slogtest.Make(t, nil).Leveled(slog.LevelDebug))
304-
coordinatorPtr := atomic.Pointer[tailnet.Coordinator]{}
305-
coordinatorPtr.Store(&coordinator)
306303
derpMap := tailnettest.RunDERPAndSTUN(t)
307304
derpMapFn := func() *tailcfg.DERPMap {
308305
return derpMap
309306
}
307+
coordinator := tailnet.NewCoordinator(slogtest.Make(t, nil).Leveled(slog.LevelDebug), derpMapFn)
308+
coordinatorPtr := atomic.Pointer[tailnet.Coordinator]{}
309+
coordinatorPtr.Store(&coordinator)
310310
agentInactiveDisconnectTimeout := 1 * time.Hour // don't need to focus on this value in tests
311311
registry := prometheus.NewRegistry()
312312

coderd/workspaceagents.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -687,8 +687,13 @@ func (api *API) dialWorkspaceAgentTailnet(agentID uuid.UUID) (*codersdk.Workspac
687687
return left
688688
})
689689

690-
sendNodes, _ := tailnet.ServeCoordinator(clientConn, func(node []*tailnet.Node) error {
691-
err = conn.UpdateNodes(node, true)
690+
sendNodes, _ := tailnet.ServeCoordinator(clientConn, func(update tailnet.CoordinatorNodeUpdate) error {
691+
// Check if we need to update the DERP map used by the connection.
692+
if !tailnet.CompareDERPMaps(conn.DERPMap(), update.DERPMap) {
693+
conn.SetDERPMap(update.DERPMap)
694+
}
695+
696+
err = conn.UpdateNodes(update.Nodes, true)
692697
if err != nil {
693698
return xerrors.Errorf("update nodes: %w", err)
694699
}

coderd/wsconncache/wsconncache_test.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/stretchr/testify/require"
2121
"go.uber.org/atomic"
2222
"go.uber.org/goleak"
23+
"tailscale.com/tailcfg"
2324

2425
"cdr.dev/slog"
2526
"cdr.dev/slog/sloggers/slogtest"
@@ -159,7 +160,9 @@ func setupAgent(t *testing.T, manifest agentsdk.Manifest, ptyTimeout time.Durati
159160
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
160161
manifest.DERPMap = tailnettest.RunDERPAndSTUN(t)
161162

162-
coordinator := tailnet.NewCoordinator(logger)
163+
coordinator := tailnet.NewCoordinator(logger, func() *tailcfg.DERPMap {
164+
return manifest.DERPMap
165+
})
163166
t.Cleanup(func() {
164167
_ = coordinator.Close()
165168
})
@@ -190,8 +193,10 @@ func setupAgent(t *testing.T, manifest agentsdk.Manifest, ptyTimeout time.Durati
190193
_ = conn.Close()
191194
})
192195
go coordinator.ServeClient(serverConn, uuid.New(), agentID)
193-
sendNode, _ := tailnet.ServeCoordinator(clientConn, func(node []*tailnet.Node) error {
194-
return conn.UpdateNodes(node, false)
196+
sendNode, _ := tailnet.ServeCoordinator(clientConn, func(update tailnet.CoordinatorNodeUpdate) error {
197+
// Don't need to worry about updating the DERP map since it'll never
198+
// change in this test (as we aren't dealing with proxies etc.)
199+
return conn.UpdateNodes(update.Nodes, false)
195200
})
196201
conn.SetNodeCallback(sendNode)
197202
agentConn := &codersdk.WorkspaceAgentConn{

codersdk/workspaceagents.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,13 @@ func (c *Client) DialWorkspaceAgent(ctx context.Context, agentID uuid.UUID, opti
248248
options.Logger.Debug(ctx, "failed to dial", slog.Error(err))
249249
continue
250250
}
251-
sendNode, errChan := tailnet.ServeCoordinator(websocket.NetConn(ctx, ws, websocket.MessageBinary), func(node []*tailnet.Node) error {
252-
return conn.UpdateNodes(node, false)
251+
sendNode, errChan := tailnet.ServeCoordinator(websocket.NetConn(ctx, ws, websocket.MessageBinary), func(update tailnet.CoordinatorNodeUpdate) error {
252+
// Check if we need to update the DERP map used by the connection.
253+
if !tailnet.CompareDERPMaps(conn.DERPMap(), update.DERPMap) {
254+
options.Logger.Debug(ctx, "updating DERP map on connection request due to changes", slog.F("old", conn.DERPMap()), slog.F("new", update.DERPMap))
255+
conn.SetDERPMap(update.DERPMap)
256+
}
257+
return conn.UpdateNodes(update.Nodes, false)
253258
})
254259
conn.SetNodeCallback(sendNode)
255260
options.Logger.Debug(ctx, "serving coordinator")

enterprise/coderd/coderd.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -403,9 +403,9 @@ func (api *API) updateEntitlements(ctx context.Context) error {
403403
}
404404

405405
if changed, enabled := featureChanged(codersdk.FeatureHighAvailability); changed {
406-
coordinator := agpltailnet.NewCoordinator(api.Logger)
406+
coordinator := agpltailnet.NewCoordinator(api.Logger, api.AGPL.DERPMap)
407407
if enabled {
408-
haCoordinator, err := tailnet.NewCoordinator(api.Logger, api.Pubsub)
408+
haCoordinator, err := tailnet.NewCoordinator(api.Logger, api.Pubsub, api.AGPL.DERPMap)
409409
if err != nil {
410410
api.Logger.Error(ctx, "unable to set up high availability coordinator", slog.Error(err))
411411
// If we try to setup the HA coordinator and it fails, nothing

enterprise/tailnet/coordinator.go

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/google/uuid"
1515
lru "github.com/hashicorp/golang-lru/v2"
1616
"golang.org/x/xerrors"
17+
"tailscale.com/tailcfg"
1718

1819
"cdr.dev/slog"
1920
"github.com/coder/coder/coderd/database"
@@ -22,7 +23,7 @@ import (
2223

2324
// NewCoordinator creates a new high availability coordinator
2425
// that uses PostgreSQL pubsub to exchange handshakes.
25-
func NewCoordinator(logger slog.Logger, pubsub database.Pubsub) (agpl.Coordinator, error) {
26+
func NewCoordinator(logger slog.Logger, pubsub database.Pubsub, derpMapFn func() *tailcfg.DERPMap) (agpl.Coordinator, error) {
2627
ctx, cancelFunc := context.WithCancel(context.Background())
2728

2829
nameCache, err := lru.New[uuid.UUID, string](512)
@@ -34,8 +35,9 @@ func NewCoordinator(logger slog.Logger, pubsub database.Pubsub) (agpl.Coordinato
3435
id: uuid.New(),
3536
log: logger,
3637
pubsub: pubsub,
37-
closeFunc: cancelFunc,
3838
close: make(chan struct{}),
39+
closeFunc: cancelFunc,
40+
derpMapFn: derpMapFn,
3941
nodes: map[uuid.UUID]*agpl.Node{},
4042
agentSockets: map[uuid.UUID]*agpl.TrackedConn{},
4143
agentToConnectionSockets: map[uuid.UUID]map[uuid.UUID]*agpl.TrackedConn{},
@@ -57,6 +59,8 @@ type haCoordinator struct {
5759
close chan struct{}
5860
closeFunc context.CancelFunc
5961

62+
derpMapFn func() *tailcfg.DERPMap
63+
6064
// nodes maps agent and connection IDs to their respective node.
6165
nodes map[uuid.UUID]*agpl.Node
6266
// agentSockets maps agent IDs to their open websocket.
@@ -109,7 +113,10 @@ func (c *haCoordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID
109113
// node of the agent. This allows the connection to establish.
110114
node, ok := c.nodes[agent]
111115
if ok {
112-
err := tc.Enqueue([]*agpl.Node{node})
116+
err := tc.Enqueue(agpl.CoordinatorNodeUpdate{
117+
DERPMap: c.derpMapFn(),
118+
Nodes: []*agpl.Node{node},
119+
})
113120
c.mutex.Unlock()
114121
if err != nil {
115122
return xerrors.Errorf("enqueue node: %w", err)
@@ -177,7 +184,10 @@ func (c *haCoordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *js
177184
}
178185
return nil
179186
}
180-
err = agentSocket.Enqueue([]*agpl.Node{&node})
187+
err = agentSocket.Enqueue(agpl.CoordinatorNodeUpdate{
188+
DERPMap: c.derpMapFn(),
189+
Nodes: []*agpl.Node{&node},
190+
})
181191
c.mutex.Unlock()
182192
if err != nil {
183193
return xerrors.Errorf("enqueu nodes: %w", err)
@@ -212,7 +222,10 @@ func (c *haCoordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) err
212222
// Publish all nodes on this instance that want to connect to this agent.
213223
nodes := c.nodesSubscribedToAgent(id)
214224
if len(nodes) > 0 {
215-
err := tc.Enqueue(nodes)
225+
err := tc.Enqueue(agpl.CoordinatorNodeUpdate{
226+
DERPMap: c.derpMapFn(),
227+
Nodes: nodes,
228+
})
216229
if err != nil {
217230
c.mutex.Unlock()
218231
return xerrors.Errorf("enqueue nodes: %w", err)
@@ -308,8 +321,12 @@ func (c *haCoordinator) handleAgentUpdate(id uuid.UUID, decoder *json.Decoder) (
308321
}
309322

310323
// Publish the new node to every listening socket.
324+
derpMap := c.derpMapFn()
311325
for _, connectionSocket := range connectionSockets {
312-
_ = connectionSocket.Enqueue([]*agpl.Node{&node})
326+
_ = connectionSocket.Enqueue(agpl.CoordinatorNodeUpdate{
327+
DERPMap: derpMap,
328+
Nodes: []*agpl.Node{&node},
329+
})
313330
}
314331
c.mutex.Unlock()
315332
return &node, nil
@@ -486,7 +503,10 @@ func (c *haCoordinator) handlePubsubMessage(ctx context.Context, message []byte)
486503
if err != nil {
487504
c.log.Error(ctx, "invalid nodes JSON", slog.F("id", agentID), slog.Error(err), slog.F("node", string(nodeJSON)))
488505
}
489-
err = agentSocket.Enqueue(nodes)
506+
err = agentSocket.Enqueue(agpl.CoordinatorNodeUpdate{
507+
DERPMap: c.derpMapFn(),
508+
Nodes: nodes,
509+
})
490510
if err != nil {
491511
c.log.Error(ctx, "send callmemaybe to agent", slog.Error(err))
492512
return

0 commit comments

Comments
 (0)