
Commit a3f95e8

chore(vpn): upsert agents with their network status
1 parent 1ac2bae

5 files changed: +565 −157 lines

vpn/client.go

Lines changed: 5 additions & 0 deletions
```diff
@@ -5,12 +5,15 @@ import (
     "net/http"
     "net/netip"
     "net/url"
+    "time"

     "golang.org/x/xerrors"
     "nhooyr.io/websocket"
+    "tailscale.com/ipn/ipnstate"
     "tailscale.com/net/dns"
     "tailscale.com/wgengine/router"

+    "github.com/google/uuid"
     "github.com/tailscale/wireguard-go/tun"

     "cdr.dev/slog"
@@ -23,6 +26,8 @@ import (

 type Conn interface {
     CurrentWorkspaceState() (tailnet.WorkspaceUpdate, error)
+    GetPeerDiagnostics(peerID uuid.UUID) tailnet.PeerDiagnostics
+    Ping(ctx context.Context, ip netip.Addr) (time.Duration, bool, *ipnstate.PingResult, error)
     Close() error
 }
```
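The two new methods on `Conn` expose per-agent network telemetry: `Ping` for round-trip latency and `GetPeerDiagnostics` for the last WireGuard handshake. A minimal sketch of how a consumer might combine them; `agentStatus` is a hypothetical helper, not part of this commit, and assumes it sits in the `vpn` package alongside `Conn`:

```go
package vpn

import (
    "context"
    "time"

    "github.com/google/uuid"

    "github.com/coder/coder/v2/tailnet"
)

// agentStatus is a hypothetical helper showing how a consumer could
// combine the two new Conn methods: Ping for round-trip latency and
// GetPeerDiagnostics for the last WireGuard handshake time. It mirrors
// what the tunnel itself does in populateAgents below.
func agentStatus(ctx context.Context, conn Conn, agentID uuid.UUID) (time.Duration, time.Time, error) {
    // Agents are addressed by an IP derived from their UUID.
    ip := tailnet.CoderServicePrefix.AddrFromUUID(agentID)
    latency, _, _, err := conn.Ping(ctx, ip)
    if err != nil {
        return 0, time.Time{}, err
    }
    diags := conn.GetPeerDiagnostics(agentID)
    return latency, diags.LastWireguardHandshake, nil
}
```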

vpn/tunnel.go

Lines changed: 161 additions & 21 deletions
```diff
@@ -7,24 +7,36 @@ import (
     "fmt"
     "io"
     "net/http"
+    "net/netip"
     "net/url"
     "reflect"
     "strconv"
     "sync"
+    "time"
     "unicode"

+    "golang.org/x/exp/maps"
     "golang.org/x/xerrors"
+    "google.golang.org/protobuf/types/known/durationpb"
+    "google.golang.org/protobuf/types/known/timestamppb"
     "tailscale.com/net/dns"
+    "tailscale.com/util/dnsname"
     "tailscale.com/wgengine/router"

+    "github.com/google/uuid"
+
     "cdr.dev/slog"
     "github.com/coder/coder/v2/coderd/util/ptr"
     "github.com/coder/coder/v2/tailnet"
 )

+// The interval at which the tunnel sends network status updates to the manager.
+const netStatusInterval = 30 * time.Second
+
 type Tunnel struct {
     speaker[*TunnelMessage, *ManagerMessage, ManagerMessage]
     ctx             context.Context
+    netLoopDone     chan struct{}
     requestLoopDone chan struct{}

     logger slog.Logger
@@ -35,6 +47,10 @@ type Tunnel struct {
     client Client
     conn   Conn

+    mu sync.Mutex
+    // agents contains the agents that are currently connected to the tunnel.
+    agents map[uuid.UUID]*tailnet.Agent
+
     // clientLogger is deliberately separate, to avoid the tunnel using itself
     // as a sink for it's own logs, which could lead to deadlocks
     clientLogger slog.Logger
@@ -65,14 +81,17 @@ func NewTunnel(
         logger:          logger,
         clientLogger:    slog.Make(),
         requestLoopDone: make(chan struct{}),
+        netLoopDone:     make(chan struct{}),
         client:          client,
+        agents:          make(map[uuid.UUID]*tailnet.Agent),
     }

     for _, opt := range opts {
         opt(t)
     }
     t.speaker.start()
     go t.requestLoop()
+    go t.netStatusLoop()
     return t, nil
 }

@@ -101,6 +120,20 @@ func (t *Tunnel) requestLoop() {
     }
 }

+func (t *Tunnel) netStatusLoop() {
+    ticker := time.NewTicker(netStatusInterval)
+    defer ticker.Stop()
+    defer close(t.netLoopDone)
+    for {
+        select {
+        case <-t.ctx.Done():
+            return
+        case <-ticker.C:
+            t.sendAgentUpdate()
+        }
+    }
+}
+
 // handleRPC handles unary RPCs from the manager.
 func (t *Tunnel) handleRPC(req *ManagerMessage, msgID uint64) *TunnelMessage {
     resp := &TunnelMessage{}
@@ -111,8 +144,12 @@ func (t *Tunnel) handleRPC(req *ManagerMessage, msgID uint64) *TunnelMessage {
         if err != nil {
             t.logger.Critical(t.ctx, "failed to get current workspace state", slog.Error(err))
         }
+        update, err := t.createPeerUpdate(state)
+        if err != nil {
+            t.logger.Error(t.ctx, "failed to populate agent network info", slog.Error(err))
+        }
         resp.Msg = &TunnelMessage_PeerUpdate{
-            PeerUpdate: convertWorkspaceUpdate(state),
+            PeerUpdate: update,
         }
         return resp
     case *ManagerMessage_Start:
@@ -193,9 +230,13 @@ func (t *Tunnel) ApplyNetworkSettings(ctx context.Context, ns *NetworkSettingsRe
 }

 func (t *Tunnel) Update(update tailnet.WorkspaceUpdate) error {
+    peerUpdate, err := t.createPeerUpdate(update)
+    if err != nil {
+        t.logger.Error(t.ctx, "failed to populate agent network info", slog.Error(err))
+    }
     msg := &TunnelMessage{
         Msg: &TunnelMessage_PeerUpdate{
-            PeerUpdate: convertWorkspaceUpdate(update),
+            PeerUpdate: peerUpdate,
         },
     }
     select {
@@ -292,35 +333,30 @@ func sinkEntryToPb(e slog.SinkEntry) *Log {
     return l
 }

-func convertWorkspaceUpdate(update tailnet.WorkspaceUpdate) *PeerUpdate {
+// createPeerUpdate creates a PeerUpdate message from a workspace update, populating
+// the network status of the agents.
+func (t *Tunnel) createPeerUpdate(update tailnet.WorkspaceUpdate) (*PeerUpdate, error) {
     out := &PeerUpdate{
         UpsertedWorkspaces: make([]*Workspace, len(update.UpsertedWorkspaces)),
         UpsertedAgents:     make([]*Agent, len(update.UpsertedAgents)),
         DeletedWorkspaces:  make([]*Workspace, len(update.DeletedWorkspaces)),
         DeletedAgents:      make([]*Agent, len(update.DeletedAgents)),
     }
+
+    t.saveUpdate(update)
+
     for i, ws := range update.UpsertedWorkspaces {
         out.UpsertedWorkspaces[i] = &Workspace{
             Id:     tailnet.UUIDToByteSlice(ws.ID),
             Name:   ws.Name,
             Status: Workspace_Status(ws.Status),
         }
     }
-    for i, agent := range update.UpsertedAgents {
-        fqdn := make([]string, 0, len(agent.Hosts))
-        for name := range agent.Hosts {
-            fqdn = append(fqdn, name.WithTrailingDot())
-        }
-        out.UpsertedAgents[i] = &Agent{
-            Id:          tailnet.UUIDToByteSlice(agent.ID),
-            Name:        agent.Name,
-            WorkspaceId: tailnet.UUIDToByteSlice(agent.WorkspaceID),
-            Fqdn:        fqdn,
-            IpAddrs:     []string{tailnet.CoderServicePrefix.AddrFromUUID(agent.ID).String()},
-            // TODO: Populate
-            LastHandshake: nil,
-        }
+    upsertedAgents, err := t.populateAgents(update.UpsertedAgents)
+    if err != nil {
+        return nil, xerrors.Errorf("failed to populate agent network info: %w", err)
     }
+    out.UpsertedAgents = upsertedAgents
     for i, ws := range update.DeletedWorkspaces {
         out.DeletedWorkspaces[i] = &Workspace{
             Id: tailnet.UUIDToByteSlice(ws.ID),
```
```diff
@@ -334,16 +370,106 @@ func convertWorkspaceUpdate(update tailnet.WorkspaceUpdate) *PeerUpdate {
             fqdn = append(fqdn, name.WithTrailingDot())
         }
         out.DeletedAgents[i] = &Agent{
+            Id:            tailnet.UUIDToByteSlice(agent.ID),
+            Name:          agent.Name,
+            WorkspaceId:   tailnet.UUIDToByteSlice(agent.WorkspaceID),
+            Fqdn:          fqdn,
+            IpAddrs:       hostsToIPStrings(agent.Hosts),
+            LastHandshake: nil,
+            Latency:       nil,
+        }
+    }
+    return out, nil
+}
+
+// Given a list of agents, populate their network info, and return them as proto agents.
+func (t *Tunnel) populateAgents(agents []*tailnet.Agent) ([]*Agent, error) {
+    if t.conn == nil {
+        return nil, xerrors.New("no active connection")
+    }
+
+    out := make([]*Agent, 0, len(agents))
+    var wg sync.WaitGroup
+    pingCtx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
+    defer cancelFunc()
+
+    for _, agent := range agents {
+        fqdn := make([]string, 0, len(agent.Hosts))
+        for name := range agent.Hosts {
+            fqdn = append(fqdn, name.WithTrailingDot())
+        }
+        protoAgent := &Agent{
             Id:          tailnet.UUIDToByteSlice(agent.ID),
             Name:        agent.Name,
             WorkspaceId: tailnet.UUIDToByteSlice(agent.WorkspaceID),
             Fqdn:        fqdn,
-            IpAddrs:     []string{tailnet.CoderServicePrefix.AddrFromUUID(agent.ID).String()},
-            // TODO: Populate
-            LastHandshake: nil,
+            IpAddrs:     hostsToIPStrings(agent.Hosts),
         }
+        agentIP := tailnet.CoderServicePrefix.AddrFromUUID(agent.ID)
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+            duration, _, _, err := t.conn.Ping(pingCtx, agentIP)
+            if err != nil {
+                return
+            }
+            protoAgent.Latency = durationpb.New(duration)
+        }()
+        diags := t.conn.GetPeerDiagnostics(agent.ID)
+        //nolint:revive // outdated rule
+        protoAgent.LastHandshake = timestamppb.New(diags.LastWireguardHandshake)
+        out = append(out, protoAgent)
+    }
+    wg.Wait()
+
+    return out, nil
+}
+
+// saveUpdate saves the workspace update to the tunnel's state, such that it can
+// be used to populate automated peer updates.
+func (t *Tunnel) saveUpdate(update tailnet.WorkspaceUpdate) {
+    t.mu.Lock()
+    defer t.mu.Unlock()
+
+    for _, agent := range update.UpsertedAgents {
+        t.agents[agent.ID] = agent
+    }
+    for _, agent := range update.DeletedAgents {
+        delete(t.agents, agent.ID)
+    }
+}
+
+// sendAgentUpdate sends a peer update message to the manager with the current
+// state of the agents, including the latest network status.
+func (t *Tunnel) sendAgentUpdate() {
+    // The lock must be held until we send the message,
+    // else we risk upserting a deleted agent.
+    t.mu.Lock()
+    defer t.mu.Unlock()
+
+    upsertedAgents, err := t.populateAgents(maps.Values(t.agents))
+    if err != nil {
+        t.logger.Error(t.ctx, "failed to produce agent network status update", slog.Error(err))
+        return
+    }
+
+    if len(upsertedAgents) == 0 {
+        return
+    }
+
+    msg := &TunnelMessage{
+        Msg: &TunnelMessage_PeerUpdate{
+            PeerUpdate: &PeerUpdate{
+                UpsertedAgents: upsertedAgents,
+            },
+        },
+    }
+
+    select {
+    case <-t.ctx.Done():
+        return
+    case t.sendCh <- msg:
     }
-    return out
 }

 // the following are taken from sloghuman:
```
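`populateAgents` fans out one goroutine per agent under a shared five-second deadline and joins them with a `sync.WaitGroup`; each goroutine writes only its own agent's `Latency` field, so the results need no mutex. A stripped-down, hypothetical sketch of the same fan-out pattern (not part of this commit), with a `ping` callback standing in for `Conn.Ping`:

```go
package main

import (
    "context"
    "fmt"
    "net/netip"
    "sync"
    "time"
)

// fanOutPing is a simplified sketch of the concurrency pattern used by
// populateAgents: one goroutine per target, a shared deadline, and one
// result slot per goroutine so no locking is required.
func fanOutPing(
    ctx context.Context,
    ping func(context.Context, netip.Addr) (time.Duration, error),
    targets []netip.Addr,
) []time.Duration {
    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()

    results := make([]time.Duration, len(targets))
    var wg sync.WaitGroup
    for i, ip := range targets {
        wg.Add(1)
        go func(i int, ip netip.Addr) {
            defer wg.Done()
            if d, err := ping(ctx, ip); err == nil {
                results[i] = d // each goroutine owns exactly one slot
            }
        }(i, ip)
    }
    wg.Wait()
    return results
}

func main() {
    // fakePing stands in for Conn.Ping in this illustration.
    fakePing := func(_ context.Context, _ netip.Addr) (time.Duration, error) {
        return 10 * time.Millisecond, nil
    }
    targets := []netip.Addr{netip.MustParseAddr("fd60:627a:a42b::1")}
    fmt.Println(fanOutPing(context.Background(), fakePing, targets))
}
```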
```diff
@@ -403,3 +529,17 @@ func quote(key string) string {
     }
     return quoted
 }
+
+func hostsToIPStrings(hosts map[dnsname.FQDN][]netip.Addr) []string {
+    seen := make(map[netip.Addr]struct{})
+    var result []string
+    for _, inner := range hosts {
+        for _, elem := range inner {
+            if _, exists := seen[elem]; !exists {
+                seen[elem] = struct{}{}
+                result = append(result, elem.String())
+            }
+        }
+    }
+    return result
+}
```
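`hostsToIPStrings` flattens the per-FQDN host map into a deduplicated list of IP strings, so an address reachable under several names is reported once. A test-style sketch of that behavior; it assumes it lives in the `vpn` package next to `hostsToIPStrings`, and the hostnames and address below are made up for illustration:

```go
package vpn

import (
    "fmt"
    "net/netip"

    "tailscale.com/util/dnsname"
)

func exampleHostsToIPStrings() {
    // Two hypothetical names pointing at the same agent address.
    a, _ := dnsname.ToFQDN("ws.me.coder.")
    b, _ := dnsname.ToFQDN("ws.me.username.coder.")
    hosts := map[dnsname.FQDN][]netip.Addr{
        a: {netip.MustParseAddr("fd60:627a:a42b::1")},
        b: {netip.MustParseAddr("fd60:627a:a42b::1")},
    }
    // The shared address is emitted exactly once; map iteration
    // order makes the output order unspecified in general.
    fmt.Println(hostsToIPStrings(hosts)) // [fd60:627a:a42b::1]
}
```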
