
Commit d52fab0

chore: add nodeUpdater to tailnet

1 parent 15eef5e · commit d52fab0

4 files changed: +256 -8 lines


tailnet/configmaps.go (+7 -3)

@@ -48,13 +48,17 @@ const (
 	closed
 )
 
-type configMaps struct {
+type phased struct {
 	sync.Cond
+	phase phase
+}
+
+type configMaps struct {
+	phased
 	netmapDirty  bool
 	derpMapDirty bool
 	filterDirty  bool
 	closing      bool
-	phase        phase
 
 	engine engineConfigurable
 	static netmap.NetworkMap
@@ -71,7 +75,7 @@ type configMaps struct {
 func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg.NodeID, nodeKey key.NodePrivate, discoKey key.DiscoPublic, addresses []netip.Prefix) *configMaps {
 	pubKey := nodeKey.Public()
 	c := &configMaps{
-		Cond: *(sync.NewCond(&sync.Mutex{})),
+		phased: phased{Cond: *(sync.NewCond(&sync.Mutex{}))},
 		logger: logger,
 		engine: engine,
 		static: netmap.NetworkMap{
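This hunk extracts the sync.Cond plus the phase field out of configMaps into a shared phased struct, so the new nodeUpdater below can reuse the same wait/broadcast discipline. A hypothetical helper (not in the commit), written as if inside package tailnet, sketches that discipline: take the embedded lock, then loop on Wait() until the owning goroutine broadcasts the phase you want. The commit's own close() methods use exactly this pattern to wait for the closed phase.

	// awaitPhase blocks until p reaches the wanted phase. Hypothetical
	// helper for illustration; phased and phase are the commit's types.
	func awaitPhase(p *phased, want phase) {
		p.L.Lock()
		defer p.L.Unlock()
		for p.phase != want {
			// Every phase transition in the owner's loop is followed by a
			// Broadcast(), which wakes all waiters to re-check the condition.
			p.Wait()
		}
	}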

tailnet/configmaps_internal_test.go (+5 -5)

@@ -96,7 +96,7 @@ func TestConfigMaps_setAddresses_same(t *testing.T) {
 	uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public(), addrs)
 	defer uut.close()
 
-	requireNeverConfigures(ctx, t, uut)
+	requireNeverConfigures(ctx, t, &uut.phased)
 
 	uut.setAddresses(addrs)
 
@@ -190,7 +190,7 @@ func TestConfigMaps_updatePeers_same(t *testing.T) {
 	defer uut.close()
 
 	// Then: we don't configure
-	requireNeverConfigures(ctx, t, uut)
+	requireNeverConfigures(ctx, t, &uut.phased)
 
 	p1ID := uuid.MustParse("10000000-0000-0000-0000-000000000000")
 	p1Node := newTestNode(1)
@@ -558,7 +558,7 @@ func TestConfigMaps_setBlockEndpoints_same(t *testing.T) {
 	uut.L.Unlock()
 
 	// Then: we don't configure
-	requireNeverConfigures(ctx, t, uut)
+	requireNeverConfigures(ctx, t, &uut.phased)
 
 	// When we set blockEndpoints to true
 	uut.setBlockEndpoints(true)
@@ -619,7 +619,7 @@ func TestConfigMaps_updatePeers_nonexist(t *testing.T) {
 	defer uut.close()
 
 	// Then: we don't configure
-	requireNeverConfigures(ctx, t, uut)
+	requireNeverConfigures(ctx, t, &uut.phased)
 
 	// Given: no known peers
 	go func() {
@@ -669,7 +669,7 @@ func getNodeWithID(t testing.TB, peers []*tailcfg.Node, id tailcfg.NodeID) *tailcfg.Node {
 	return nil
 }
 
-func requireNeverConfigures(ctx context.Context, t *testing.T, uut *configMaps) {
+func requireNeverConfigures(ctx context.Context, t *testing.T, uut *phased) {
 	t.Helper()
 	waiting := make(chan struct{})
 	go func() {
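Widening the helper's parameter from *configMaps to *phased is what makes it reusable: Go promotes an embedded struct's fields and methods, and `&uut.phased` passes just the shared synchronization state regardless of which type embeds it. A standalone sketch of the design choice (names mirror the commit's, but this is an illustration, not the repository code):

	package main

	import (
		"fmt"
		"sync"
	)

	type phase int

	const (
		idle phase = iota
		configuring
		closed
	)

	// phased bundles a condition variable with the current phase,
	// as in the commit.
	type phased struct {
		sync.Cond
		phase phase
	}

	// Both types embed phased, so helpers written against *phased
	// work for either one.
	type configMaps struct{ phased }
	type nodeUpdater struct{ phased }

	func report(p *phased) { fmt.Println("phase:", p.phase) }

	func main() {
		cm := &configMaps{phased{Cond: *(sync.NewCond(&sync.Mutex{}))}}
		nu := &nodeUpdater{phased{Cond: *(sync.NewCond(&sync.Mutex{}))}}
		report(&cm.phased) // the embedded field is addressed explicitly, as in the tests
		report(&nu.phased)
	}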

tailnet/node.go (new file, +134)

package tailnet

import (
	"context"
	"net/netip"
	"sync"

	"golang.org/x/exp/maps"
	"golang.org/x/exp/slices"
	"tailscale.com/tailcfg"
	"tailscale.com/types/key"

	"cdr.dev/slog"
	"github.com/coder/coder/v2/coderd/database/dbtime"
)

type nodeUpdater struct {
	phased
	dirty   bool
	closing bool

	// static
	logger   slog.Logger
	id       tailcfg.NodeID
	key      key.NodePublic
	discoKey key.DiscoPublic
	callback func(n *Node)

	// dynamic
	preferredDERP        int
	derpLatency          map[string]float64
	derpForcedWebsockets map[int]string
	endpoints            []string
	addresses            []netip.Prefix
}

// updateLoop waits until the config is dirty and then calls the callback with
// the newest node. It is intended only to be called internally, and shuts
// down when close() is called.
func (u *nodeUpdater) updateLoop() {
	u.L.Lock()
	defer u.L.Unlock()
	defer func() {
		u.phase = closed
		u.Broadcast()
	}()
	for {
		for !(u.closing || u.dirty) {
			u.phase = idle
			u.Wait()
		}
		if u.closing {
			return
		}
		node := u.nodeLocked()
		u.dirty = false
		u.phase = configuring
		u.Broadcast()

		// We cannot reach nodes without DERP for discovery. Therefore, there
		// is no point in sending the node without this, and we can save
		// ourselves from churn in the tailscale/wireguard layer.
		if node.PreferredDERP == 0 {
			u.logger.Debug(context.Background(), "skipped sending node; no PreferredDERP", slog.F("node", node))
			continue
		}

		u.L.Unlock()
		u.callback(node)
		u.L.Lock()
	}
}

// close closes the nodeUpdater and stops it calling the node callback.
func (u *nodeUpdater) close() {
	u.L.Lock()
	defer u.L.Unlock()
	u.closing = true
	u.Broadcast()
	for u.phase != closed {
		u.Wait()
	}
}

func newNodeUpdater(
	logger slog.Logger, callback func(n *Node),
	id tailcfg.NodeID, np key.NodePublic, dp key.DiscoPublic,
) *nodeUpdater {
	u := &nodeUpdater{
		phased:   phased{Cond: *(sync.NewCond(&sync.Mutex{}))},
		logger:   logger,
		id:       id,
		key:      np,
		discoKey: dp,
		callback: callback,
	}
	go u.updateLoop()
	return u
}

// nodeLocked returns the current best node information. u.L must be held.
func (u *nodeUpdater) nodeLocked() *Node {
	return &Node{
		ID:                  u.id,
		AsOf:                dbtime.Now(),
		Key:                 u.key,
		Addresses:           slices.Clone(u.addresses),
		AllowedIPs:          slices.Clone(u.addresses),
		DiscoKey:            u.discoKey,
		Endpoints:           slices.Clone(u.endpoints),
		PreferredDERP:       u.preferredDERP,
		DERPLatency:         maps.Clone(u.derpLatency),
		DERPForcedWebsocket: maps.Clone(u.derpForcedWebsockets),
	}
}

// setNetInfo processes a NetInfo update from the wireguard engine. u.L MUST
// NOT be held.
func (u *nodeUpdater) setNetInfo(ni *tailcfg.NetInfo) {
	u.L.Lock()
	defer u.L.Unlock()
	dirty := false
	if u.preferredDERP != ni.PreferredDERP {
		dirty = true
		u.preferredDERP = ni.PreferredDERP
	}
	if !maps.Equal(u.derpLatency, ni.DERPLatency) {
		dirty = true
		u.derpLatency = ni.DERPLatency
	}
	if dirty {
		u.dirty = true
		u.Broadcast()
	}
}
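Taken together: setNetInfo marks state dirty and broadcasts, and updateLoop flushes a node snapshot to the callback. A hypothetical driver (not in the commit), written as if inside package tailnet since nodeUpdater is unexported, and assuming fmt is imported alongside the file's existing imports:

	// exampleNodeUpdater sketches how a caller might drive nodeUpdater.
	func exampleNodeUpdater(logger slog.Logger) {
		u := newNodeUpdater(
			logger,
			// The callback receives the snapshot built by nodeLocked.
			func(n *Node) { fmt.Printf("node update: DERP %d\n", n.PreferredDERP) },
			tailcfg.NodeID(1), key.NewNode().Public(), key.NewDisco().Public(),
		)
		defer u.close()

		// A report with a nonzero PreferredDERP marks the updater dirty and
		// wakes updateLoop, which hands a fresh node to the callback (unless
		// close() wins the race first).
		u.setNetInfo(&tailcfg.NetInfo{
			PreferredDERP: 1,
			DERPLatency:   map[string]float64{"1": 0.025},
		})

		// Repeating an identical report changes nothing: no dirty flag is
		// set, so no callback fires.
		u.setNetInfo(&tailcfg.NetInfo{
			PreferredDERP: 1,
			DERPLatency:   map[string]float64{"1": 0.025},
		})
	}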

tailnet/node_internal_test.go (new file, +110)

package tailnet

import (
	"testing"

	"github.com/stretchr/testify/require"
	"golang.org/x/exp/maps"
	"tailscale.com/tailcfg"
	"tailscale.com/types/key"

	"cdr.dev/slog"
	"cdr.dev/slog/sloggers/slogtest"
	"github.com/coder/coder/v2/testutil"
)

func TestNodeUpdater_setNetInfo_different(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	id := tailcfg.NodeID(1)
	nodeKey := key.NewNode().Public()
	discoKey := key.NewDisco().Public()
	nodeCh := make(chan *Node)
	goCh := make(chan struct{})
	uut := newNodeUpdater(
		logger,
		func(n *Node) {
			nodeCh <- n
			<-goCh
		},
		id, nodeKey, discoKey,
	)
	defer uut.close()

	dl := map[string]float64{"1": 0.025}
	uut.setNetInfo(&tailcfg.NetInfo{
		PreferredDERP: 1,
		DERPLatency:   dl,
	})

	node := testutil.RequireRecvCtx(ctx, t, nodeCh)
	require.Equal(t, nodeKey, node.Key)
	require.Equal(t, discoKey, node.DiscoKey)
	require.Equal(t, 1, node.PreferredDERP)
	require.True(t, maps.Equal(dl, node.DERPLatency))

	// Send in second update to test getting updates in the middle of the
	// callback
	uut.setNetInfo(&tailcfg.NetInfo{
		PreferredDERP: 2,
		DERPLatency:   dl,
	})
	close(goCh) // allows callback to complete

	node = testutil.RequireRecvCtx(ctx, t, nodeCh)
	require.Equal(t, nodeKey, node.Key)
	require.Equal(t, discoKey, node.DiscoKey)
	require.Equal(t, 2, node.PreferredDERP)
	require.True(t, maps.Equal(dl, node.DERPLatency))

	done := make(chan struct{})
	go func() {
		defer close(done)
		uut.close()
	}()
	_ = testutil.RequireRecvCtx(ctx, t, done)
}

func TestNodeUpdater_setNetInfo_same(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	id := tailcfg.NodeID(1)
	nodeKey := key.NewNode().Public()
	discoKey := key.NewDisco().Public()
	nodeCh := make(chan *Node)
	goCh := make(chan struct{})
	uut := newNodeUpdater(
		logger,
		func(n *Node) {
			nodeCh <- n
			<-goCh
		},
		id, nodeKey, discoKey,
	)
	defer uut.close()

	// Then: we don't configure
	requireNeverConfigures(ctx, t, &uut.phased)

	// Given: preferred DERP and latency already set
	dl := map[string]float64{"1": 0.025}
	uut.L.Lock()
	uut.preferredDERP = 1
	uut.derpLatency = maps.Clone(dl)
	uut.L.Unlock()

	// When: new update with same info
	uut.setNetInfo(&tailcfg.NetInfo{
		PreferredDERP: 1,
		DERPLatency:   dl,
	})

	done := make(chan struct{})
	go func() {
		defer close(done)
		uut.close()
	}()
	_ = testutil.RequireRecvCtx(ctx, t, done)
}
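Worth noting in the first test: the unbuffered nodeCh plus the goCh gate parks the callback mid-flight, which is what lets the test prove that an update issued during a callback is delivered in a later one. The gating trick in isolation (a standalone sketch with hypothetical names, not repo code):

	package main

	import "fmt"

	// The callback parks on two channels, so the caller controls exactly
	// when it completes and can inject new state while a delivery is
	// still in flight.
	func main() {
		got := make(chan int)       // plays the role of nodeCh
		gate := make(chan struct{}) // plays the role of goCh

		callback := func(v int) {
			got <- v // hand the value over...
			<-gate   // ...then block until the gate opens
		}

		go callback(1)
		fmt.Println("first delivery:", <-got) // callback is now parked on gate

		// While the first callback is blocked, new state can arrive; once
		// the gate opens, the second delivery goes through.
		go callback(2)
		close(gate)
		fmt.Println("second delivery:", <-got)
	}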
