
Commit 3844f8a (parent 6f37b9b)

chore: add support for peer updates to tailnet.configMaps

4 files changed: +719 -21 lines

go.mod (+2)

@@ -206,6 +206,8 @@ require (
 
 require go.uber.org/mock v0.4.0
 
+require github.com/benbjohnson/clock v1.3.5 // indirect
+
 require (
     cloud.google.com/go/compute v1.23.3 // indirect
     cloud.google.com/go/logging v1.8.1 // indirect

go.sum (+2)

@@ -123,6 +123,8 @@ github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiE
 github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8=
 github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk=
 github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4=
+github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
+github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/bep/godartsass v1.2.0 h1:E2VvQrxAHAFwbjyOIExAMmogTItSKodoKuijNrGm5yU=
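Editor's note: the only new module here is github.com/benbjohnson/clock, pulled in so the lost-peer timers added below can be driven by a fake clock in tests. A minimal standalone sketch of that pattern (not code from this commit): production code holds a clock.Clock and arms timers through it; tests substitute clock.NewMock() and advance time explicitly.

package main

import (
    "fmt"
    "time"

    "github.com/benbjohnson/clock"
)

func main() {
    mck := clock.NewMock()
    fired := make(chan struct{})

    // Same call shape as c.clock.AfterFunc in configmaps.go, but against
    // a mock: nothing fires until the test advances the fake clock.
    mck.AfterFunc(15*time.Minute, func() { close(fired) })

    mck.Add(16 * time.Minute) // advance fake time past the deadline
    <-fired
    fmt.Println("timer fired without any real waiting")
}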

tailnet/configmaps.go (+215 -10)

@@ -3,11 +3,15 @@ package tailnet
 import (
     "context"
     "errors"
+    "fmt"
     "net/netip"
     "sync"
+    "time"
 
+    "github.com/benbjohnson/clock"
     "github.com/google/uuid"
     "go4.org/netipx"
+    "tailscale.com/ipn/ipnstate"
     "tailscale.com/net/dns"
     "tailscale.com/tailcfg"
     "tailscale.com/types/ipproto"
@@ -23,10 +27,13 @@ import (
     "github.com/coder/coder/v2/tailnet/proto"
 )
 
+const lostTimeout = 15 * time.Minute
+
 // engineConfigurable is the subset of wgengine.Engine that we use for configuration.
 //
 // This allows us to test configuration code without faking the whole interface.
 type engineConfigurable interface {
+    UpdateStatus(*ipnstate.StatusBuilder)
     SetNetworkMap(*netmap.NetworkMap)
     Reconfig(*wgcfg.Config, *router.Config, *dns.Config, *tailcfg.Debug) error
     SetDERPMap(*tailcfg.DERPMap)
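Editor's note: UpdateStatus is the one new requirement on engineConfigurable. It lets configMaps ask the wgengine for per-peer status, which carries the last WireGuard handshake time that the lost-peer logic keys off. The call pattern, as used by the status() helper added further down in this diff, is roughly:

    sb := &ipnstate.StatusBuilder{WantPeers: true}
    engine.UpdateStatus(sb)
    st := sb.Status()
    // st.Peer maps node public keys to *ipnstate.PeerStatus; its
    // LastHandshake and Active fields drive lost-peer expiry and
    // KeepAlive below.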
@@ -49,12 +56,16 @@ type configMaps struct {
     closing bool
     phase   phase
 
-    engine    engineConfigurable
-    static    netmap.NetworkMap
-    peers     map[uuid.UUID]*peerLifecycle
-    addresses []netip.Prefix
-    derpMap   *proto.DERPMap
-    logger    slog.Logger
+    engine         engineConfigurable
+    static         netmap.NetworkMap
+    peers          map[uuid.UUID]*peerLifecycle
+    addresses      []netip.Prefix
+    derpMap        *proto.DERPMap
+    logger         slog.Logger
+    blockEndpoints bool
+
+    // for testing
+    clock clock.Clock
 }
 
 func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg.NodeID, nodeKey key.NodePrivate, discoKey key.DiscoPublic, addresses []netip.Prefix) *configMaps {

@@ -101,6 +112,7 @@ func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg
         },
         peers:     make(map[uuid.UUID]*peerLifecycle),
         addresses: addresses,
+        clock:     clock.New(),
     }
     go c.configLoop()
     return c

@@ -165,6 +177,9 @@ func (c *configMaps) configLoop() {
 func (c *configMaps) close() {
     c.L.Lock()
     defer c.L.Unlock()
+    for _, lc := range c.peers {
+        lc.resetTimer()
+    }
     c.closing = true
     c.Broadcast()
     for c.phase != closed {
@@ -248,11 +263,201 @@ func (c *configMaps) filterLocked() *filter.Filter {
     )
 }
 
+func (c *configMaps) updatePeers(updates []*proto.CoordinateResponse_PeerUpdate) {
+    status := c.status()
+    c.L.Lock()
+    defer c.L.Unlock()
+
+    // Update all the lastHandshake values here. That way we don't have to
+    // worry about them being up-to-date when handling updates below, and it
+    // covers all peers, not just the ones we got updates about.
+    for _, lc := range c.peers {
+        if peerStatus, ok := status.Peer[lc.node.Key]; ok {
+            lc.lastHandshake = peerStatus.LastHandshake
+        }
+    }
+
+    for _, update := range updates {
+        if dirty := c.updatePeerLocked(update, status); dirty {
+            c.netmapDirty = true
+        }
+    }
+    if c.netmapDirty {
+        c.Broadcast()
+    }
+}
+
+func (c *configMaps) status() *ipnstate.Status {
+    sb := &ipnstate.StatusBuilder{WantPeers: true}
+    c.engine.UpdateStatus(sb)
+    return sb.Status()
+}
+
+func (c *configMaps) updatePeerLocked(update *proto.CoordinateResponse_PeerUpdate, status *ipnstate.Status) (dirty bool) {
+    id, err := uuid.FromBytes(update.Id)
+    if err != nil {
+        c.logger.Critical(context.Background(), "received update with bad id", slog.F("id", update.Id))
+        return false
+    }
+    logger := c.logger.With(slog.F("peer_id", id))
+    lc, ok := c.peers[id]
+    var node *tailcfg.Node
+    if update.Kind == proto.CoordinateResponse_PeerUpdate_NODE {
+        // If no preferred DERP is provided, we can't reach the node.
+        if update.Node.PreferredDerp == 0 {
+            logger.Warn(context.Background(), "no preferred DERP, peer update", slog.F("node_proto", update.Node))
+            return false
+        }
+        node, err = c.protoNodeToTailcfg(update.Node)
+        if err != nil {
+            logger.Critical(context.Background(), "failed to convert proto node to tailcfg", slog.F("node_proto", update.Node))
+            return false
+        }
+        logger = logger.With(slog.F("key_id", node.Key.ShortString()), slog.F("node", node))
+        peerStatus, ok := status.Peer[node.Key]
+        // Starting KeepAlive messages at the initialization of a connection
+        // causes a race condition. If we send the handshake before the peer
+        // has our node, we'll have to wait for 5 seconds before trying again.
+        // Ideally, the first handshake starts when the user first initiates a
+        // connection to the peer. After a successful connection we enable
+        // keep alives to persist the connection and keep it from becoming
+        // idle. SSH connections don't send packets while idle, so we use keep
+        // alives to avoid random hangs while we set up the connection again
+        // after inactivity.
+        node.KeepAlive = ok && peerStatus.Active
+        if c.blockEndpoints {
+            node.Endpoints = nil
+        }
+    }
+    switch {
+    case !ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
+        // new!
+        var lastHandshake time.Time
+        if ps, ok := status.Peer[node.Key]; ok {
+            lastHandshake = ps.LastHandshake
+        }
+        c.peers[id] = &peerLifecycle{
+            peerID:        id,
+            node:          node,
+            lastHandshake: lastHandshake,
+            lost:          false,
+        }
+        logger.Debug(context.Background(), "adding new peer")
+        return true
+    case ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
+        // update
+        node.Created = lc.node.Created
+        dirty = !lc.node.Equal(node)
+        lc.node = node
+        lc.lost = false
+        lc.resetTimer()
+        logger.Debug(context.Background(), "node update to existing peer", slog.F("dirty", dirty))
+        return dirty
+    case !ok:
+        // disconnected or lost, but we don't have the node. No op.
+        logger.Debug(context.Background(), "skipping update for peer we don't recognize")
+        return false
+    case update.Kind == proto.CoordinateResponse_PeerUpdate_DISCONNECTED:
+        lc.resetTimer()
+        delete(c.peers, id)
+        logger.Debug(context.Background(), "disconnected peer")
+        return true
+    case update.Kind == proto.CoordinateResponse_PeerUpdate_LOST:
+        lc.lost = true
+        lc.setLostTimer(c)
+        logger.Debug(context.Background(), "marked peer lost")
+        // Marking a node lost doesn't change anything right now, so dirty=false.
+        return false
+    default:
+        logger.Warn(context.Background(), "unknown peer update", slog.F("kind", update.Kind))
+        return false
+    }
+}
+
+func (c *configMaps) peerLostTimeout(id uuid.UUID) {
+    logger := c.logger.With(slog.F("peer_id", id))
+    logger.Debug(context.Background(), "peer lost timeout")
+
+    // First do a status update to see if the peer did a handshake while we
+    // were waiting.
+    status := c.status()
+    c.L.Lock()
+    defer c.L.Unlock()
+
+    lc, ok := c.peers[id]
+    if !ok {
+        logger.Debug(context.Background(), "timeout triggered for peer that is removed from the map")
+        return
+    }
+    if peerStatus, ok := status.Peer[lc.node.Key]; ok {
+        lc.lastHandshake = peerStatus.LastHandshake
+    }
+    logger = logger.With(slog.F("key_id", lc.node.Key.ShortString()))
+    if !lc.lost {
+        logger.Debug(context.Background(), "timeout triggered for peer that is no longer lost")
+        return
+    }
+    since := c.clock.Since(lc.lastHandshake)
+    if since >= lostTimeout {
+        logger.Info(context.Background(), "removing lost peer")
+        delete(c.peers, id)
+        c.netmapDirty = true
+        c.Broadcast()
+        return
+    }
+    logger.Debug(context.Background(), "timeout triggered for peer but it had handshake in meantime")
+    lc.setLostTimer(c)
+}
+
+func (c *configMaps) protoNodeToTailcfg(p *proto.Node) (*tailcfg.Node, error) {
+    node, err := ProtoToNode(p)
+    if err != nil {
+        return nil, err
+    }
+    return &tailcfg.Node{
+        ID:         tailcfg.NodeID(p.GetId()),
+        Created:    c.clock.Now(),
+        Key:        node.Key,
+        DiscoKey:   node.DiscoKey,
+        Addresses:  node.Addresses,
+        AllowedIPs: node.AllowedIPs,
+        Endpoints:  node.Endpoints,
+        DERP:       fmt.Sprintf("%s:%d", tailcfg.DerpMagicIP, node.PreferredDERP),
+        Hostinfo:   (&tailcfg.Hostinfo{}).View(),
+    }, nil
+}
+
 type peerLifecycle struct {
-    node *tailcfg.Node
-    // TODO: implement timers to track lost peers
-    // lastHandshake time.Time
-    // timer time.Timer
+    peerID        uuid.UUID
+    node          *tailcfg.Node
+    lost          bool
+    lastHandshake time.Time
+    timer         *clock.Timer
+}
+
+func (l *peerLifecycle) resetTimer() {
+    if l.timer != nil {
+        l.timer.Stop()
+        l.timer = nil
+    }
+}
+
+func (l *peerLifecycle) setLostTimer(c *configMaps) {
+    if l.timer != nil {
+        l.timer.Stop()
+    }
+    ttl := lostTimeout - c.clock.Since(l.lastHandshake)
+    if ttl <= 0 {
+        ttl = time.Nanosecond
+    }
+    l.timer = c.clock.AfterFunc(ttl, func() {
+        c.peerLostTimeout(l.peerID)
+    })
 }
 
 // prefixesDifferent returns true if the two slices contain different prefixes
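Editor's note: taken together, the new functions give configMaps a full lost-peer lifecycle. A LOST update arms a timer anchored to the peer's last WireGuard handshake, not to when the update arrived: with lostTimeout of 15 minutes and a handshake 9 minutes ago, the timer fires in 6 minutes; if the handshake is already older than 15 minutes, the ttl clamps to 1ns so peerLostTimeout re-checks (and removes the peer) almost immediately. A hedged end-to-end sketch of the caller side (cm and the peer id are illustrative; the types and update kinds are the ones used in this diff):

    // Coordinator reports a peer as lost: the peer stays in the netmap
    // but a lost timer is armed for lostTimeout minus the handshake age.
    id := uuid.New() // illustrative peer id
    cm.updatePeers([]*proto.CoordinateResponse_PeerUpdate{{
        Id:   id[:],
        Kind: proto.CoordinateResponse_PeerUpdate_LOST,
    }})

    // A subsequent NODE update for the same peer clears lost and stops the
    // timer via resetTimer; a DISCONNECTED update removes the peer outright;
    // timer expiry without a fresh handshake removes it and dirties the netmap.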
