Commit b573a10

chore: add support for peer updates to tailnet.configMaps

1 parent: c125206

4 files changed: +735 -21 lines


go.mod

+2
@@ -206,6 +206,8 @@ require (
 
 require go.uber.org/mock v0.4.0
 
+require github.com/benbjohnson/clock v1.3.5 // indirect
+
 require (
 	cloud.google.com/go/compute v1.23.3 // indirect
 	cloud.google.com/go/logging v1.8.1 // indirect

go.sum

+2
@@ -123,6 +123,8 @@ github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiE
 github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8=
 github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk=
 github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4=
+github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
+github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/bep/godartsass v1.2.0 h1:E2VvQrxAHAFwbjyOIExAMmogTItSKodoKuijNrGm5yU=
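
The only new module dependency is github.com/benbjohnson/clock, which wraps the standard time package behind a Clock interface so that time can be injected: clock.New() in production and a mock in tests, matching the new clock field on configMaps below. A minimal standalone sketch of that split, not part of this commit, with illustrative variable names:

package main

import (
	"fmt"
	"time"

	"github.com/benbjohnson/clock"
)

func main() {
	// clock.New() delegates to the real time package; this is what
	// newConfigMaps stores in its clock field.
	realClock := clock.New()
	fmt.Println(realClock.Now().IsZero()) // false

	// clock.NewMock() returns a clock that only advances when told to,
	// so time-based logic can be tested deterministically.
	mockClock := clock.NewMock()
	start := mockClock.Now()
	mockClock.Add(time.Hour)
	fmt.Println(mockClock.Since(start)) // 1h0m0s
}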

tailnet/configmaps.go

+222 -10
@@ -3,11 +3,15 @@ package tailnet
 import (
 	"context"
 	"errors"
+	"fmt"
 	"net/netip"
 	"sync"
+	"time"
 
+	"github.com/benbjohnson/clock"
 	"github.com/google/uuid"
 	"go4.org/netipx"
+	"tailscale.com/ipn/ipnstate"
 	"tailscale.com/net/dns"
 	"tailscale.com/tailcfg"
 	"tailscale.com/types/ipproto"
@@ -23,10 +27,13 @@ import (
 	"github.com/coder/coder/v2/tailnet/proto"
 )
 
+const lostTimeout = 15 * time.Minute
+
 // engineConfigurable is the subset of wgengine.Engine that we use for configuration.
 //
 // This allows us to test configuration code without faking the whole interface.
 type engineConfigurable interface {
+	UpdateStatus(*ipnstate.StatusBuilder)
 	SetNetworkMap(*netmap.NetworkMap)
 	Reconfig(*wgcfg.Config, *router.Config, *dns.Config, *tailcfg.Debug) error
 	SetDERPMap(*tailcfg.DERPMap)
@@ -49,12 +56,16 @@ type configMaps struct {
 	closing      bool
 	phase        phase
 
-	engine    engineConfigurable
-	static    netmap.NetworkMap
-	peers     map[uuid.UUID]*peerLifecycle
-	addresses []netip.Prefix
-	derpMap   *proto.DERPMap
-	logger    slog.Logger
+	engine         engineConfigurable
+	static         netmap.NetworkMap
+	peers          map[uuid.UUID]*peerLifecycle
+	addresses      []netip.Prefix
+	derpMap        *proto.DERPMap
+	logger         slog.Logger
+	blockEndpoints bool
+
+	// for testing
+	clock clock.Clock
 }
 
 func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg.NodeID, nodeKey key.NodePrivate, discoKey key.DiscoPublic, addresses []netip.Prefix) *configMaps {
@@ -101,6 +112,7 @@ func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg
 		},
 		peers:     make(map[uuid.UUID]*peerLifecycle),
 		addresses: addresses,
+		clock:     clock.New(),
 	}
 	go c.configLoop()
 	return c
@@ -165,6 +177,9 @@ func (c *configMaps) configLoop() {
 func (c *configMaps) close() {
 	c.L.Lock()
 	defer c.L.Unlock()
+	for _, lc := range c.peers {
+		lc.resetTimer()
+	}
 	c.closing = true
 	c.Broadcast()
 	for c.phase != closed {
@@ -260,11 +275,208 @@ func (c *configMaps) filterLocked() *filter.Filter {
 	)
 }
 
+// updatePeers handles protocol updates about peers from the coordinator. c.L MUST NOT be held.
+func (c *configMaps) updatePeers(updates []*proto.CoordinateResponse_PeerUpdate) {
+	status := c.status()
+	c.L.Lock()
+	defer c.L.Unlock()
+
+	// Update all the lastHandshake values here. That way we don't have to
+	// worry about them being up-to-date when handling updates below, and it covers
+	// all peers, not just the ones we got updates about.
+	for _, lc := range c.peers {
+		if peerStatus, ok := status.Peer[lc.node.Key]; ok {
+			lc.lastHandshake = peerStatus.LastHandshake
+		}
+	}
+
+	for _, update := range updates {
+		if dirty := c.updatePeerLocked(update, status); dirty {
+			c.netmapDirty = true
+		}
+	}
+	if c.netmapDirty {
+		c.Broadcast()
+	}
+}
+
+// status requests a status update from the engine.
+func (c *configMaps) status() *ipnstate.Status {
+	sb := &ipnstate.StatusBuilder{WantPeers: true}
+	c.engine.UpdateStatus(sb)
+	return sb.Status()
+}
+
+// updatePeerLocked processes a single update for a single peer. It is intended
+// as an internal function since it returns whether or not the config is dirtied by
+// the update (instead of handling it directly like updatePeers). c.L must be held.
+func (c *configMaps) updatePeerLocked(update *proto.CoordinateResponse_PeerUpdate, status *ipnstate.Status) (dirty bool) {
+	id, err := uuid.FromBytes(update.Id)
+	if err != nil {
+		c.logger.Critical(context.Background(), "received update with bad id", slog.F("id", update.Id))
+		return false
+	}
+	logger := c.logger.With(slog.F("peer_id", id))
+	lc, ok := c.peers[id]
+	var node *tailcfg.Node
+	if update.Kind == proto.CoordinateResponse_PeerUpdate_NODE {
+		// If no preferred DERP is provided, we can't reach the node.
+		if update.Node.PreferredDerp == 0 {
+			logger.Warn(context.Background(), "no preferred DERP, peer update", slog.F("node_proto", update.Node))
+			return false
+		}
+		node, err = c.protoNodeToTailcfg(update.Node)
+		if err != nil {
+			logger.Critical(context.Background(), "failed to convert proto node to tailcfg", slog.F("node_proto", update.Node))
+			return false
+		}
+		logger = logger.With(slog.F("key_id", node.Key.ShortString()), slog.F("node", node))
+		peerStatus, ok := status.Peer[node.Key]
+		// Starting KeepAlive messages at the initialization of a connection
+		// causes a race condition. If we send the handshake before the peer has
+		// our node, we'll have to wait for 5 seconds before trying again.
+		// Ideally, the first handshake starts when the user first initiates a
+		// connection to the peer. After a successful connection we enable
+		// keep alives to persist the connection and keep it from becoming idle.
+		// SSH connections don't send packets while idle, so we use keep alives
+		// to avoid random hangs while we set up the connection again after
+		// inactivity.
+		node.KeepAlive = ok && peerStatus.Active
+		if c.blockEndpoints {
+			node.Endpoints = nil
+		}
+	}
+	switch {
+	case !ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
+		// new!
+		var lastHandshake time.Time
+		if ps, ok := status.Peer[node.Key]; ok {
+			lastHandshake = ps.LastHandshake
+		}
+		c.peers[id] = &peerLifecycle{
+			peerID:        id,
+			node:          node,
+			lastHandshake: lastHandshake,
+			lost:          false,
+		}
+		logger.Debug(context.Background(), "adding new peer")
+		return true
+	case ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
+		// update
+		node.Created = lc.node.Created
+		dirty = !lc.node.Equal(node)
+		lc.node = node
+		lc.lost = false
+		lc.resetTimer()
+		logger.Debug(context.Background(), "node update to existing peer", slog.F("dirty", dirty))
+		return dirty
+	case !ok:
+		// disconnected or lost, but we don't have the node. No op.
+		logger.Debug(context.Background(), "skipping update for peer we don't recognize")
+		return false
+	case update.Kind == proto.CoordinateResponse_PeerUpdate_DISCONNECTED:
+		lc.resetTimer()
+		delete(c.peers, id)
+		logger.Debug(context.Background(), "disconnected peer")
+		return true
+	case update.Kind == proto.CoordinateResponse_PeerUpdate_LOST:
+		lc.lost = true
+		lc.setLostTimer(c)
+		logger.Debug(context.Background(), "marked peer lost")
+		// marking a node lost doesn't change anything right now, so dirty=false
+		return false
+	default:
+		logger.Warn(context.Background(), "unknown peer update", slog.F("kind", update.Kind))
+		return false
+	}
+}
+
+// peerLostTimeout is the callback that peerLifecycle uses when a peer is lost and the timeout to
+// receive a handshake fires.
+func (c *configMaps) peerLostTimeout(id uuid.UUID) {
+	logger := c.logger.With(slog.F("peer_id", id))
+	logger.Debug(context.Background(),
+		"peer lost timeout")
+
+	// First do a status update to see if the peer did a handshake while we were
+	// waiting
+	status := c.status()
+	c.L.Lock()
+	defer c.L.Unlock()
+
+	lc, ok := c.peers[id]
+	if !ok {
+		logger.Debug(context.Background(),
+			"timeout triggered for peer that is removed from the map")
+		return
+	}
+	if peerStatus, ok := status.Peer[lc.node.Key]; ok {
+		lc.lastHandshake = peerStatus.LastHandshake
+	}
+	logger = logger.With(slog.F("key_id", lc.node.Key.ShortString()))
+	if !lc.lost {
+		logger.Debug(context.Background(),
+			"timeout triggered for peer that is no longer lost")
+		return
+	}
+	since := c.clock.Since(lc.lastHandshake)
+	if since >= lostTimeout {
+		logger.Info(
+			context.Background(), "removing lost peer")
+		delete(c.peers, id)
+		c.netmapDirty = true
+		c.Broadcast()
+		return
+	}
+	logger.Debug(context.Background(),
+		"timeout triggered for peer but it had handshake in meantime")
+	lc.setLostTimer(c)
+}
+
+func (c *configMaps) protoNodeToTailcfg(p *proto.Node) (*tailcfg.Node, error) {
+	node, err := ProtoToNode(p)
+	if err != nil {
+		return nil, err
+	}
+	return &tailcfg.Node{
+		ID:         tailcfg.NodeID(p.GetId()),
+		Created:    c.clock.Now(),
+		Key:        node.Key,
+		DiscoKey:   node.DiscoKey,
+		Addresses:  node.Addresses,
+		AllowedIPs: node.AllowedIPs,
+		Endpoints:  node.Endpoints,
+		DERP:       fmt.Sprintf("%s:%d", tailcfg.DerpMagicIP, node.PreferredDERP),
+		Hostinfo:   (&tailcfg.Hostinfo{}).View(),
+	}, nil
+}
+
 type peerLifecycle struct {
-	node *tailcfg.Node
-	// TODO: implement timers to track lost peers
-	// lastHandshake time.Time
-	// timer time.Timer
+	peerID        uuid.UUID
+	node          *tailcfg.Node
+	lost          bool
+	lastHandshake time.Time
+	timer         *clock.Timer
+}
+
+func (l *peerLifecycle) resetTimer() {
+	if l.timer != nil {
+		l.timer.Stop()
+		l.timer = nil
+	}
+}
+
+func (l *peerLifecycle) setLostTimer(c *configMaps) {
+	if l.timer != nil {
+		l.timer.Stop()
+	}
+	ttl := lostTimeout - c.clock.Since(l.lastHandshake)
+	if ttl <= 0 {
+		ttl = time.Nanosecond
+	}
+	l.timer = c.clock.AfterFunc(ttl, func() {
+		c.peerLostTimeout(l.peerID)
+	})
 }
 
 // prefixesDifferent returns true if the two slices contain different prefixes
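
To make the lost-peer flow above easier to follow: a LOST update marks the peerLifecycle as lost and arms a timer via setLostTimer for lostTimeout minus the time since the last handshake; when the timer fires, peerLostTimeout refreshes the handshake time from the engine status, removes the peer only if it is still lost and its last handshake is at least lostTimeout old, and otherwise either returns or re-arms the timer. A small standalone sketch of that timer arithmetic against the mocked clock follows; it is illustrative only and not the commit's test file, and the five-minute handshake age and print statements are stand-ins:

package main

import (
	"fmt"
	"time"

	"github.com/benbjohnson/clock"
)

const lostTimeout = 15 * time.Minute

func main() {
	mock := clock.NewMock()

	// Pretend the last successful handshake happened 5 minutes ago.
	lastHandshake := mock.Now()
	mock.Add(5 * time.Minute)

	// Same arithmetic as peerLifecycle.setLostTimer: wait out the remainder
	// of the lost timeout, clamped to at least a nanosecond.
	ttl := lostTimeout - mock.Since(lastHandshake)
	if ttl <= 0 {
		ttl = time.Nanosecond
	}
	fmt.Println(ttl) // 10m0s

	timer := mock.AfterFunc(ttl, func() {
		// In configMaps.peerLostTimeout this is where the peer would be
		// re-checked against the engine status and removed if still lost.
		fmt.Println("lost timeout fired")
	})
	defer timer.Stop()

	// Advancing the mock clock past the deadline runs the callback without
	// any real waiting, which is why the clock is injectable "for testing".
	mock.Add(ttl + time.Second)
}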
