Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
eae4c3a
chore: add derpserver to proxy, add proxies to derpmap
deansheather Apr 27, 2023
ac99525
progress
deansheather May 1, 2023
4b68a0b
Merge branch 'main' into dean/proxy-derp-map
deansheather May 2, 2023
4ba7af6
progress
deansheather May 3, 2023
dcf072e
derp mesh probably working
deansheather May 4, 2023
2d2f1a3
deregister
deansheather May 4, 2023
28ae155
tests and various fixes
deansheather May 4, 2023
2baa362
Merge branch 'main' into dean/proxy-derp-map
deansheather May 4, 2023
5f5d4ff
more tests
deansheather May 5, 2023
5441dc8
merge main, remove proxy goingaway route
deansheather May 30, 2023
e4a3008
derp tests work
deansheather May 30, 2023
3caa692
Merge branch 'main' into dean/proxy-derp-map
deansheather May 30, 2023
404c3e4
update derp map on new connection
deansheather May 31, 2023
8544882
Merge branch 'main' into dean/proxy-derp-map
deansheather Jun 13, 2023
9b503fa
fixes
deansheather Jun 13, 2023
0e6d39a
tests for derp map changing
deansheather Jun 13, 2023
bb699fb
Merge branch 'main' into dean/proxy-derp-map
deansheather Jun 13, 2023
2943ac2
backwards compatible
deansheather Jun 20, 2023
f0fa578
other comments
deansheather Jun 20, 2023
9d90dc2
Merge branch 'main' into dean/proxy-derp-map
deansheather Jun 25, 2023
b405113
fixup! Merge branch 'main' into dean/proxy-derp-map
deansheather Jun 25, 2023
6a08a59
change derp map updates to be separate websocket
deansheather Jun 28, 2023
403eac5
Merge branch 'main' into dean/proxy-derp-map
deansheather Jul 17, 2023
d220266
Merge branch 'main' into dean/proxy-derp-map
deansheather Jul 24, 2023
9e658d6
fixup! Merge branch 'main' into dean/proxy-derp-map
deansheather Jul 24, 2023
67f2e5c
Working tests
deansheather Jul 25, 2023
3c96149
Merge branch 'main' into dean/proxy-derp-map
deansheather Jul 25, 2023
c26936a
fixup! Merge branch 'main' into dean/proxy-derp-map
deansheather Jul 25, 2023
e59de5a
fixup! Merge branch 'main' into dean/proxy-derp-map
deansheather Jul 25, 2023
2df067f
fixup! Merge branch 'main' into dean/proxy-derp-map
deansheather Jul 25, 2023
dfbfa96
Please
deansheather Jul 26, 2023
8223a35
fixup! Please
deansheather Jul 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update derp map on new connection
  • Loading branch information
deansheather committed May 31, 2023
commit 404c3e4d94ec1d3b49c400871b1ba132ecc7dd03
10 changes: 8 additions & 2 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,8 +810,14 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
}
defer coordinator.Close()
a.logger.Info(ctx, "connected to coordination endpoint")
sendNodes, errChan := tailnet.ServeCoordinator(coordinator, func(nodes []*tailnet.Node) error {
return network.UpdateNodes(nodes, false)
sendNodes, errChan := tailnet.ServeCoordinator(coordinator, func(update tailnet.CoordinatorNodeUpdate) error {
// Check if we need to update our DERP map.
if !tailnet.CompareDERPMaps(network.DERPMap(), update.DERPMap) {
a.logger.Info(ctx, "updating DERP map on connection request due to changes", slog.F("old", network.DERPMap()), slog.F("new", update.DERPMap))
network.SetDERPMap(update.DERPMap)
}

return network.UpdateNodes(update.Nodes, false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really know DERP, but if we have to update our DERP map, could the update also be from removing nodes, and therefore removing peers?

The UpdateNodes call does a pure addition of new peers; it does not delete old peers unless we pass `true` here.

Is this something we should worry about? If peers can go stale on their own and remove themselves, then we do not need to worry about this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a separate concern that Spike is thinking about. We currently do leak nodes but this is an existing problem

})
network.SetNodeCallback(sendNodes)
select {
Expand Down
24 changes: 16 additions & 8 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ func TestAgent_StartupScript(t *testing.T) {
DERPMap: &tailcfg.DERPMap{},
},
statsChan: make(chan *agentsdk.Stats),
coordinator: tailnet.NewCoordinator(logger),
coordinator: tailnet.NewCoordinator(logger, emptyDerpMapFn),
}
closer := agent.New(agent.Options{
Client: client,
Expand Down Expand Up @@ -930,7 +930,7 @@ func TestAgent_StartupScript(t *testing.T) {
return codersdk.ReadBodyAsError(res)
},
statsChan: make(chan *agentsdk.Stats),
coordinator: tailnet.NewCoordinator(logger),
coordinator: tailnet.NewCoordinator(logger, emptyDerpMapFn),
}
closer := agent.New(agent.Options{
Client: client,
Expand Down Expand Up @@ -1327,7 +1327,7 @@ func TestAgent_Lifecycle(t *testing.T) {
ShutdownScript: "echo " + expected,
},
statsChan: make(chan *agentsdk.Stats),
coordinator: tailnet.NewCoordinator(logger),
coordinator: tailnet.NewCoordinator(logger, emptyDerpMapFn),
}

fs := afero.NewMemMapFs()
Expand Down Expand Up @@ -1585,7 +1585,7 @@ func TestAgent_Reconnect(t *testing.T) {
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
// After the agent is disconnected from a coordinator, it's supposed
// to reconnect!
coordinator := tailnet.NewCoordinator(logger)
coordinator := tailnet.NewCoordinator(logger, emptyDerpMapFn)
defer coordinator.Close()

agentID := uuid.New()
Expand Down Expand Up @@ -1623,7 +1623,7 @@ func TestAgent_Reconnect(t *testing.T) {
func TestAgent_WriteVSCodeConfigs(t *testing.T) {
t.Parallel()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
coordinator := tailnet.NewCoordinator(logger)
coordinator := tailnet.NewCoordinator(logger, emptyDerpMapFn)
defer coordinator.Close()

client := &client{
Expand Down Expand Up @@ -1737,7 +1737,9 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
if metadata.DERPMap == nil {
metadata.DERPMap = tailnettest.RunDERPAndSTUN(t)
}
coordinator := tailnet.NewCoordinator(logger)
coordinator := tailnet.NewCoordinator(logger, func() *tailcfg.DERPMap {
return metadata.DERPMap
})
t.Cleanup(func() {
_ = coordinator.Close()
})
Expand Down Expand Up @@ -1785,8 +1787,10 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
defer close(serveClientDone)
coordinator.ServeClient(serverConn, uuid.New(), agentID)
}()
sendNode, _ := tailnet.ServeCoordinator(clientConn, func(node []*tailnet.Node) error {
return conn.UpdateNodes(node, false)
sendNode, _ := tailnet.ServeCoordinator(clientConn, func(update tailnet.CoordinatorNodeUpdate) error {
// Don't need to worry about updating the DERP map since it'll never
// change in this test (as we aren't dealing with proxies etc.)
return conn.UpdateNodes(update.Nodes, false)
})
conn.SetNodeCallback(sendNode)
agentConn := &codersdk.WorkspaceAgentConn{
Expand Down Expand Up @@ -2095,3 +2099,7 @@ func verifyCollectedMetrics(t *testing.T, expected []agentsdk.AgentMetric, actua
}
return true
}

// emptyDerpMapFn is a coordinator DERP-map callback for tests that do not
// exercise DERP map updates; it always yields a fresh, empty map.
func emptyDerpMapFn() *tailcfg.DERPMap {
	var derpMap tailcfg.DERPMap
	return &derpMap
}
6 changes: 3 additions & 3 deletions coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,6 @@ func New(options *Options) *API {
if options.PrometheusRegistry == nil {
options.PrometheusRegistry = prometheus.NewRegistry()
}
if options.TailnetCoordinator == nil {
options.TailnetCoordinator = tailnet.NewCoordinator(options.Logger)
}
if options.DERPServer == nil {
options.DERPServer = derp.NewServer(key.NewNode(), tailnet.Logger(options.Logger.Named("derp")))
}
Expand Down Expand Up @@ -314,6 +311,9 @@ func New(options *Options) *API {
Experiments: experiments,
healthCheckGroup: &singleflight.Group[string, *healthcheck.Report]{},
}
if options.TailnetCoordinator == nil {
options.TailnetCoordinator = tailnet.NewCoordinator(options.Logger, api.DERPMap)
}
if options.HealthcheckFunc == nil {
options.HealthcheckFunc = func(ctx context.Context, apiKey string) (*healthcheck.Report, error) {
return healthcheck.Run(ctx, &healthcheck.ReportOptions{
Expand Down
6 changes: 3 additions & 3 deletions coderd/prometheusmetrics/prometheusmetrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,13 +300,13 @@ func TestAgents(t *testing.T) {
coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)

// given
coordinator := tailnet.NewCoordinator(slogtest.Make(t, nil).Leveled(slog.LevelDebug))
coordinatorPtr := atomic.Pointer[tailnet.Coordinator]{}
coordinatorPtr.Store(&coordinator)
derpMap := tailnettest.RunDERPAndSTUN(t)
derpMapFn := func() *tailcfg.DERPMap {
return derpMap
}
coordinator := tailnet.NewCoordinator(slogtest.Make(t, nil).Leveled(slog.LevelDebug), derpMapFn)
coordinatorPtr := atomic.Pointer[tailnet.Coordinator]{}
coordinatorPtr.Store(&coordinator)
agentInactiveDisconnectTimeout := 1 * time.Hour // don't need to focus on this value in tests
registry := prometheus.NewRegistry()

Expand Down
9 changes: 7 additions & 2 deletions coderd/workspaceagents.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,8 +687,13 @@ func (api *API) dialWorkspaceAgentTailnet(agentID uuid.UUID) (*codersdk.Workspac
return left
})

sendNodes, _ := tailnet.ServeCoordinator(clientConn, func(node []*tailnet.Node) error {
err = conn.UpdateNodes(node, true)
sendNodes, _ := tailnet.ServeCoordinator(clientConn, func(update tailnet.CoordinatorNodeUpdate) error {
// Check if we need to update the DERP map used by the connection.
if !tailnet.CompareDERPMaps(conn.DERPMap(), update.DERPMap) {
conn.SetDERPMap(update.DERPMap)
}

err = conn.UpdateNodes(update.Nodes, true)
if err != nil {
return xerrors.Errorf("update nodes: %w", err)
}
Expand Down
11 changes: 8 additions & 3 deletions coderd/wsconncache/wsconncache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/goleak"
"tailscale.com/tailcfg"

"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
Expand Down Expand Up @@ -159,7 +160,9 @@ func setupAgent(t *testing.T, manifest agentsdk.Manifest, ptyTimeout time.Durati
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
manifest.DERPMap = tailnettest.RunDERPAndSTUN(t)

coordinator := tailnet.NewCoordinator(logger)
coordinator := tailnet.NewCoordinator(logger, func() *tailcfg.DERPMap {
return manifest.DERPMap
})
t.Cleanup(func() {
_ = coordinator.Close()
})
Expand Down Expand Up @@ -190,8 +193,10 @@ func setupAgent(t *testing.T, manifest agentsdk.Manifest, ptyTimeout time.Durati
_ = conn.Close()
})
go coordinator.ServeClient(serverConn, uuid.New(), agentID)
sendNode, _ := tailnet.ServeCoordinator(clientConn, func(node []*tailnet.Node) error {
return conn.UpdateNodes(node, false)
sendNode, _ := tailnet.ServeCoordinator(clientConn, func(update tailnet.CoordinatorNodeUpdate) error {
// Don't need to worry about updating the DERP map since it'll never
// change in this test (as we aren't dealing with proxies etc.)
return conn.UpdateNodes(update.Nodes, false)
})
conn.SetNodeCallback(sendNode)
agentConn := &codersdk.WorkspaceAgentConn{
Expand Down
9 changes: 7 additions & 2 deletions codersdk/workspaceagents.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,13 @@ func (c *Client) DialWorkspaceAgent(ctx context.Context, agentID uuid.UUID, opti
options.Logger.Debug(ctx, "failed to dial", slog.Error(err))
continue
}
sendNode, errChan := tailnet.ServeCoordinator(websocket.NetConn(ctx, ws, websocket.MessageBinary), func(node []*tailnet.Node) error {
return conn.UpdateNodes(node, false)
sendNode, errChan := tailnet.ServeCoordinator(websocket.NetConn(ctx, ws, websocket.MessageBinary), func(update tailnet.CoordinatorNodeUpdate) error {
// Check if we need to update the DERP map used by the connection.
if !tailnet.CompareDERPMaps(conn.DERPMap(), update.DERPMap) {
options.Logger.Debug(ctx, "updating DERP map on connection request due to changes", slog.F("old", conn.DERPMap()), slog.F("new", update.DERPMap))
conn.SetDERPMap(update.DERPMap)
}
return conn.UpdateNodes(update.Nodes, false)
})
conn.SetNodeCallback(sendNode)
options.Logger.Debug(ctx, "serving coordinator")
Expand Down
4 changes: 2 additions & 2 deletions enterprise/coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,9 @@ func (api *API) updateEntitlements(ctx context.Context) error {
}

if changed, enabled := featureChanged(codersdk.FeatureHighAvailability); changed {
coordinator := agpltailnet.NewCoordinator(api.Logger)
coordinator := agpltailnet.NewCoordinator(api.Logger, api.AGPL.DERPMap)
if enabled {
haCoordinator, err := tailnet.NewCoordinator(api.Logger, api.Pubsub)
haCoordinator, err := tailnet.NewCoordinator(api.Logger, api.Pubsub, api.AGPL.DERPMap)
if err != nil {
api.Logger.Error(ctx, "unable to set up high availability coordinator", slog.Error(err))
// If we try to setup the HA coordinator and it fails, nothing
Expand Down
34 changes: 27 additions & 7 deletions enterprise/tailnet/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/google/uuid"
lru "github.com/hashicorp/golang-lru/v2"
"golang.org/x/xerrors"
"tailscale.com/tailcfg"

"cdr.dev/slog"
"github.com/coder/coder/coderd/database"
Expand All @@ -22,7 +23,7 @@ import (

// NewCoordinator creates a new high availability coordinator
// that uses PostgreSQL pubsub to exchange handshakes.
func NewCoordinator(logger slog.Logger, pubsub database.Pubsub) (agpl.Coordinator, error) {
func NewCoordinator(logger slog.Logger, pubsub database.Pubsub, derpMapFn func() *tailcfg.DERPMap) (agpl.Coordinator, error) {
ctx, cancelFunc := context.WithCancel(context.Background())

nameCache, err := lru.New[uuid.UUID, string](512)
Expand All @@ -34,8 +35,9 @@ func NewCoordinator(logger slog.Logger, pubsub database.Pubsub) (agpl.Coordinato
id: uuid.New(),
log: logger,
pubsub: pubsub,
closeFunc: cancelFunc,
close: make(chan struct{}),
closeFunc: cancelFunc,
derpMapFn: derpMapFn,
nodes: map[uuid.UUID]*agpl.Node{},
agentSockets: map[uuid.UUID]*agpl.TrackedConn{},
agentToConnectionSockets: map[uuid.UUID]map[uuid.UUID]*agpl.TrackedConn{},
Expand All @@ -57,6 +59,8 @@ type haCoordinator struct {
close chan struct{}
closeFunc context.CancelFunc

derpMapFn func() *tailcfg.DERPMap

// nodes maps agent and connection IDs their respective node.
nodes map[uuid.UUID]*agpl.Node
// agentSockets maps agent IDs to their open websocket.
Expand Down Expand Up @@ -109,7 +113,10 @@ func (c *haCoordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID
// node of the agent. This allows the connection to establish.
node, ok := c.nodes[agent]
if ok {
err := tc.Enqueue([]*agpl.Node{node})
err := tc.Enqueue(agpl.CoordinatorNodeUpdate{
DERPMap: c.derpMapFn(),
Nodes: []*agpl.Node{node},
})
c.mutex.Unlock()
if err != nil {
return xerrors.Errorf("enqueue node: %w", err)
Expand Down Expand Up @@ -177,7 +184,10 @@ func (c *haCoordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *js
}
return nil
}
err = agentSocket.Enqueue([]*agpl.Node{&node})
err = agentSocket.Enqueue(agpl.CoordinatorNodeUpdate{
DERPMap: c.derpMapFn(),
Nodes: []*agpl.Node{&node},
})
c.mutex.Unlock()
if err != nil {
return xerrors.Errorf("enqueu nodes: %w", err)
Expand Down Expand Up @@ -212,7 +222,10 @@ func (c *haCoordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) err
// Publish all nodes on this instance that want to connect to this agent.
nodes := c.nodesSubscribedToAgent(id)
if len(nodes) > 0 {
err := tc.Enqueue(nodes)
err := tc.Enqueue(agpl.CoordinatorNodeUpdate{
DERPMap: c.derpMapFn(),
Nodes: nodes,
})
if err != nil {
c.mutex.Unlock()
return xerrors.Errorf("enqueue nodes: %w", err)
Expand Down Expand Up @@ -308,8 +321,12 @@ func (c *haCoordinator) handleAgentUpdate(id uuid.UUID, decoder *json.Decoder) (
}

// Publish the new node to every listening socket.
derpMap := c.derpMapFn()
for _, connectionSocket := range connectionSockets {
_ = connectionSocket.Enqueue([]*agpl.Node{&node})
_ = connectionSocket.Enqueue(agpl.CoordinatorNodeUpdate{
DERPMap: derpMap,
Nodes: []*agpl.Node{&node},
})
}
c.mutex.Unlock()
return &node, nil
Expand Down Expand Up @@ -486,7 +503,10 @@ func (c *haCoordinator) handlePubsubMessage(ctx context.Context, message []byte)
if err != nil {
c.log.Error(ctx, "invalid nodes JSON", slog.F("id", agentID), slog.Error(err), slog.F("node", string(nodeJSON)))
}
err = agentSocket.Enqueue(nodes)
err = agentSocket.Enqueue(agpl.CoordinatorNodeUpdate{
DERPMap: c.derpMapFn(),
Nodes: nodes,
})
if err != nil {
c.log.Error(ctx, "send callmemaybe to agent", slog.Error(err))
return
Expand Down
Loading