@@ -3,11 +3,15 @@ package tailnet
 import (
 	"context"
 	"errors"
+	"fmt"
 	"net/netip"
 	"sync"
+	"time"
 
+	"github.com/benbjohnson/clock"
 	"github.com/google/uuid"
 	"go4.org/netipx"
+	"tailscale.com/ipn/ipnstate"
 	"tailscale.com/net/dns"
 	"tailscale.com/tailcfg"
 	"tailscale.com/types/ipproto"
@@ -23,10 +27,13 @@ import (
 	"github.com/coder/coder/v2/tailnet/proto"
 )
 
+const lostTimeout = 15 * time.Minute
+
 // engineConfigurable is the subset of wgengine.Engine that we use for configuration.
 //
 // This allows us to test configuration code without faking the whole interface.
 type engineConfigurable interface {
+	UpdateStatus(*ipnstate.StatusBuilder)
 	SetNetworkMap(*netmap.NetworkMap)
 	Reconfig(*wgcfg.Config, *router.Config, *dns.Config, *tailcfg.Debug) error
 	SetDERPMap(*tailcfg.DERPMap)
@@ -49,12 +56,16 @@ type configMaps struct {
 	closing bool
 	phase   phase
 
-	engine    engineConfigurable
-	static    netmap.NetworkMap
-	peers     map[uuid.UUID]*peerLifecycle
-	addresses []netip.Prefix
-	derpMap   *proto.DERPMap
-	logger    slog.Logger
+	engine         engineConfigurable
+	static         netmap.NetworkMap
+	peers          map[uuid.UUID]*peerLifecycle
+	addresses      []netip.Prefix
+	derpMap        *proto.DERPMap
+	logger         slog.Logger
+	blockEndpoints bool
+
+	// for testing
+	clock clock.Clock
 }
 
 func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg.NodeID, nodeKey key.NodePrivate, discoKey key.DiscoPublic, addresses []netip.Prefix) *configMaps {
@@ -101,6 +112,7 @@ func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg
 		},
 		peers:     make(map[uuid.UUID]*peerLifecycle),
 		addresses: addresses,
+		clock:     clock.New(),
 	}
 	go c.configLoop()
 	return c
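Injecting the clock here (the real `clock.New()` by default) is what makes the lost-peer timers below testable: a test can overwrite the `clock` field with a mock and advance time synthetically. A minimal, self-contained sketch of that pattern, assuming hypothetical names (`lostTracker`, `markLost`) rather than anything in this change:

// Illustrative sketch only, not part of this diff: timer-driven code that
// schedules work through a clock.Clock can be exercised in tests with a
// mock clock from github.com/benbjohnson/clock, without real waiting.
package main

import (
	"fmt"
	"time"

	"github.com/benbjohnson/clock"
)

type lostTracker struct {
	clock clock.Clock // clock.New() in production, clock.NewMock() in tests
}

func (t *lostTracker) markLost(onTimeout func()) *clock.Timer {
	// Schedule the callback via the injected clock, as setLostTimer does.
	return t.clock.AfterFunc(15*time.Minute, onTimeout)
}

func main() {
	mock := clock.NewMock()
	tr := &lostTracker{clock: mock}
	fired := make(chan struct{})
	tr.markLost(func() { close(fired) })
	mock.Add(15 * time.Minute) // advance fake time; the AfterFunc callback runs
	<-fired
	fmt.Println("timeout fired without any real waiting")
}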
@@ -165,6 +177,9 @@ func (c *configMaps) configLoop() {
 func (c *configMaps) close() {
 	c.L.Lock()
 	defer c.L.Unlock()
+	for _, lc := range c.peers {
+		lc.resetTimer()
+	}
 	c.closing = true
 	c.Broadcast()
 	for c.phase != closed {
@@ -248,11 +263,201 @@ func (c *configMaps) filterLocked() *filter.Filter {
 	)
 }
 
+func (c *configMaps) updatePeers(updates []*proto.CoordinateResponse_PeerUpdate) {
+	status := c.status()
+	c.L.Lock()
+	defer c.L.Unlock()
+
+	// Update all the lastHandshake values here. That way we don't have to
+	// worry about them being up-to-date when handling updates below, and it covers
+	// all peers, not just the ones we got updates about.
+	for _, lc := range c.peers {
+		if peerStatus, ok := status.Peer[lc.node.Key]; ok {
+			lc.lastHandshake = peerStatus.LastHandshake
+		}
+	}
+
+	for _, update := range updates {
+		if dirty := c.updatePeerLocked(update, status); dirty {
+			c.netmapDirty = true
+		}
+	}
+	if c.netmapDirty {
+		c.Broadcast()
+	}
+}
+
+func (c *configMaps) status() *ipnstate.Status {
+	sb := &ipnstate.StatusBuilder{WantPeers: true}
+	c.engine.UpdateStatus(sb)
+	return sb.Status()
+}
+
+func (c *configMaps) updatePeerLocked(update *proto.CoordinateResponse_PeerUpdate, status *ipnstate.Status) (dirty bool) {
+	id, err := uuid.FromBytes(update.Id)
+	if err != nil {
+		c.logger.Critical(context.Background(), "received update with bad id", slog.F("id", update.Id))
+		return false
+	}
+	logger := c.logger.With(slog.F("peer_id", id))
+	lc, ok := c.peers[id]
+	var node *tailcfg.Node
+	if update.Kind == proto.CoordinateResponse_PeerUpdate_NODE {
+		// If no preferred DERP is provided, we can't reach the node.
+		if update.Node.PreferredDerp == 0 {
+			logger.Warn(context.Background(), "no preferred DERP, peer update", slog.F("node_proto", update.Node))
+			return false
+		}
+		node, err = c.protoNodeToTailcfg(update.Node)
+		if err != nil {
+			logger.Critical(context.Background(), "failed to convert proto node to tailcfg", slog.F("node_proto", update.Node))
+			return false
+		}
+		logger = logger.With(slog.F("key_id", node.Key.ShortString()), slog.F("node", node))
+		peerStatus, ok := status.Peer[node.Key]
+		// Starting KeepAlive messages at the initialization of a connection
+		// causes a race condition. If we send the handshake before the peer has
+		// our node, we'll have to wait for 5 seconds before trying again.
+		// Ideally, the first handshake starts when the user first initiates a
+		// connection to the peer. After a successful connection we enable
+		// keep alives to persist the connection and keep it from becoming idle.
+		// SSH connections don't send packets while idle, so we use keep alives
+		// to avoid random hangs while we set up the connection again after
+		// inactivity.
+		node.KeepAlive = ok && peerStatus.Active
+		if c.blockEndpoints {
+			node.Endpoints = nil
+		}
+	}
+	switch {
+	case !ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
+		// new!
+		var lastHandshake time.Time
+		if ps, ok := status.Peer[node.Key]; ok {
+			lastHandshake = ps.LastHandshake
+		}
+		c.peers[id] = &peerLifecycle{
+			peerID:        id,
+			node:          node,
+			lastHandshake: lastHandshake,
+			lost:          false,
+		}
+		logger.Debug(context.Background(), "adding new peer")
+		return true
+	case ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
+		// update
+		node.Created = lc.node.Created
+		dirty = !lc.node.Equal(node)
+		lc.node = node
+		lc.lost = false
+		lc.resetTimer()
+		logger.Debug(context.Background(), "node update to existing peer", slog.F("dirty", dirty))
+		return dirty
+	case !ok:
+		// disconnected or lost, but we don't have the node. No op
+		logger.Debug(context.Background(), "skipping update for peer we don't recognize")
+		return false
+	case update.Kind == proto.CoordinateResponse_PeerUpdate_DISCONNECTED:
+		lc.resetTimer()
+		delete(c.peers, id)
+		logger.Debug(context.Background(), "disconnected peer")
+		return true
+	case update.Kind == proto.CoordinateResponse_PeerUpdate_LOST:
+		lc.lost = true
+		lc.setLostTimer(c)
+		logger.Debug(context.Background(), "marked peer lost")
+		// marking a node lost doesn't change anything right now, so dirty=false
+		return false
+	default:
+		logger.Warn(context.Background(), "unknown peer update", slog.F("kind", update.Kind))
+		return false
+	}
+}
+
+func (c *configMaps) peerLostTimeout(id uuid.UUID) {
+	logger := c.logger.With(slog.F("peer_id", id))
+	logger.Debug(context.Background(),
+		"peer lost timeout")
+
+	// First do a status update to see if the peer did a handshake while we were
+	// waiting
+	status := c.status()
+	c.L.Lock()
+	defer c.L.Unlock()
+
+	lc, ok := c.peers[id]
+	if !ok {
+		logger.Debug(context.Background(),
+			"timeout triggered for peer that is removed from the map")
+		return
+	}
+	if peerStatus, ok := status.Peer[lc.node.Key]; ok {
+		lc.lastHandshake = peerStatus.LastHandshake
+	}
+	logger = logger.With(slog.F("key_id", lc.node.Key.ShortString()))
+	if !lc.lost {
+		logger.Debug(context.Background(),
+			"timeout triggered for peer that is no longer lost")
+		return
+	}
+	since := c.clock.Since(lc.lastHandshake)
+	if since >= lostTimeout {
+		logger.Info(
+			context.Background(), "removing lost peer")
+		delete(c.peers, id)
+		c.netmapDirty = true
+		c.Broadcast()
+		return
+	}
+	logger.Debug(context.Background(),
+		"timeout triggered for peer but it had handshake in meantime")
+	lc.setLostTimer(c)
+}
+
+func (c *configMaps) protoNodeToTailcfg(p *proto.Node) (*tailcfg.Node, error) {
+	node, err := ProtoToNode(p)
+	if err != nil {
+		return nil, err
+	}
+	return &tailcfg.Node{
+		ID:         tailcfg.NodeID(p.GetId()),
+		Created:    c.clock.Now(),
+		Key:        node.Key,
+		DiscoKey:   node.DiscoKey,
+		Addresses:  node.Addresses,
+		AllowedIPs: node.AllowedIPs,
+		Endpoints:  node.Endpoints,
+		DERP:       fmt.Sprintf("%s:%d", tailcfg.DerpMagicIP, node.PreferredDERP),
+		Hostinfo:   (&tailcfg.Hostinfo{}).View(),
+	}, nil
+}
+
 type peerLifecycle struct {
-	node *tailcfg.Node
-	// TODO: implement timers to track lost peers
-	// lastHandshake time.Time
-	// timer time.Timer
+	peerID        uuid.UUID
+	node          *tailcfg.Node
+	lost          bool
+	lastHandshake time.Time
+	timer         *clock.Timer
+}
+
+func (l *peerLifecycle) resetTimer() {
+	if l.timer != nil {
+		l.timer.Stop()
+		l.timer = nil
+	}
+}
+
+func (l *peerLifecycle) setLostTimer(c *configMaps) {
+	if l.timer != nil {
+		l.timer.Stop()
+	}
+	ttl := lostTimeout - c.clock.Since(l.lastHandshake)
+	if ttl <= 0 {
+		ttl = time.Nanosecond
+	}
+	l.timer = c.clock.AfterFunc(ttl, func() {
+		c.peerLostTimeout(l.peerID)
+	})
 }
 
 // prefixesDifferent returns true if the two slices contain different prefixes
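Taken together, `peerLostTimeout` and `setLostTimer` implement a simple rule: a peer marked LOST is removed only once `lostTimeout` (15 minutes) has elapsed since its last successful handshake; if a handshake happened in the meantime, the timer is rearmed for the remaining interval. A self-contained sketch of that decision, with `remainingTTL` as a hypothetical helper rather than code from this change:

// Self-contained sketch, not part of this diff: the removal/rearm decision
// made when a lost-peer timer fires.
package main

import (
	"fmt"
	"time"
)

const lostTimeout = 15 * time.Minute

// remainingTTL returns 0 when the peer should be removed now, or the
// duration to wait before re-checking the handshake age.
func remainingTTL(lastHandshake, now time.Time) time.Duration {
	since := now.Sub(lastHandshake)
	if since >= lostTimeout {
		return 0 // remove the peer and mark the netmap dirty
	}
	return lostTimeout - since // rearm the lost timer for the remainder
}

func main() {
	now := time.Now()
	fmt.Println(remainingTTL(now.Add(-20*time.Minute), now)) // 0s: remove
	fmt.Println(remainingTTL(now.Add(-10*time.Minute), now)) // 5m0s: rearm
}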