Skip to content

Commit b9d6a1f

Browse files
committed
chore: add setStatus support to nodeUpdater
1 parent bc2aadf commit b9d6a1f

File tree

2 files changed

+208
-0
lines changed

2 files changed

+208
-0
lines changed

tailnet/node.go

+30
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ import (
44
"context"
55
"net/netip"
66
"sync"
7+
"time"
78

89
"golang.org/x/exp/maps"
910
"golang.org/x/exp/slices"
1011
"tailscale.com/tailcfg"
1112
"tailscale.com/types/key"
13+
"tailscale.com/wgengine"
1214

1315
"cdr.dev/slog"
1416
"github.com/coder/coder/v2/coderd/database/dbtime"
@@ -32,6 +34,7 @@ type nodeUpdater struct {
3234
derpForcedWebsockets map[int]string
3335
endpoints []string
3436
addresses []netip.Prefix
37+
lastStatus time.Time
3538
}
3639

3740
// updateLoop waits until the config is dirty and then calls the callback with the newest node.
@@ -146,3 +149,30 @@ func (u *nodeUpdater) setDERPForcedWebsocket(region int, reason string) {
146149
u.Broadcast()
147150
}
148151
}
152+
153+
// setStatus handles the status callback from the wireguard engine to learn about new endpoints
154+
// (e.g. discovered by STUN)
155+
func (u *nodeUpdater) setStatus(s *wgengine.Status, err error) {
156+
u.logger.Debug(context.Background(), "wireguard status", slog.F("status", s), slog.Error(err))
157+
if err != nil {
158+
return
159+
}
160+
u.L.Lock()
161+
defer u.L.Unlock()
162+
if s.AsOf.Before(u.lastStatus) {
163+
// Don't process outdated status!
164+
return
165+
}
166+
u.lastStatus = s.AsOf
167+
endpoints := make([]string, len(s.LocalAddrs))
168+
for i, ep := range s.LocalAddrs {
169+
endpoints[i] = ep.Addr.String()
170+
}
171+
if slices.Equal(endpoints, u.endpoints) {
172+
// No need to update the node if nothing changed!
173+
return
174+
}
175+
u.endpoints = endpoints
176+
u.dirty = true
177+
u.Broadcast()
178+
}

tailnet/node_internal_test.go

+178
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
11
package tailnet
22

33
import (
4+
"net/netip"
45
"testing"
6+
"time"
7+
8+
"golang.org/x/xerrors"
9+
10+
"golang.org/x/exp/slices"
11+
12+
"tailscale.com/wgengine"
513

614
"github.com/stretchr/testify/require"
715
"golang.org/x/exp/maps"
@@ -183,3 +191,173 @@ func TestNodeUpdater_setDERPForcedWebsocket_same(t *testing.T) {
183191
}()
184192
_ = testutil.RequireRecvCtx(ctx, t, done)
185193
}
194+
195+
func TestNodeUpdater_setStatus_different(t *testing.T) {
196+
t.Parallel()
197+
ctx := testutil.Context(t, testutil.WaitShort)
198+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
199+
id := tailcfg.NodeID(1)
200+
nodeKey := key.NewNode().Public()
201+
discoKey := key.NewDisco().Public()
202+
nodeCh := make(chan *Node)
203+
uut := newNodeUpdater(
204+
logger,
205+
func(n *Node) {
206+
nodeCh <- n
207+
},
208+
id, nodeKey, discoKey,
209+
)
210+
defer uut.close()
211+
212+
// Given: preferred DERP is 1, so we'll send an update
213+
uut.L.Lock()
214+
uut.preferredDERP = 1
215+
uut.L.Unlock()
216+
217+
// When: we set a new status
218+
asof := time.Date(2024, 1, 10, 8, 0o0, 1, 1, time.UTC)
219+
uut.setStatus(&wgengine.Status{
220+
LocalAddrs: []tailcfg.Endpoint{
221+
{Addr: netip.MustParseAddrPort("[fe80::1]:5678")},
222+
},
223+
AsOf: asof,
224+
}, nil)
225+
226+
// Then: we receive an update with the endpoint
227+
node := testutil.RequireRecvCtx(ctx, t, nodeCh)
228+
require.Equal(t, nodeKey, node.Key)
229+
require.Equal(t, discoKey, node.DiscoKey)
230+
require.True(t, slices.Equal([]string{"[fe80::1]:5678"}, node.Endpoints))
231+
232+
// Then: we store the AsOf time as lastStatus
233+
uut.L.Lock()
234+
require.Equal(t, uut.lastStatus, asof)
235+
uut.L.Unlock()
236+
237+
done := make(chan struct{})
238+
go func() {
239+
defer close(done)
240+
uut.close()
241+
}()
242+
_ = testutil.RequireRecvCtx(ctx, t, done)
243+
}
244+
245+
func TestNodeUpdater_setStatus_same(t *testing.T) {
246+
t.Parallel()
247+
ctx := testutil.Context(t, testutil.WaitShort)
248+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
249+
id := tailcfg.NodeID(1)
250+
nodeKey := key.NewNode().Public()
251+
discoKey := key.NewDisco().Public()
252+
nodeCh := make(chan *Node)
253+
uut := newNodeUpdater(
254+
logger,
255+
func(n *Node) {
256+
nodeCh <- n
257+
},
258+
id, nodeKey, discoKey,
259+
)
260+
defer uut.close()
261+
262+
// Then: we don't configure
263+
requireNeverConfigures(ctx, t, &uut.phased)
264+
265+
// Given: preferred DERP is 1, so we would send an update on change &&
266+
// endpoints set to {"[fe80::1]:5678"}
267+
uut.L.Lock()
268+
uut.preferredDERP = 1
269+
uut.endpoints = []string{"[fe80::1]:5678"}
270+
uut.L.Unlock()
271+
272+
// When: we set a status with endpoints {[fe80::1]:5678}
273+
uut.setStatus(&wgengine.Status{LocalAddrs: []tailcfg.Endpoint{
274+
{Addr: netip.MustParseAddrPort("[fe80::1]:5678")},
275+
}}, nil)
276+
277+
done := make(chan struct{})
278+
go func() {
279+
defer close(done)
280+
uut.close()
281+
}()
282+
_ = testutil.RequireRecvCtx(ctx, t, done)
283+
}
284+
285+
func TestNodeUpdater_setStatus_error(t *testing.T) {
286+
t.Parallel()
287+
ctx := testutil.Context(t, testutil.WaitShort)
288+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
289+
id := tailcfg.NodeID(1)
290+
nodeKey := key.NewNode().Public()
291+
discoKey := key.NewDisco().Public()
292+
nodeCh := make(chan *Node)
293+
uut := newNodeUpdater(
294+
logger,
295+
func(n *Node) {
296+
nodeCh <- n
297+
},
298+
id, nodeKey, discoKey,
299+
)
300+
defer uut.close()
301+
302+
// Then: we don't configure
303+
requireNeverConfigures(ctx, t, &uut.phased)
304+
305+
// Given: preferred DERP is 1, so we would send an update on change && empty endpoints
306+
uut.L.Lock()
307+
uut.preferredDERP = 1
308+
uut.L.Unlock()
309+
310+
// When: we set a status with endpoints {[fe80::1]:5678}, with an error
311+
uut.setStatus(&wgengine.Status{LocalAddrs: []tailcfg.Endpoint{
312+
{Addr: netip.MustParseAddrPort("[fe80::1]:5678")},
313+
}}, xerrors.New("test"))
314+
315+
done := make(chan struct{})
316+
go func() {
317+
defer close(done)
318+
uut.close()
319+
}()
320+
_ = testutil.RequireRecvCtx(ctx, t, done)
321+
}
322+
323+
func TestNodeUpdater_setStatus_outdated(t *testing.T) {
324+
t.Parallel()
325+
ctx := testutil.Context(t, testutil.WaitShort)
326+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
327+
id := tailcfg.NodeID(1)
328+
nodeKey := key.NewNode().Public()
329+
discoKey := key.NewDisco().Public()
330+
nodeCh := make(chan *Node)
331+
uut := newNodeUpdater(
332+
logger,
333+
func(n *Node) {
334+
nodeCh <- n
335+
},
336+
id, nodeKey, discoKey,
337+
)
338+
defer uut.close()
339+
340+
// Then: we don't configure
341+
requireNeverConfigures(ctx, t, &uut.phased)
342+
343+
// Given: preferred DERP is 1, so we would send an update on change && lastStatus set ahead
344+
ahead := time.Date(2024, 1, 10, 8, 0o0, 1, 0, time.UTC)
345+
behind := time.Date(2024, 1, 10, 8, 0o0, 0, 0, time.UTC)
346+
uut.L.Lock()
347+
uut.preferredDERP = 1
348+
uut.lastStatus = ahead
349+
uut.L.Unlock()
350+
351+
// When: we set a status with endpoints {[fe80::1]:5678}, with AsOf set behind
352+
uut.setStatus(&wgengine.Status{
353+
LocalAddrs: []tailcfg.Endpoint{{Addr: netip.MustParseAddrPort("[fe80::1]:5678")}},
354+
AsOf: behind,
355+
}, xerrors.New("test"))
356+
357+
done := make(chan struct{})
358+
go func() {
359+
defer close(done)
360+
uut.close()
361+
}()
362+
_ = testutil.RequireRecvCtx(ctx, t, done)
363+
}

0 commit comments

Comments
 (0)