@@ -3,11 +3,15 @@ package tailnet
 import (
	"context"
	"errors"
+	"fmt"
	"net/netip"
	"sync"
+	"time"

+	"github.com/benbjohnson/clock"
	"github.com/google/uuid"
	"go4.org/netipx"
+	"tailscale.com/ipn/ipnstate"
	"tailscale.com/net/dns"
	"tailscale.com/tailcfg"
	"tailscale.com/types/ipproto"
@@ -23,10 +27,13 @@ import (
	"github.com/coder/coder/v2/tailnet/proto"
)

+const lostTimeout = 15 * time.Minute
+
// engineConfigurable is the subset of wgengine.Engine that we use for configuration.
//
// This allows us to test configuration code without faking the whole interface.
type engineConfigurable interface {
+	UpdateStatus(*ipnstate.StatusBuilder)
	SetNetworkMap(*netmap.NetworkMap)
	Reconfig(*wgcfg.Config, *router.Config, *dns.Config, *tailcfg.Debug) error
	SetDERPMap(*tailcfg.DERPMap)
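The narrowed interface above is what keeps configMaps testable without standing up a real wgengine.Engine. A minimal sketch of such a fake, assuming it lives in the same package, that the hunk shows the full method set (the real interface may declare more), and that ipnstate's StatusBuilder.AddPeer helper is available; the fakeEngine name and its fields are illustrative, not part of this change:

// fakeEngine is a hypothetical test double for engineConfigurable. It records
// what was pushed into it and serves canned peer statuses back to status().
type fakeEngine struct {
	peers   map[key.NodePublic]*ipnstate.PeerStatus // canned statuses to report
	netMap  *netmap.NetworkMap                      // last netmap received
	derpMap *tailcfg.DERPMap                        // last DERP map received
}

func (f *fakeEngine) UpdateStatus(sb *ipnstate.StatusBuilder) {
	for k, ps := range f.peers {
		sb.AddPeer(k, ps)
	}
}

func (f *fakeEngine) SetNetworkMap(nm *netmap.NetworkMap) { f.netMap = nm }

func (f *fakeEngine) Reconfig(*wgcfg.Config, *router.Config, *dns.Config, *tailcfg.Debug) error {
	return nil
}

func (f *fakeEngine) SetDERPMap(m *tailcfg.DERPMap) { f.derpMap = m }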
@@ -49,12 +56,16 @@ type configMaps struct {
	closing bool
	phase   phase

-	engine    engineConfigurable
-	static    netmap.NetworkMap
-	peers     map[uuid.UUID]*peerLifecycle
-	addresses []netip.Prefix
-	derpMap   *proto.DERPMap
-	logger    slog.Logger
+	engine         engineConfigurable
+	static         netmap.NetworkMap
+	peers          map[uuid.UUID]*peerLifecycle
+	addresses      []netip.Prefix
+	derpMap        *proto.DERPMap
+	logger         slog.Logger
+	blockEndpoints bool
+
+	// for testing
+	clock clock.Clock
}

func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg.NodeID, nodeKey key.NodePrivate, discoKey key.DiscoPublic, addresses []netip.Prefix) *configMaps {
@@ -101,6 +112,7 @@ func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg
		},
		peers:     make(map[uuid.UUID]*peerLifecycle),
		addresses: addresses,
+		clock:     clock.New(),
	}
	go c.configLoop()
	return c
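The clock field defaults to the real clock here, but because it is injected, tests can substitute a mock and fire the 15-minute lost-peer timers instantly. A standalone sketch of that pattern with github.com/benbjohnson/clock (the variable names are illustrative):

package main

import (
	"fmt"
	"time"

	"github.com/benbjohnson/clock"
)

func main() {
	mock := clock.NewMock() // satisfies the clock.Clock interface
	fired := make(chan struct{})
	mock.AfterFunc(15*time.Minute, func() { close(fired) })

	mock.Add(15 * time.Minute) // advance virtual time; due timers run
	<-fired
	fmt.Println("lost-peer timer fired without waiting 15 real minutes")
}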
@@ -165,6 +177,9 @@ func (c *configMaps) configLoop() {
func (c *configMaps) close() {
	c.L.Lock()
	defer c.L.Unlock()
+	for _, lc := range c.peers {
+		lc.resetTimer()
+	}
	c.closing = true
	c.Broadcast()
	for c.phase != closed {
@@ -260,11 +275,208 @@ func (c *configMaps) filterLocked() *filter.Filter {
	)
}

+// updatePeers handles protocol updates about peers from the coordinator. c.L MUST NOT be held.
+func (c *configMaps) updatePeers(updates []*proto.CoordinateResponse_PeerUpdate) {
+	status := c.status()
+	c.L.Lock()
+	defer c.L.Unlock()
+
+	// Update all the lastHandshake values here. That way we don't have to
+	// worry about them being up-to-date when handling updates below, and it covers
+	// all peers, not just the ones we got updates about.
+	for _, lc := range c.peers {
+		if peerStatus, ok := status.Peer[lc.node.Key]; ok {
+			lc.lastHandshake = peerStatus.LastHandshake
+		}
+	}
+
+	for _, update := range updates {
+		if dirty := c.updatePeerLocked(update, status); dirty {
+			c.netmapDirty = true
+		}
+	}
+	if c.netmapDirty {
+		c.Broadcast()
+	}
+}
+
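For orientation, a hedged sketch of the input updatePeers consumes, assumed to live in this package; only the Id, Kind, Node, and PreferredDerp fields are taken from the code above, while the helper name, the region number, and the omitted key material are illustrative:

// announceThenLosePeer is a hypothetical helper: it first delivers a NODE
// update for peerID, then a LOST update for the same peer.
func announceThenLosePeer(c *configMaps, peerID uuid.UUID, node *proto.Node) {
	// NODE: the peer is reachable; PreferredDerp must be non-zero or the
	// update is dropped (see updatePeerLocked below).
	c.updatePeers([]*proto.CoordinateResponse_PeerUpdate{{
		Id:   peerID[:],
		Kind: proto.CoordinateResponse_PeerUpdate_NODE,
		Node: node, // e.g. &proto.Node{Id: 5, PreferredDerp: 999, /* key material elided */}
	}})

	// LOST: the coordinator lost the peer; configMaps starts the lostTimeout
	// timer but keeps the node configured for now.
	c.updatePeers([]*proto.CoordinateResponse_PeerUpdate{{
		Id:   peerID[:],
		Kind: proto.CoordinateResponse_PeerUpdate_LOST,
	}})
}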
+// status requests a status update from the engine.
+func (c *configMaps) status() *ipnstate.Status {
+	sb := &ipnstate.StatusBuilder{WantPeers: true}
+	c.engine.UpdateStatus(sb)
+	return sb.Status()
+}
+
+// updatePeerLocked processes a single update for a single peer. It is intended
+// as an internal function since it returns whether or not the config is dirtied by
+// the update (instead of handling it directly like updatePeers). c.L must be held.
+func (c *configMaps) updatePeerLocked(update *proto.CoordinateResponse_PeerUpdate, status *ipnstate.Status) (dirty bool) {
+	id, err := uuid.FromBytes(update.Id)
+	if err != nil {
+		c.logger.Critical(context.Background(), "received update with bad id", slog.F("id", update.Id))
+		return false
+	}
+	logger := c.logger.With(slog.F("peer_id", id))
+	lc, ok := c.peers[id]
+	var node *tailcfg.Node
+	if update.Kind == proto.CoordinateResponse_PeerUpdate_NODE {
+		// If no preferred DERP is provided, we can't reach the node.
+		if update.Node.PreferredDerp == 0 {
+			logger.Warn(context.Background(), "no preferred DERP, peer update", slog.F("node_proto", update.Node))
+			return false
+		}
+		node, err = c.protoNodeToTailcfg(update.Node)
+		if err != nil {
+			logger.Critical(context.Background(), "failed to convert proto node to tailcfg", slog.F("node_proto", update.Node))
+			return false
+		}
+		logger = logger.With(slog.F("key_id", node.Key.ShortString()), slog.F("node", node))
+		peerStatus, ok := status.Peer[node.Key]
+		// Starting KeepAlive messages at the initialization of a connection
+		// causes a race condition. If we send the handshake before the peer has
+		// our node, we'll have to wait for 5 seconds before trying again.
+		// Ideally, the first handshake starts when the user first initiates a
+		// connection to the peer. After a successful connection we enable
+		// keep alives to persist the connection and keep it from becoming idle.
+		// SSH connections don't send packets while idle, so we use keep alives
+		// to avoid random hangs while we set up the connection again after
+		// inactivity.
+		node.KeepAlive = ok && peerStatus.Active
+		if c.blockEndpoints {
+			node.Endpoints = nil
+		}
+	}
+	switch {
+	case !ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
+		// new!
+		var lastHandshake time.Time
+		if ps, ok := status.Peer[node.Key]; ok {
+			lastHandshake = ps.LastHandshake
+		}
+		c.peers[id] = &peerLifecycle{
+			peerID:        id,
+			node:          node,
+			lastHandshake: lastHandshake,
+			lost:          false,
+		}
+		logger.Debug(context.Background(), "adding new peer")
+		return true
+	case ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
+		// update
+		node.Created = lc.node.Created
+		dirty = !lc.node.Equal(node)
+		lc.node = node
+		lc.lost = false
+		lc.resetTimer()
+		logger.Debug(context.Background(), "node update to existing peer", slog.F("dirty", dirty))
+		return dirty
+	case !ok:
+		// disconnected or lost, but we don't have the node. No op
+		logger.Debug(context.Background(), "skipping update for peer we don't recognize")
+		return false
+	case update.Kind == proto.CoordinateResponse_PeerUpdate_DISCONNECTED:
+		lc.resetTimer()
+		delete(c.peers, id)
+		logger.Debug(context.Background(), "disconnected peer")
+		return true
+	case update.Kind == proto.CoordinateResponse_PeerUpdate_LOST:
+		lc.lost = true
+		lc.setLostTimer(c)
+		logger.Debug(context.Background(), "marked peer lost")
+		// marking a node lost doesn't change anything right now, so dirty=false
+		return false
+	default:
+		logger.Warn(context.Background(), "unknown peer update", slog.F("kind", update.Kind))
+		return false
+	}
+}
+
+// peerLostTimeout is the callback that peerLifecycle uses when a peer is lost
+// and the timeout to receive a handshake fires.
+func (c *configMaps) peerLostTimeout(id uuid.UUID) {
+	logger := c.logger.With(slog.F("peer_id", id))
+	logger.Debug(context.Background(),
+		"peer lost timeout")
+
+	// First do a status update to see if the peer did a handshake while we were
+	// waiting
+	status := c.status()
+	c.L.Lock()
+	defer c.L.Unlock()
+
+	lc, ok := c.peers[id]
+	if !ok {
+		logger.Debug(context.Background(),
+			"timeout triggered for peer that is removed from the map")
+		return
+	}
+	if peerStatus, ok := status.Peer[lc.node.Key]; ok {
+		lc.lastHandshake = peerStatus.LastHandshake
+	}
+	logger = logger.With(slog.F("key_id", lc.node.Key.ShortString()))
+	if !lc.lost {
+		logger.Debug(context.Background(),
+			"timeout triggered for peer that is no longer lost")
+		return
+	}
+	since := c.clock.Since(lc.lastHandshake)
+	if since >= lostTimeout {
+		logger.Info(
+			context.Background(), "removing lost peer")
+		delete(c.peers, id)
+		c.netmapDirty = true
+		c.Broadcast()
+		return
+	}
+	logger.Debug(context.Background(),
+		"timeout triggered for peer but it had handshake in meantime")
+	lc.setLostTimer(c)
+}
+
+func (c *configMaps) protoNodeToTailcfg(p *proto.Node) (*tailcfg.Node, error) {
+	node, err := ProtoToNode(p)
+	if err != nil {
+		return nil, err
+	}
+	return &tailcfg.Node{
+		ID:         tailcfg.NodeID(p.GetId()),
+		Created:    c.clock.Now(),
+		Key:        node.Key,
+		DiscoKey:   node.DiscoKey,
+		Addresses:  node.Addresses,
+		AllowedIPs: node.AllowedIPs,
+		Endpoints:  node.Endpoints,
+		DERP:       fmt.Sprintf("%s:%d", tailcfg.DerpMagicIP, node.PreferredDERP),
+		Hostinfo:   (&tailcfg.Hostinfo{}).View(),
+	}, nil
+}
+
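A note on the DERP field built above: tailcfg.DerpMagicIP is the sentinel address "127.3.3.40", and the "port" portion is the DERP region ID rather than a real port. A tiny self-contained illustration (the region number 999 is arbitrary):

package main

import (
	"fmt"

	"tailscale.com/tailcfg"
)

func main() {
	// For preferred DERP region 999 this prints "127.3.3.40:999".
	fmt.Printf("%s:%d\n", tailcfg.DerpMagicIP, 999)
}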
type peerLifecycle struct {
-	node *tailcfg.Node
-	// TODO: implement timers to track lost peers
-	// lastHandshake time.Time
-	// timer time.Timer
+	peerID        uuid.UUID
+	node          *tailcfg.Node
+	lost          bool
+	lastHandshake time.Time
+	timer         *clock.Timer
+}
+
+func (l *peerLifecycle) resetTimer() {
+	if l.timer != nil {
+		l.timer.Stop()
+		l.timer = nil
+	}
+}
+
+func (l *peerLifecycle) setLostTimer(c *configMaps) {
+	if l.timer != nil {
+		l.timer.Stop()
+	}
+	ttl := lostTimeout - c.clock.Since(l.lastHandshake)
+	if ttl <= 0 {
+		ttl = time.Nanosecond
+	}
+	l.timer = c.clock.AfterFunc(ttl, func() {
+		c.peerLostTimeout(l.peerID)
+	})
}
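To make the timer arithmetic in setLostTimer concrete, here is a small self-contained sketch with made-up handshake ages: a peer whose last handshake was 10 minutes ago gets a 5-minute timer, while one already past lostTimeout gets a 1-nanosecond timer, so peerLostTimeout re-checks (and likely removes) it almost immediately.

package main

import (
	"fmt"
	"time"
)

func main() {
	const lostTimeout = 15 * time.Minute
	for _, sinceHandshake := range []time.Duration{10 * time.Minute, 20 * time.Minute} {
		ttl := lostTimeout - sinceHandshake
		if ttl <= 0 {
			ttl = time.Nanosecond // fire (nearly) immediately
		}
		fmt.Printf("last handshake %v ago -> timer fires in %v\n", sinceHandshake, ttl)
	}
}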

// prefixesDifferent returns true if the two slices contain different prefixes