fix: Ensure tailnet coordinations are sent orderly #4198

Merged
merged 1 commit on Sep 26, 2022
fix: Ensure tailnet coordinations are sent orderly
kylecarbs committed Sep 26, 2022
commit 88757ae0c4a8448084f347632bc54c917db52877
2 changes: 1 addition & 1 deletion codersdk/agentconn.go
@@ -46,7 +46,7 @@ type AgentConn struct {
func (c *AgentConn) Ping() (time.Duration, error) {
errCh := make(chan error, 1)
durCh := make(chan time.Duration, 1)
c.Conn.Ping(TailnetIP, tailcfg.PingICMP, func(pr *ipnstate.PingResult) {
c.Conn.Ping(TailnetIP, tailcfg.PingDisco, func(pr *ipnstate.PingResult) {
if pr.Err != "" {
errCh <- xerrors.New(pr.Err)
return
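For context, a minimal usage sketch of the updated Ping, which now pings at the disco layer rather than via ICMP. The helper name and log messages are illustrative, not part of the PR; it assumes a connected *codersdk.AgentConn.

package main

import (
	"log"

	"github.com/coder/coder/codersdk"
)

// pingAgent is a hypothetical helper: it measures round-trip latency to a
// workspace agent over the tailnet using AgentConn.Ping.
func pingAgent(conn *codersdk.AgentConn) {
	dur, err := conn.Ping()
	if err != nil {
		log.Printf("agent ping failed: %v", err)
		return
	}
	log.Printf("agent responded in %s", dur)
}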
53 changes: 32 additions & 21 deletions tailnet/conn.go
@@ -182,6 +182,7 @@ func NewConn(options *Options) (*Conn, error) {
magicConn: magicConn,
dialer: dialer,
listeners: map[listenKey]*listener{},
peerMap: map[tailcfg.NodeID]*tailcfg.Node{},
tunDevice: tunDevice,
netMap: netMap,
netStack: netStack,
@@ -192,10 +193,17 @@ func NewConn(options *Options) (*Conn, error) {
wireguardEngine: wireguardEngine,
}
wireguardEngine.SetStatusCallback(func(s *wgengine.Status, err error) {
server.logger.Info(context.Background(), "wireguard status", slog.F("status", s), slog.F("err", err))
if err != nil {
return
}
server.lastMutex.Lock()
if s.AsOf.Before(server.lastStatus) {
// Don't process outdated status!
server.lastMutex.Unlock()
Review comment (Contributor):
I think you could just defer the Unlock here instead of calling it twice.

Reply (kylecarbs, Member, Author):
Nah, because sendNode can take some time.
return
}
server.lastStatus = s.AsOf
server.lastEndpoints = make([]string, 0, len(s.LocalAddrs))
for _, addr := range s.LocalAddrs {
server.lastEndpoints = append(server.lastEndpoints, addr.Addr.String())
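On the review thread above: a minimal sketch of the pattern under discussion, with illustrative names rather than the real Conn fields. The timestamp check drops out-of-order status callbacks, and the mutex is released explicitly before the potentially slow send instead of via defer.

package main

import (
	"sync"
	"time"
)

// statusTracker is a hypothetical stand-in for the Conn fields touched by
// the status callback above.
type statusTracker struct {
	mu        sync.Mutex
	last      time.Time
	endpoints []string
}

// handle ignores any status older than the last one processed, updates the
// cached endpoints, then releases the lock before invoking send, which may
// take a while (the reason defer is avoided here).
func (t *statusTracker) handle(asOf time.Time, endpoints []string, send func([]string)) {
	t.mu.Lock()
	if asOf.Before(t.last) {
		// Don't process outdated status.
		t.mu.Unlock()
		return
	}
	t.last = asOf
	t.endpoints = endpoints
	t.mu.Unlock()
	send(endpoints)
}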
@@ -240,6 +248,7 @@ type Conn struct {

dialer *tsdial.Dialer
tunDevice *tstun.Wrapper
peerMap map[tailcfg.NodeID]*tailcfg.Node
netMap *netmap.NetworkMap
netStack *netstack.Impl
magicConn *magicsock.Conn
@@ -254,6 +263,7 @@ type Conn struct {
nodeChanged bool
// It's only possible to store these values via status functions,
// so the values must be stored for retrieval later on.
lastStatus time.Time
lastEndpoints []string
lastPreferredDERP int
lastDERPLatency map[string]float64
@@ -282,8 +292,9 @@ func (c *Conn) SetNodeCallback(callback func(node *Node)) {
func (c *Conn) SetDERPMap(derpMap *tailcfg.DERPMap) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.netMap.DERPMap = derpMap
c.logger.Debug(context.Background(), "updating derp map", slog.F("derp_map", derpMap))
c.netMap.DERPMap = derpMap
c.wireguardEngine.SetNetworkMap(c.netMap)
c.wireguardEngine.SetDERPMap(derpMap)
}

@@ -292,18 +303,24 @@ func (c *Conn) SetDERPMap(derpMap *tailcfg.DERPMap) {
func (c *Conn) UpdateNodes(nodes []*Node) error {
c.mutex.Lock()
defer c.mutex.Unlock()
peerMap := map[tailcfg.NodeID]*tailcfg.Node{}
status := c.Status()
for _, peer := range c.netMap.Peers {
if peerStatus, ok := status.Peer[peer.Key]; ok {
// Clear out inactive connections!
// If a connection hasn't been active for a minute post creation, we assume it's dead.
if !peerStatus.Active && peer.Created.Before(time.Now().Add(-time.Minute)) {
c.logger.Debug(context.Background(), "clearing peer", slog.F("peerStatus", peerStatus))
continue
}
peerStatus, ok := status.Peer[peer.Key]
if !ok {
continue
}
peerMap[peer.ID] = peer
// If this peer was added in the last 5 minutes, assume it
// could still be active.
if time.Since(peer.Created) < 5*time.Minute {
continue
}
// We double-check that it's safe to remove by ensuring no
// handshake has been sent in the past 5 minutes as well. Connections that
// are actively exchanging IP traffic will handshake every 2 minutes.
if time.Since(peerStatus.LastHandshake) < 5*time.Minute {
continue
}
delete(c.peerMap, peer.ID)
}
for _, node := range nodes {
peerStatus, ok := status.Peer[node.Key]
@@ -322,18 +339,11 @@ func (c *Conn) UpdateNodes(nodes []*Node) error {
// reason. TODO: @kylecarbs debug this!
KeepAlive: ok && peerStatus.Active,
}
existingNode, ok := peerMap[node.ID]
if ok {
peerNode.Created = existingNode.Created
c.logger.Debug(context.Background(), "updating peer", slog.F("peer", peerNode))
} else {
c.logger.Debug(context.Background(), "adding peer", slog.F("peer", peerNode))
}
peerMap[node.ID] = peerNode
c.peerMap[node.ID] = peerNode
}
c.netMap.Peers = make([]*tailcfg.Node, 0, len(peerMap))
for _, peer := range peerMap {
c.netMap.Peers = append(c.netMap.Peers, peer)
c.netMap.Peers = make([]*tailcfg.Node, 0, len(c.peerMap))
for _, peer := range c.peerMap {
c.netMap.Peers = append(c.netMap.Peers, peer.Clone())
}
netMapCopy := *c.netMap
c.wireguardEngine.SetNetworkMap(&netMapCopy)
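The removal rule in UpdateNodes combines two conditions spread across two continue statements; as a compact sketch (shouldDrop is a hypothetical helper, not part of the PR), a peer is only dropped from c.peerMap when it is both older than 5 minutes and has not completed a handshake in the last 5 minutes.

package main

import "time"

// shouldDrop mirrors the expiry rule above: peers created within the last
// 5 minutes are kept, and so are peers that handshaked within the last
// 5 minutes (active connections handshake roughly every 2 minutes).
func shouldDrop(created, lastHandshake time.Time) bool {
	return time.Since(created) >= 5*time.Minute &&
		time.Since(lastHandshake) >= 5*time.Minute
}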
@@ -425,6 +435,7 @@ func (c *Conn) sendNode() {
}
c.nodeSending = true
go func() {
c.logger.Info(context.Background(), "sending node", slog.F("node", node))
nodeCallback(node)
c.lastMutex.Lock()
c.nodeSending = false
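The sendNode hunk is truncated here; roughly, it ensures at most one node callback is in flight at a time. A small sketch of that pattern with illustrative names, not the actual Conn implementation:

package main

import "sync"

// sender allows only one callback to run at a time; the flag is cleared
// under the lock once the callback returns.
type sender struct {
	mu      sync.Mutex
	sending bool
}

func (s *sender) trySend(node string, callback func(string)) {
	s.mu.Lock()
	if s.sending {
		// A send is already in flight.
		s.mu.Unlock()
		return
	}
	s.sending = true
	s.mu.Unlock()
	go func() {
		callback(node)
		s.mu.Lock()
		s.sending = false
		s.mu.Unlock()
	}()
}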
6 changes: 2 additions & 4 deletions tailnet/coordinator.go
@@ -164,6 +164,7 @@ func (c *Coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID)
c.mutex.Unlock()
continue
}
c.mutex.Unlock()
// Write the new node from this client to the actively
// connected agent.
data, err := json.Marshal([]*Node{&node})
@@ -173,14 +174,11 @@ func (c *Coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID)
}
_, err = agentSocket.Write(data)
if errors.Is(err, io.EOF) {
c.mutex.Unlock()
return nil
}
if err != nil {
c.mutex.Unlock()
return xerrors.Errorf("write json: %w", err)
}
c.mutex.Unlock()
}
}

@@ -259,7 +257,7 @@ func (c *Coordinator) ServeAgent(conn net.Conn, id uuid.UUID) error {
wg.Done()
}()
}
wg.Wait()
c.mutex.Unlock()
wg.Wait()
}
}
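Both coordinator hunks adjust where c.mutex is released relative to blocking work (the socket write in ServeClient, the wg.Wait in ServeAgent). A minimal sketch of the general pattern of not holding the coordinator mutex across network I/O, using a hypothetical helper name:

package main

import (
	"errors"
	"io"
	"net"
	"sync"
)

// writePayload is a hypothetical helper illustrating the pattern: touch
// shared coordinator state only while holding the mutex, then release it
// before the blocking socket write.
func writePayload(mu *sync.Mutex, sock net.Conn, data []byte) error {
	mu.Lock()
	// ...read or update shared state here...
	mu.Unlock()
	_, err := sock.Write(data)
	if errors.Is(err, io.EOF) {
		// The peer disconnected; treat it as a clean close.
		return nil
	}
	return err
}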