diff --git a/codersdk/agentconn.go b/codersdk/agentconn.go index 3a5dab5158a70..f6c4da47b166c 100644 --- a/codersdk/agentconn.go +++ b/codersdk/agentconn.go @@ -46,7 +46,7 @@ type AgentConn struct { func (c *AgentConn) Ping() (time.Duration, error) { errCh := make(chan error, 1) durCh := make(chan time.Duration, 1) - c.Conn.Ping(TailnetIP, tailcfg.PingICMP, func(pr *ipnstate.PingResult) { + c.Conn.Ping(TailnetIP, tailcfg.PingDisco, func(pr *ipnstate.PingResult) { if pr.Err != "" { errCh <- xerrors.New(pr.Err) return diff --git a/tailnet/conn.go b/tailnet/conn.go index 61eb6db17ba87..1b454d6346b97 100644 --- a/tailnet/conn.go +++ b/tailnet/conn.go @@ -182,6 +182,7 @@ func NewConn(options *Options) (*Conn, error) { magicConn: magicConn, dialer: dialer, listeners: map[listenKey]*listener{}, + peerMap: map[tailcfg.NodeID]*tailcfg.Node{}, tunDevice: tunDevice, netMap: netMap, netStack: netStack, @@ -192,10 +193,17 @@ func NewConn(options *Options) (*Conn, error) { wireguardEngine: wireguardEngine, } wireguardEngine.SetStatusCallback(func(s *wgengine.Status, err error) { + server.logger.Info(context.Background(), "wireguard status", slog.F("status", s), slog.F("err", err)) if err != nil { return } server.lastMutex.Lock() + if s.AsOf.Before(server.lastStatus) { + // Don't process outdated status! + server.lastMutex.Unlock() + return + } + server.lastStatus = s.AsOf server.lastEndpoints = make([]string, 0, len(s.LocalAddrs)) for _, addr := range s.LocalAddrs { server.lastEndpoints = append(server.lastEndpoints, addr.Addr.String()) @@ -240,6 +248,7 @@ type Conn struct { dialer *tsdial.Dialer tunDevice *tstun.Wrapper + peerMap map[tailcfg.NodeID]*tailcfg.Node netMap *netmap.NetworkMap netStack *netstack.Impl magicConn *magicsock.Conn @@ -254,6 +263,7 @@ type Conn struct { nodeChanged bool // It's only possible to store these values via status functions, // so the values must be stored for retrieval later on. + lastStatus time.Time lastEndpoints []string lastPreferredDERP int lastDERPLatency map[string]float64 @@ -282,8 +292,9 @@ func (c *Conn) SetNodeCallback(callback func(node *Node)) { func (c *Conn) SetDERPMap(derpMap *tailcfg.DERPMap) { c.mutex.Lock() defer c.mutex.Unlock() - c.netMap.DERPMap = derpMap c.logger.Debug(context.Background(), "updating derp map", slog.F("derp_map", derpMap)) + c.netMap.DERPMap = derpMap + c.wireguardEngine.SetNetworkMap(c.netMap) c.wireguardEngine.SetDERPMap(derpMap) } @@ -292,18 +303,24 @@ func (c *Conn) SetDERPMap(derpMap *tailcfg.DERPMap) { func (c *Conn) UpdateNodes(nodes []*Node) error { c.mutex.Lock() defer c.mutex.Unlock() - peerMap := map[tailcfg.NodeID]*tailcfg.Node{} status := c.Status() for _, peer := range c.netMap.Peers { - if peerStatus, ok := status.Peer[peer.Key]; ok { - // Clear out inactive connections! - // If a connection hasn't been active for a minute post creation, we assume it's dead. - if !peerStatus.Active && peer.Created.Before(time.Now().Add(-time.Minute)) { - c.logger.Debug(context.Background(), "clearing peer", slog.F("peerStatus", peerStatus)) - continue - } + peerStatus, ok := status.Peer[peer.Key] + if !ok { + continue } - peerMap[peer.ID] = peer + // If this peer was added in the last 5 minutes, assume it + // could still be active. + if time.Since(peer.Created) < 5*time.Minute { + continue + } + // We double-check that it's safe to remove by ensuring no + // handshake has been sent in the past 5 minutes as well. Connections that + // are actively exchanging IP traffic will handshake every 2 minutes. + if time.Since(peerStatus.LastHandshake) < 5*time.Minute { + continue + } + delete(c.peerMap, peer.ID) } for _, node := range nodes { peerStatus, ok := status.Peer[node.Key] @@ -322,18 +339,11 @@ func (c *Conn) UpdateNodes(nodes []*Node) error { // reason. TODO: @kylecarbs debug this! KeepAlive: ok && peerStatus.Active, } - existingNode, ok := peerMap[node.ID] - if ok { - peerNode.Created = existingNode.Created - c.logger.Debug(context.Background(), "updating peer", slog.F("peer", peerNode)) - } else { - c.logger.Debug(context.Background(), "adding peer", slog.F("peer", peerNode)) - } - peerMap[node.ID] = peerNode + c.peerMap[node.ID] = peerNode } - c.netMap.Peers = make([]*tailcfg.Node, 0, len(peerMap)) - for _, peer := range peerMap { - c.netMap.Peers = append(c.netMap.Peers, peer) + c.netMap.Peers = make([]*tailcfg.Node, 0, len(c.peerMap)) + for _, peer := range c.peerMap { + c.netMap.Peers = append(c.netMap.Peers, peer.Clone()) } netMapCopy := *c.netMap c.wireguardEngine.SetNetworkMap(&netMapCopy) @@ -425,6 +435,7 @@ func (c *Conn) sendNode() { } c.nodeSending = true go func() { + c.logger.Info(context.Background(), "sending node", slog.F("node", node)) nodeCallback(node) c.lastMutex.Lock() c.nodeSending = false diff --git a/tailnet/coordinator.go b/tailnet/coordinator.go index 95209d56559ff..d6e8b6bbb6e59 100644 --- a/tailnet/coordinator.go +++ b/tailnet/coordinator.go @@ -164,6 +164,7 @@ func (c *Coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) c.mutex.Unlock() continue } + c.mutex.Unlock() // Write the new node from this client to the actively // connected agent. data, err := json.Marshal([]*Node{&node}) @@ -173,14 +174,11 @@ func (c *Coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) } _, err = agentSocket.Write(data) if errors.Is(err, io.EOF) { - c.mutex.Unlock() return nil } if err != nil { - c.mutex.Unlock() return xerrors.Errorf("write json: %w", err) } - c.mutex.Unlock() } } @@ -259,7 +257,7 @@ func (c *Coordinator) ServeAgent(conn net.Conn, id uuid.UUID) error { wg.Done() }() } - wg.Wait() c.mutex.Unlock() + wg.Wait() } }