Skip to content

Commit 88416d2

Browse files
committed
comments
1 parent 298655b commit 88416d2

File tree

5 files changed

+83
-56
lines changed

5 files changed

+83
-56
lines changed

codersdk/workspacesdk/connector.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,6 @@ import (
2727
// tailnetConn embeds tailnet.Coordinatee and additionally requires SetDERPMap
// for supplying a DERP map.
type tailnetConn interface {
2828
tailnet.Coordinatee
2929
SetDERPMap(derpMap *tailcfg.DERPMap)
30-
// SetTunnelDestination indicates to tailnet that the peer id is a
31-
// destination.
32-
SetTunnelDestination(id uuid.UUID)
3330
}
3431

3532
// tailnetAPIConnector dials the tailnet API (v2+) and then uses the API with a tailnet.Conn to
@@ -78,8 +75,6 @@ func runTailnetAPIConnector(
7875
connected: make(chan error, 1),
7976
closed: make(chan struct{}),
8077
}
81-
// TODO: reenable in upstack pr
82-
// conn.SetTunnelDestination(agentID)
8378
tac.gracefulCtx, tac.cancelGracefulCtx = context.WithCancel(context.Background())
8479
go tac.manageGracefulTimeout()
8580
go tac.run()

tailnet/configmaps.go

Lines changed: 66 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ func (c *configMaps) close() {
186186
c.L.Lock()
187187
defer c.L.Unlock()
188188
for _, lc := range c.peers {
189-
lc.resetTimer()
189+
lc.resetLostTimer()
190190
}
191191
c.closing = true
192192
c.Broadcast()
@@ -398,17 +398,7 @@ func (c *configMaps) updatePeerLocked(update *proto.CoordinateResponse_PeerUpdat
398398
return false
399399
}
400400
logger = logger.With(slog.F("key_id", node.Key.ShortString()), slog.F("node", node))
401-
peerStatus, statusOk := status.Peer[node.Key]
402-
// Starting KeepAlive messages at the initialization of a connection
403-
// causes a race condition. If we send the handshake before the peer has
404-
// our node, we'll have to wait for 5 seconds before trying again.
405-
// Ideally, the first handshake starts when the user first initiates a
406-
// connection to the peer. After a successful connection we enable
407-
// keep alives to persist the connection and keep it from becoming idle.
408-
// SSH connections don't send packets while idle, so we use keep alives
409-
// to avoid random hangs while we set up the connection again after
410-
// inactivity.
411-
node.KeepAlive = (statusOk && peerStatus.Active) || (peerOk && lc.node != nil && lc.node.KeepAlive)
401+
node.KeepAlive = c.nodeKeepalive(lc, status, node)
412402
}
413403
switch {
414404
case !peerOk && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
@@ -422,31 +412,27 @@ func (c *configMaps) updatePeerLocked(update *proto.CoordinateResponse_PeerUpdat
422412
node: node,
423413
lastHandshake: lastHandshake,
424414
lost: false,
425-
// If we're receiving a NODE update for a peer we don't already have
426-
// a lifecycle for, it's likely the source of a tunnel. We don't
427-
// need to wait for a READY_FOR_HANDSHAKE.
428-
readyForHandshake: true,
429415
}
430416
c.peers[id] = lc
431417
logger.Debug(context.Background(), "adding new peer")
432-
// since we just got this node, we don't know if it's ready for
433-
// handshakes yet.
434-
return lc.readyForHandshake
418+
return lc.validForWireguard()
435419
case peerOk && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
436420
// update
437421
if lc.node != nil {
438422
node.Created = lc.node.Created
439423
}
440-
dirty = !lc.node.Equal(node) && lc.readyForHandshake
424+
dirty = !lc.node.Equal(node)
441425
lc.node = node
426+
// validForWireguard checks that the node is non-nil, so should be
427+
// called after we update the node.
428+
dirty = dirty && lc.validForWireguard()
442429
lc.lost = false
443-
lc.resetTimer()
444-
if lc.isDestination {
445-
if !lc.readyForHandshake {
446-
lc.setReadyForHandshakeTimer(c)
447-
} else {
448-
lc.node.KeepAlive = true
449-
}
430+
lc.resetLostTimer()
431+
if lc.isDestination && !lc.readyForHandshake {
432+
// We received the node of a destination peer before we've received
433+
// their READY_FOR_HANDSHAKE. Set a timer so the peer is marked ready for handshake after a timeout if it never arrives.
434+
lc.setReadyForHandshakeTimer(c)
435+
logger.Debug(context.Background(), "setting ready for handshake timeout")
450436
}
451437
logger.Debug(context.Background(), "node update to existing peer", slog.F("dirty", dirty))
452438
return dirty
@@ -455,7 +441,6 @@ func (c *configMaps) updatePeerLocked(update *proto.CoordinateResponse_PeerUpdat
455441
lc.readyForHandshake = true
456442
if lc.readyForHandshakeTimer != nil {
457443
lc.readyForHandshakeTimer.Stop()
458-
lc.readyForHandshakeTimer = nil
459444
}
460445
if lc.node != nil {
461446
dirty = dirty || !lc.node.KeepAlive
@@ -475,16 +460,14 @@ func (c *configMaps) updatePeerLocked(update *proto.CoordinateResponse_PeerUpdat
475460
lost: true,
476461
readyForHandshake: true,
477462
}
478-
// Timeout the peer in case we never receive a NODE update for it.
479-
lc.setLostTimer(c)
480463
c.peers[id] = lc
481464
return false
482465
case !peerOk:
483466
// disconnected or lost, but we don't have the node. No op
484467
logger.Debug(context.Background(), "skipping update for peer we don't recognize")
485468
return false
486469
case update.Kind == proto.CoordinateResponse_PeerUpdate_DISCONNECTED:
487-
lc.resetTimer()
470+
lc.resetLostTimer()
488471
delete(c.peers, id)
489472
logger.Debug(context.Background(), "disconnected peer")
490473
return true
@@ -607,39 +590,58 @@ func (c *configMaps) fillPeerDiagnostics(d *PeerDiagnostics, peerID uuid.UUID) {
607590
}
608591
}
609592
lc, ok := c.peers[peerID]
610-
if !ok {
593+
if !ok || lc.node == nil {
611594
return
612595
}
613596

614597
d.ReceivedNode = lc.node
615-
if lc.node != nil {
616-
ps, ok := status.Peer[lc.node.Key]
617-
if !ok {
618-
return
619-
}
620-
d.LastWireguardHandshake = ps.LastHandshake
598+
ps, ok := status.Peer[lc.node.Key]
599+
if !ok {
600+
return
621601
}
622-
return
602+
d.LastWireguardHandshake = ps.LastHandshake
623603
}
624604

625605
// peerReadyForHandshakeTimeout is the callback run by the timer created in
// setReadyForHandshakeTimer. Under c.L it looks up peerID; if the peer is no
// longer in c.peers it only logs and returns. Otherwise it marks the peer
// ready for handshake and, if it was not ready before, flags the netmap as
// dirty and broadcasts.
func (c *configMaps) peerReadyForHandshakeTimeout(peerID uuid.UUID) {
606+
logger := c.logger.With(slog.F("peer_id", peerID))
607+
logger.Debug(context.Background(), "peer ready for handshake timeout")
626608
c.L.Lock()
627609
defer c.L.Unlock()
628610
lc, ok := c.peers[peerID]
629611
if !ok {
612+
logger.Debug(context.Background(),
613+
"ready for handshake timeout triggered for peer that is removed from the map")
630614
return
631615
}
632-
if lc.readyForHandshakeTimer != nil {
633-
wasReady := lc.readyForHandshake
634-
lc.readyForHandshakeTimer = nil
635-
lc.readyForHandshake = true
636-
if !wasReady {
637-
c.netmapDirty = true
638-
c.Broadcast()
639-
}
616+
617+
wasReady := lc.readyForHandshake
618+
lc.readyForHandshake = true
619+
if !wasReady {
620+
logger.Info(context.Background(), "setting peer ready for handshake after timeout")
621+
c.netmapDirty = true
622+
c.Broadcast()
640623
}
641624
}
642625

626+
// nodeKeepalive reports whether keepalives should be enabled for node.
// lc may be nil; it is nil-checked before each use. status is dereferenced
// without a nil check, so callers are presumably expected to pass a non-nil
// status (TODO confirm against callers).
func (*configMaps) nodeKeepalive(lc *peerLifecycle, status *ipnstate.Status, node *tailcfg.Node) bool {
627+
// If the peer is already active, keepalives should be enabled.
628+
if peerStatus, statusOk := status.Peer[node.Key]; statusOk && peerStatus.Active {
629+
return true
630+
}
631+
// If the peer is a destination, we should only enable keepalives if we've
632+
// received the READY_FOR_HANDSHAKE.
633+
if lc != nil && lc.isDestination && lc.readyForHandshake {
634+
return true
635+
}
636+
// If keepalives are already enabled on the node, keep them enabled.
637+
if lc != nil && lc.node != nil && lc.node.KeepAlive {
638+
return true
639+
}
640+
641+
// If none of the above are true, keepalives should not be enabled.
642+
return false
643+
}
644+
643645
type peerLifecycle struct {
644646
peerID uuid.UUID
645647
// isDestination specifies if the peer is a destination, meaning we
@@ -658,7 +660,7 @@ type peerLifecycle struct {
658660
readyForHandshakeTimer *clock.Timer
659661
}
660662

661-
func (l *peerLifecycle) resetTimer() {
663+
func (l *peerLifecycle) resetLostTimer() {
662664
if l.lostTimer != nil {
663665
l.lostTimer.Stop()
664666
l.lostTimer = nil
@@ -678,13 +680,28 @@ func (l *peerLifecycle) setLostTimer(c *configMaps) {
678680
})
679681
}
680682

683+
// readyForHandshakeTimeout is the delay after which the timer started by
// setReadyForHandshakeTimer fires.
const readyForHandshakeTimeout = 5 * time.Second
684+
681685
// setReadyForHandshakeTimer stops any timer already stored on the lifecycle
// and schedules a new one on c.clock that, after readyForHandshakeTimeout,
// logs and calls c.peerReadyForHandshakeTimeout for this peer's ID.
func (l *peerLifecycle) setReadyForHandshakeTimer(c *configMaps) {
682-
l.readyForHandshakeTimer = c.clock.AfterFunc(5*time.Second, func() {
686+
if l.readyForHandshakeTimer != nil {
687+
l.readyForHandshakeTimer.Stop()
688+
}
689+
l.readyForHandshakeTimer = c.clock.AfterFunc(readyForHandshakeTimeout, func() {
683690
c.logger.Debug(context.Background(), "ready for handshake timeout", slog.F("peer_id", l.peerID))
684691
c.peerReadyForHandshakeTimeout(l.peerID)
685692
})
686693
}
687694

695+
// validForWireguard returns true if the peer is ready to be programmed into
696+
// wireguard. The peer must have a non-nil node; a destination peer must
// additionally have readyForHandshake set.
697+
func (l *peerLifecycle) validForWireguard() bool {
698+
valid := l.node != nil
699+
if l.isDestination {
700+
return valid && l.readyForHandshake
701+
}
702+
return valid
703+
}
704+
688705
// prefixesDifferent returns true if the two slices contain different prefixes
689706
// where order doesn't matter.
690707
func prefixesDifferent(a, b []netip.Prefix) bool {

tailnet/conn.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,7 @@ type Options struct {
8787
// connections, rather than trying `Upgrade: derp` first and potentially
8888
// falling back. This is useful for misbehaving proxies that prevent
8989
// fallback due to odd behavior, like Azure App Proxy.
90-
DERPForceWebSockets bool
91-
ShouldWaitReadyForHandshake bool
90+
DERPForceWebSockets bool
9291
// BlockEndpoints specifies whether P2P endpoints are blocked.
9392
// If so, only DERPs can establish connections.
9493
BlockEndpoints bool

tailnet/coordinator.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ type Coordinatee interface {
9999
UpdatePeers([]*proto.CoordinateResponse_PeerUpdate) error
100100
SetAllPeersLost()
101101
SetNodeCallback(func(*Node))
102+
// SetTunnelDestination indicates to tailnet that the peer id is a
103+
// destination.
104+
SetTunnelDestination(id uuid.UUID)
102105
}
103106

104107
type Coordination interface {
@@ -212,6 +215,8 @@ func NewRemoteCoordination(logger slog.Logger,
212215
respLoopDone: make(chan struct{}),
213216
}
214217
if tunnelTarget != uuid.Nil {
218+
// TODO: reenable in upstack PR
219+
// c.coordinatee.SetTunnelDestination(tunnelTarget)
215220
c.Lock()
216221
err := c.protocol.Send(&proto.CoordinateRequest{AddTunnel: &proto.CoordinateRequest_Tunnel{Id: tunnelTarget[:]}})
217222
c.Unlock()

tailnet/coordinator_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -848,6 +848,7 @@ type fakeCoordinatee struct {
848848
callback func(*tailnet.Node)
849849
updates [][]*proto.CoordinateResponse_PeerUpdate
850850
setAllPeersLostCalls int
851+
tunnelDestinations map[uuid.UUID]struct{}
851852
}
852853

853854
func (f *fakeCoordinatee) UpdatePeers(updates []*proto.CoordinateResponse_PeerUpdate) error {
@@ -863,6 +864,16 @@ func (f *fakeCoordinatee) SetAllPeersLost() {
863864
f.setAllPeersLostCalls++
864865
}
865866

867+
// SetTunnelDestination records id in f.tunnelDestinations while holding f's
// lock, allocating the map on first use.
func (f *fakeCoordinatee) SetTunnelDestination(id uuid.UUID) {
868+
f.Lock()
869+
defer f.Unlock()
870+
871+
if f.tunnelDestinations == nil {
872+
f.tunnelDestinations = map[uuid.UUID]struct{}{}
873+
}
874+
f.tunnelDestinations[id] = struct{}{}
875+
}
876+
866877
func (f *fakeCoordinatee) SetNodeCallback(callback func(*tailnet.Node)) {
867878
f.Lock()
868879
defer f.Unlock()

0 commit comments

Comments
 (0)