From 75b2f0d87a859336a6dbc4926a982efe1567f2ee Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 23 Feb 2023 11:43:26 +0000 Subject: [PATCH 1/3] fix(tailnet): Skip nodes without DERP, avoid use of RemoveAllPeers --- agent/agent.go | 4 +++- agent/agent_test.go | 15 +++++++++++--- cli/speedtest_test.go | 3 ++- cli/ssh_test.go | 2 ++ coderd/coderd_test.go | 6 +++--- coderd/workspaceagents.go | 28 +++++++++++++------------- coderd/wsconncache/wsconncache_test.go | 13 ++++++++++-- codersdk/workspaceagents.go | 22 +++++++++++++------- tailnet/conn.go | 26 ++++++++++++++++-------- tailnet/conn_test.go | 8 ++++---- 10 files changed, 84 insertions(+), 43 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index a23a781537c3a..a04bb7c6b80b9 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -601,7 +601,9 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error } defer coordinator.Close() a.logger.Info(ctx, "connected to coordination server") - sendNodes, errChan := tailnet.ServeCoordinator(coordinator, network.UpdateNodes) + sendNodes, errChan := tailnet.ServeCoordinator(coordinator, func(nodes []*tailnet.Node) error { + return network.UpdateNodes(nodes, false) + }) network.SetNodeCallback(sendNodes) select { case <-ctx.Done(): diff --git a/agent/agent_test.go b/agent/agent_test.go index 815833cf22764..e8076c5f5b769 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -1179,12 +1179,21 @@ func setupAgent(t *testing.T, metadata agentsdk.Metadata, ptyTimeout time.Durati coordinator.ServeClient(serverConn, uuid.New(), agentID) }() sendNode, _ := tailnet.ServeCoordinator(clientConn, func(node []*tailnet.Node) error { - return conn.UpdateNodes(node) + return conn.UpdateNodes(node, false) }) conn.SetNodeCallback(sendNode) - return &codersdk.WorkspaceAgentConn{ + agentConn := &codersdk.WorkspaceAgentConn{ Conn: conn, - }, c, statsCh, fs + } + t.Cleanup(func() { + _ = agentConn.Close() + }) + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitMedium) + defer cancel() + if !agentConn.AwaitReachable(ctx) { + t.Fatal("agent not reachable") + } + return agentConn, c, statsCh, fs } var dialTestPayload = []byte("dean-was-here123") diff --git a/cli/speedtest_test.go b/cli/speedtest_test.go index e31f8f79d08ff..cdb70e97b558f 100644 --- a/cli/speedtest_test.go +++ b/cli/speedtest_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "cdr.dev/slog" "cdr.dev/slog/sloggers/slogtest" "github.com/coder/coder/agent" "github.com/coder/coder/cli/clitest" @@ -28,7 +29,7 @@ func TestSpeedtest(t *testing.T) { agentClient.SetSessionToken(agentToken) agentCloser := agent.New(agent.Options{ Client: agentClient, - Logger: slogtest.Make(t, nil).Named("agent"), + Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug), }) defer agentCloser.Close() coderdtest.AwaitWorkspaceAgents(t, client, workspace.ID) diff --git a/cli/ssh_test.go b/cli/ssh_test.go index a943de616a34c..1fb13c0593d87 100644 --- a/cli/ssh_test.go +++ b/cli/ssh_test.go @@ -24,6 +24,7 @@ import ( "golang.org/x/crypto/ssh" gosshagent "golang.org/x/crypto/ssh/agent" + "cdr.dev/slog" "cdr.dev/slog/sloggers/slogtest" "github.com/coder/coder/agent" @@ -47,6 +48,7 @@ func setupWorkspaceForAgent(t *testing.T, mutate func([]*proto.Agent) []*proto.A } } client := coderdtest.New(t, &coderdtest.Options{IncludeProvisionerDaemon: true}) + client.Logger = slogtest.Make(t, nil).Named("client").Leveled(slog.LevelDebug) user := coderdtest.CreateFirstUser(t, client) agentToken := uuid.NewString() version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{ diff --git a/coderd/coderd_test.go b/coderd/coderd_test.go index f9501d384a390..4772fb5a51686 100644 --- a/coderd/coderd_test.go +++ b/coderd/coderd_test.go @@ -80,16 +80,16 @@ func TestDERP(t *testing.T) { }) require.NoError(t, err) - w2Ready := make(chan struct{}, 1) + w2Ready := make(chan struct{}) w2ReadyOnce := sync.Once{} w1.SetNodeCallback(func(node *tailnet.Node) { - w2.UpdateNodes([]*tailnet.Node{node}) + w2.UpdateNodes([]*tailnet.Node{node}, false) w2ReadyOnce.Do(func() { close(w2Ready) }) }) w2.SetNodeCallback(func(node *tailnet.Node) { - w1.UpdateNodes([]*tailnet.Node{node}) + w1.UpdateNodes([]*tailnet.Node{node}, false) }) conn := make(chan struct{}) diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 3c6a4d432420e..86fae2842653e 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -453,32 +453,32 @@ func (api *API) dialWorkspaceAgentTailnet(r *http.Request, agentID uuid.UUID) (* } sendNodes, _ := tailnet.ServeCoordinator(clientConn, func(node []*tailnet.Node) error { - err := conn.RemoveAllPeers() - if err != nil { - return xerrors.Errorf("remove all peers: %w", err) - } - - err = conn.UpdateNodes(node) + err = conn.UpdateNodes(node, true) if err != nil { return xerrors.Errorf("update nodes: %w", err) } return nil }) conn.SetNodeCallback(sendNodes) + agentConn := &codersdk.WorkspaceAgentConn{ + Conn: conn, + CloseFunc: func() { + _ = clientConn.Close() + _ = serverConn.Close() + }, + } go func() { err := (*api.TailnetCoordinator.Load()).ServeClient(serverConn, uuid.New(), agentID) if err != nil { api.Logger.Warn(r.Context(), "tailnet coordinator client error", slog.Error(err)) - _ = conn.Close() + _ = agentConn.Close() } }() - return &codersdk.WorkspaceAgentConn{ - Conn: conn, - CloseFunc: func() { - _ = clientConn.Close() - _ = serverConn.Close() - }, - }, nil + if !agentConn.AwaitReachable(context.TODO()) { + _ = agentConn.Close() + return nil, xerrors.Errorf("agent not reachable") + } + return agentConn, nil } // @Summary Get connection info for workspace agent diff --git a/coderd/wsconncache/wsconncache_test.go b/coderd/wsconncache/wsconncache_test.go index fd1b25a836608..b74b142a84088 100644 --- a/coderd/wsconncache/wsconncache_test.go +++ b/coderd/wsconncache/wsconncache_test.go @@ -191,12 +191,21 @@ func setupAgent(t *testing.T, metadata agentsdk.Metadata, ptyTimeout time.Durati }) go coordinator.ServeClient(serverConn, uuid.New(), agentID) sendNode, _ := tailnet.ServeCoordinator(clientConn, func(node []*tailnet.Node) error { - return conn.UpdateNodes(node) + return conn.UpdateNodes(node, false) }) conn.SetNodeCallback(sendNode) - return &codersdk.WorkspaceAgentConn{ + agentConn := &codersdk.WorkspaceAgentConn{ Conn: conn, } + t.Cleanup(func() { + _ = agentConn.Close() + }) + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitMedium) + defer cancel() + if !agentConn.AwaitReachable(ctx) { + t.Fatal("agent not reachable") + } + return agentConn } type client struct { diff --git a/codersdk/workspaceagents.go b/codersdk/workspaceagents.go index b3940e154abc5..9364783d29953 100644 --- a/codersdk/workspaceagents.go +++ b/codersdk/workspaceagents.go @@ -100,7 +100,7 @@ type DialWorkspaceAgentOptions struct { BlockEndpoints bool } -func (c *Client) DialWorkspaceAgent(ctx context.Context, agentID uuid.UUID, options *DialWorkspaceAgentOptions) (*WorkspaceAgentConn, error) { +func (c *Client) DialWorkspaceAgent(ctx context.Context, agentID uuid.UUID, options *DialWorkspaceAgentOptions) (agentConn *WorkspaceAgentConn, err error) { if options == nil { options = &DialWorkspaceAgentOptions{} } @@ -128,6 +128,11 @@ func (c *Client) DialWorkspaceAgent(ctx context.Context, agentID uuid.UUID, opti if err != nil { return nil, xerrors.Errorf("create tailnet: %w", err) } + defer func() { + if err != nil { + _ = conn.Close() + } + }() coordinateURL, err := c.URL.Parse(fmt.Sprintf("/api/v2/workspaceagents/%s/coordinate", agentID)) if err != nil { @@ -145,7 +150,12 @@ func (c *Client) DialWorkspaceAgent(ctx context.Context, agentID uuid.UUID, opti Jar: jar, Transport: c.HTTPClient.Transport, } - ctx, cancelFunc := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(ctx) + defer func() { + if err != nil { + cancel() + } + }() closed := make(chan struct{}) first := make(chan error) go func() { @@ -175,7 +185,7 @@ func (c *Client) DialWorkspaceAgent(ctx context.Context, agentID uuid.UUID, opti continue } sendNode, errChan := tailnet.ServeCoordinator(websocket.NetConn(ctx, ws, websocket.MessageBinary), func(node []*tailnet.Node) error { - return conn.UpdateNodes(node) + return conn.UpdateNodes(node, false) }) conn.SetNodeCallback(sendNode) options.Logger.Debug(ctx, "serving coordinator") @@ -194,15 +204,13 @@ func (c *Client) DialWorkspaceAgent(ctx context.Context, agentID uuid.UUID, opti }() err = <-first if err != nil { - cancelFunc() - _ = conn.Close() return nil, err } - agentConn := &WorkspaceAgentConn{ + agentConn = &WorkspaceAgentConn{ Conn: conn, CloseFunc: func() { - cancelFunc() + cancel() <-closed }, } diff --git a/tailnet/conn.go b/tailnet/conn.go index 9846d4096cde9..ba76bd7f74bac 100644 --- a/tailnet/conn.go +++ b/tailnet/conn.go @@ -130,7 +130,7 @@ func NewConn(options *Options) (conn *Conn, err error) { }() dialer := &tsdial.Dialer{ - Logf: Logger(options.Logger), + Logf: Logger(options.Logger.Named("tsdial")), } wireguardEngine, err := wgengine.NewUserspaceEngine(Logger(options.Logger.Named("wgengine")), wgengine.Config{ LinkMonitor: wireguardMonitor, @@ -179,6 +179,7 @@ func NewConn(options *Options) (conn *Conn, err error) { wireguardEngine = wgengine.NewWatchdog(wireguardEngine) wireguardEngine.SetDERPMap(options.DERPMap) netMapCopy := *netMap + options.Logger.Debug(context.Background(), "updating network map", slog.F("net_map", netMapCopy)) wireguardEngine.SetNetworkMap(&netMapCopy) localIPSet := netipx.IPSetBuilder{} @@ -329,9 +330,11 @@ func (c *Conn) SetDERPMap(derpMap *tailcfg.DERPMap) { c.mutex.Lock() defer c.mutex.Unlock() c.logger.Debug(context.Background(), "updating derp map", slog.F("derp_map", derpMap)) - c.netMap.DERPMap = derpMap - c.wireguardEngine.SetNetworkMap(c.netMap) c.wireguardEngine.SetDERPMap(derpMap) + c.netMap.DERPMap = derpMap + netMapCopy := *c.netMap + c.logger.Debug(context.Background(), "updating network map", slog.F("net_map", netMapCopy)) + c.wireguardEngine.SetNetworkMap(&netMapCopy) } func (c *Conn) RemoveAllPeers() error { @@ -341,6 +344,7 @@ func (c *Conn) RemoveAllPeers() error { c.netMap.Peers = []*tailcfg.Node{} c.peerMap = map[tailcfg.NodeID]*tailcfg.Node{} netMapCopy := *c.netMap + c.logger.Debug(context.Background(), "updating network map", slog.F("net_map", netMapCopy)) c.wireguardEngine.SetNetworkMap(&netMapCopy) cfg, err := nmcfg.WGCfg(c.netMap, Logger(c.logger.Named("wgconfig")), netmap.AllowSingleHosts, "") if err != nil { @@ -361,10 +365,14 @@ func (c *Conn) RemoveAllPeers() error { // UpdateNodes connects with a set of peers. This can be constantly updated, // and peers will continually be reconnected as necessary. -func (c *Conn) UpdateNodes(nodes []*Node) error { +func (c *Conn) UpdateNodes(nodes []*Node, replace bool) error { c.mutex.Lock() defer c.mutex.Unlock() status := c.Status() + if replace { + c.netMap.Peers = []*tailcfg.Node{} + c.peerMap = map[tailcfg.NodeID]*tailcfg.Node{} + } for _, peer := range c.netMap.Peers { peerStatus, ok := status.Peer[peer.Key] if !ok { @@ -384,6 +392,11 @@ func (c *Conn) UpdateNodes(nodes []*Node) error { delete(c.peerMap, peer.ID) } for _, node := range nodes { + // If no preferred DERP is provided, we can't reach the node. + if node.PreferredDERP == 0 { + c.logger.Debug(context.Background(), "no preferred DERP, skipping node", slog.F("node", node)) + continue + } c.logger.Debug(context.Background(), "adding node", slog.F("node", node)) peerStatus, ok := status.Peer[node.Key] @@ -402,10 +415,6 @@ func (c *Conn) UpdateNodes(nodes []*Node) error { // reason. TODO: @kylecarbs debug this! KeepAlive: ok && peerStatus.Active, } - // If no preferred DERP is provided, don't set an IP! - if node.PreferredDERP == 0 { - peerNode.DERP = "" - } if c.blockEndpoints { peerNode.Endpoints = nil } @@ -416,6 +425,7 @@ func (c *Conn) UpdateNodes(nodes []*Node) error { c.netMap.Peers = append(c.netMap.Peers, peer.Clone()) } netMapCopy := *c.netMap + c.logger.Debug(context.Background(), "updating network map", slog.F("net_map", netMapCopy)) c.wireguardEngine.SetNetworkMap(&netMapCopy) cfg, err := nmcfg.WGCfg(c.netMap, Logger(c.logger.Named("wgconfig")), netmap.AllowSingleHosts, "") if err != nil { diff --git a/tailnet/conn_test.go b/tailnet/conn_test.go index a967a0772cdd8..f181aff022892 100644 --- a/tailnet/conn_test.go +++ b/tailnet/conn_test.go @@ -55,12 +55,12 @@ func TestTailnet(t *testing.T) { _ = w2.Close() }) w1.SetNodeCallback(func(node *tailnet.Node) { - err := w2.UpdateNodes([]*tailnet.Node{node}) - require.NoError(t, err) + err := w2.UpdateNodes([]*tailnet.Node{node}, false) + assert.NoError(t, err) }) w2.SetNodeCallback(func(node *tailnet.Node) { - err := w1.UpdateNodes([]*tailnet.Node{node}) - require.NoError(t, err) + err := w1.UpdateNodes([]*tailnet.Node{node}, false) + assert.NoError(t, err) }) require.True(t, w2.AwaitReachable(context.Background(), w1IP)) conn := make(chan struct{}) From 7131afeaa3e2e218f526a9f2e03517701490d81a Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 23 Feb 2023 11:57:50 +0000 Subject: [PATCH 2/3] Update docs, fix lint --- tailnet/conn.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tailnet/conn.go b/tailnet/conn.go index ba76bd7f74bac..e2e55a971c589 100644 --- a/tailnet/conn.go +++ b/tailnet/conn.go @@ -364,12 +364,15 @@ func (c *Conn) RemoveAllPeers() error { } // UpdateNodes connects with a set of peers. This can be constantly updated, -// and peers will continually be reconnected as necessary. -func (c *Conn) UpdateNodes(nodes []*Node, replace bool) error { +// and peers will continually be reconnected as necessary. If replacePeers is +// true, all peers will be removed before adding the new ones. +// +//nolint:revive // Complains about replacePeers. +func (c *Conn) UpdateNodes(nodes []*Node, replacePeers bool) error { c.mutex.Lock() defer c.mutex.Unlock() status := c.Status() - if replace { + if replacePeers { c.netMap.Peers = []*tailcfg.Node{} c.peerMap = map[tailcfg.NodeID]*tailcfg.Node{} } From a3f3776e558cf570fe0402844666e0ec15db4856 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 23 Feb 2023 12:05:21 +0000 Subject: [PATCH 3/3] Remove todo context --- coderd/workspaceagents.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 86fae2842653e..649fb37f0664d 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -404,6 +404,7 @@ func (api *API) workspaceAgentListeningPorts(rw http.ResponseWriter, r *http.Req } func (api *API) dialWorkspaceAgentTailnet(r *http.Request, agentID uuid.UUID) (*codersdk.WorkspaceAgentConn, error) { + ctx := r.Context() clientConn, serverConn := net.Pipe() derpMap := api.DERPMap.Clone() @@ -474,7 +475,7 @@ func (api *API) dialWorkspaceAgentTailnet(r *http.Request, agentID uuid.UUID) (* _ = agentConn.Close() } }() - if !agentConn.AwaitReachable(context.TODO()) { + if !agentConn.AwaitReachable(ctx) { _ = agentConn.Close() return nil, xerrors.Errorf("agent not reachable") }