chore: add support for peer updates to tailnet.configMaps #11487

Merged
merged 1 commit on Jan 11, 2024

2 changes: 2 additions & 0 deletions go.mod
@@ -206,6 +206,8 @@ require (

require go.uber.org/mock v0.4.0

require github.com/benbjohnson/clock v1.3.5 // indirect

require (
cloud.google.com/go/compute v1.23.3 // indirect
cloud.google.com/go/logging v1.8.1 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -123,6 +123,8 @@ github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiE
github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8=
github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk=
github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4=
github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bep/godartsass v1.2.0 h1:E2VvQrxAHAFwbjyOIExAMmogTItSKodoKuijNrGm5yU=
232 changes: 222 additions & 10 deletions tailnet/configmaps.go
@@ -3,11 +3,15 @@ package tailnet
import (
"context"
"errors"
"fmt"
"net/netip"
"sync"
"time"

"github.com/benbjohnson/clock"
"github.com/google/uuid"
"go4.org/netipx"
"tailscale.com/ipn/ipnstate"
"tailscale.com/net/dns"
"tailscale.com/tailcfg"
"tailscale.com/types/ipproto"
@@ -23,10 +27,13 @@ import (
"github.com/coder/coder/v2/tailnet/proto"
)

const lostTimeout = 15 * time.Minute

// engineConfigurable is the subset of wgengine.Engine that we use for configuration.
//
// This allows us to test configuration code without faking the whole interface.
type engineConfigurable interface {
UpdateStatus(*ipnstate.StatusBuilder)
SetNetworkMap(*netmap.NetworkMap)
Reconfig(*wgcfg.Config, *router.Config, *dns.Config, *tailcfg.Debug) error
SetDERPMap(*tailcfg.DERPMap)
@@ -49,12 +56,16 @@ type configMaps struct {
closing bool
phase phase

engine engineConfigurable
static netmap.NetworkMap
peers map[uuid.UUID]*peerLifecycle
addresses []netip.Prefix
derpMap *proto.DERPMap
logger slog.Logger
engine engineConfigurable
static netmap.NetworkMap
peers map[uuid.UUID]*peerLifecycle
addresses []netip.Prefix
derpMap *proto.DERPMap
logger slog.Logger
blockEndpoints bool

// for testing
clock clock.Clock
}

func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg.NodeID, nodeKey key.NodePrivate, discoKey key.DiscoPublic, addresses []netip.Prefix) *configMaps {
@@ -101,6 +112,7 @@ func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg
},
peers: make(map[uuid.UUID]*peerLifecycle),
addresses: addresses,
clock: clock.New(),
}
go c.configLoop()
return c
@@ -165,6 +177,9 @@ func (c *configMaps) configLoop() {
func (c *configMaps) close() {
c.L.Lock()
defer c.L.Unlock()
for _, lc := range c.peers {
lc.resetTimer()
}
c.closing = true
c.Broadcast()
for c.phase != closed {
@@ -260,11 +275,208 @@ func (c *configMaps) filterLocked() *filter.Filter {
)
}

// updatePeers handles protocol updates about peers from the coordinator. c.L MUST NOT be held.
func (c *configMaps) updatePeers(updates []*proto.CoordinateResponse_PeerUpdate) {
status := c.status()
c.L.Lock()
defer c.L.Unlock()

// Update all the lastHandshake values here. That way we don't have to
// worry about them being up-to-date when handling updates below, and it covers
// all peers, not just the ones we got updates about.
for _, lc := range c.peers {
if peerStatus, ok := status.Peer[lc.node.Key]; ok {
lc.lastHandshake = peerStatus.LastHandshake
}
}

for _, update := range updates {
if dirty := c.updatePeerLocked(update, status); dirty {
c.netmapDirty = true
}
}
if c.netmapDirty {
c.Broadcast()
}
}
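For orientation (not part of this diff): a hedged sketch of the message shape updatePeers consumes. It assumes only the generated proto field names Id and Kind that appear in the code above; the caller wiring in the comment is hypothetical.

// Illustration only, not part of this PR: building a LOST update for a peer.
package main

import (
	"fmt"

	"github.com/google/uuid"

	"github.com/coder/coder/v2/tailnet/proto"
)

func main() {
	peerID := uuid.New()
	update := &proto.CoordinateResponse_PeerUpdate{
		Id:   peerID[:], // 16 raw UUID bytes; decoded with uuid.FromBytes on receipt
		Kind: proto.CoordinateResponse_PeerUpdate_LOST,
	}
	// A caller holding a *configMaps would then pass a batch of these along,
	// e.g. cm.updatePeers([]*proto.CoordinateResponse_PeerUpdate{update}).
	fmt.Println(update.Kind, uuid.Must(uuid.FromBytes(update.Id)))
}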

// status requests a status update from the engine.
func (c *configMaps) status() *ipnstate.Status {
sb := &ipnstate.StatusBuilder{WantPeers: true}
c.engine.UpdateStatus(sb)
return sb.Status()
}

// updatePeerLocked processes a single update for a single peer. It is intended
// as an internal function since it returns whether the config is dirtied by the
// update (instead of handling it directly like updatePeers). c.L must be held.
func (c *configMaps) updatePeerLocked(update *proto.CoordinateResponse_PeerUpdate, status *ipnstate.Status) (dirty bool) {
id, err := uuid.FromBytes(update.Id)
if err != nil {
c.logger.Critical(context.Background(), "received update with bad id", slog.F("id", update.Id))
return false
}
logger := c.logger.With(slog.F("peer_id", id))
lc, ok := c.peers[id]
var node *tailcfg.Node
if update.Kind == proto.CoordinateResponse_PeerUpdate_NODE {
// If no preferred DERP is provided, we can't reach the node.
if update.Node.PreferredDerp == 0 {
logger.Warn(context.Background(), "no preferred DERP, peer update", slog.F("node_proto", update.Node))
return false
}
node, err = c.protoNodeToTailcfg(update.Node)
if err != nil {
logger.Critical(context.Background(), "failed to convert proto node to tailcfg", slog.F("node_proto", update.Node))
return false
}
logger = logger.With(slog.F("key_id", node.Key.ShortString()), slog.F("node", node))
peerStatus, ok := status.Peer[node.Key]
// Starting KeepAlive messages at the initialization of a connection
// causes a race condition. If we send the handshake before the peer has
// our node, we'll have to wait for 5 seconds before trying again.
// Ideally, the first handshake starts when the user first initiates a
// connection to the peer. After a successful connection we enable
// keep alives to persist the connection and keep it from becoming idle.
// SSH connections don't send packets while idle, so we use keep alives
// to avoid random hangs while we set up the connection again after
// inactivity.
node.KeepAlive = ok && peerStatus.Active
if c.blockEndpoints {
node.Endpoints = nil
}
}
switch {
case !ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
// new!
var lastHandshake time.Time
if ps, ok := status.Peer[node.Key]; ok {
lastHandshake = ps.LastHandshake
}
Contributor commented:
Is there a situation where a peer would be missing from the configMaps peer list but still be in the engine's peer list?

Contributor (Author) replied:
Yeah, immediately after receiving a DISCONNECTED update, we remove it from configMaps. Then until the next update it'll be in the engine but not configMaps. Probably an edge case for us to DISCONNECT and then immediately get a new NODE update, though.

c.peers[id] = &peerLifecycle{
peerID: id,
node: node,
lastHandshake: lastHandshake,
lost: false,
}
logger.Debug(context.Background(), "adding new peer")
return true
case ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
// update
node.Created = lc.node.Created
dirty = !lc.node.Equal(node)
lc.node = node
lc.lost = false
lc.resetTimer()
logger.Debug(context.Background(), "node update to existing peer", slog.F("dirty", dirty))
return dirty
case !ok:
// disconnected or lost, but we don't have the node. No op
logger.Debug(context.Background(), "skipping update for peer we don't recognize")
return false
case update.Kind == proto.CoordinateResponse_PeerUpdate_DISCONNECTED:
lc.resetTimer()
delete(c.peers, id)
logger.Debug(context.Background(), "disconnected peer")
return true
case update.Kind == proto.CoordinateResponse_PeerUpdate_LOST:
lc.lost = true
lc.setLostTimer(c)
logger.Debug(context.Background(), "marked peer lost")
// marking a node lost doesn't change anything right now, so dirty=false
return false
default:
logger.Warn(context.Background(), "unknown peer update", slog.F("kind", update.Kind))
return false
}
}

// peerLostTimeout is the callback that peerLifecycle uses when a peer is lost and the timeout to
// receive a handshake fires.
func (c *configMaps) peerLostTimeout(id uuid.UUID) {
logger := c.logger.With(slog.F("peer_id", id))
logger.Debug(context.Background(),
"peer lost timeout")

// First do a status update to see if the peer did a handshake while we were
// waiting
status := c.status()
c.L.Lock()
defer c.L.Unlock()

lc, ok := c.peers[id]
if !ok {
logger.Debug(context.Background(),
"timeout triggered for peer that is removed from the map")
return
}
if peerStatus, ok := status.Peer[lc.node.Key]; ok {
lc.lastHandshake = peerStatus.LastHandshake
}
logger = logger.With(slog.F("key_id", lc.node.Key.ShortString()))
if !lc.lost {
logger.Debug(context.Background(),
"timeout triggered for peer that is no longer lost")
return
}
since := c.clock.Since(lc.lastHandshake)
if since >= lostTimeout {
logger.Info(
context.Background(), "removing lost peer")
delete(c.peers, id)
c.netmapDirty = true
c.Broadcast()
return
}
logger.Debug(context.Background(),
"timeout triggered for peer but it had handshake in meantime")
lc.setLostTimer(c)
}

func (c *configMaps) protoNodeToTailcfg(p *proto.Node) (*tailcfg.Node, error) {
node, err := ProtoToNode(p)
if err != nil {
return nil, err
}
return &tailcfg.Node{
ID: tailcfg.NodeID(p.GetId()),
Created: c.clock.Now(),
Key: node.Key,
DiscoKey: node.DiscoKey,
Addresses: node.Addresses,
AllowedIPs: node.AllowedIPs,
Endpoints: node.Endpoints,
DERP: fmt.Sprintf("%s:%d", tailcfg.DerpMagicIP, node.PreferredDERP),
Hostinfo: (&tailcfg.Hostinfo{}).View(),
}, nil
}
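As a side note (illustrative, not part of the change): the DERP field built above is Tailscale's sentinel "magic IP" plus the preferred region ID, so a peer preferring DERP region 999 (a hypothetical value) would carry the string shown below.

// Illustration only: the DERP "address" format used in protoNodeToTailcfg.
package main

import (
	"fmt"

	"tailscale.com/tailcfg"
)

func main() {
	preferredDERP := 999 // hypothetical region ID
	fmt.Printf("%s:%d\n", tailcfg.DerpMagicIP, preferredDERP) // 127.3.3.40:999
}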

type peerLifecycle struct {
node *tailcfg.Node
// TODO: implement timers to track lost peers
// lastHandshake time.Time
// timer time.Timer
peerID uuid.UUID
node *tailcfg.Node
lost bool
lastHandshake time.Time
timer *clock.Timer
}

func (l *peerLifecycle) resetTimer() {
if l.timer != nil {
l.timer.Stop()
l.timer = nil
}
}

func (l *peerLifecycle) setLostTimer(c *configMaps) {
if l.timer != nil {
l.timer.Stop()
}
ttl := lostTimeout - c.clock.Since(l.lastHandshake)
if ttl <= 0 {
ttl = time.Nanosecond
}
l.timer = c.clock.AfterFunc(ttl, func() {
c.peerLostTimeout(l.peerID)
})
}
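Not part of the diff, but for context on the new clock field: a minimal sketch of how the benbjohnson/clock mock (the reason for the new go.mod entry) lets tests fire the lostTimeout timer deterministically instead of waiting 15 minutes of real time.

// Illustration only: deterministic timers with the mock clock used for testing.
package main

import (
	"fmt"
	"time"

	"github.com/benbjohnson/clock"
)

const lostTimeout = 15 * time.Minute

func main() {
	mock := clock.NewMock() // tests would inject this in place of clock.New()

	fired := make(chan struct{})
	// Mirrors setLostTimer: schedule a callback lostTimeout after the last handshake.
	mock.AfterFunc(lostTimeout, func() { close(fired) })

	// Advance fake time; the callback runs without any real waiting.
	mock.Add(lostTimeout)

	select {
	case <-fired:
		fmt.Println("lost-peer timer fired")
	case <-time.After(time.Second):
		fmt.Println("timer did not fire")
	}
}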

// prefixesDifferent returns true if the two slices contain different prefixes