Skip to content

Commit cf43bdd

Browse files
committed
chore(vpn): upsert agents with their network status
1 parent 26561dd commit cf43bdd

File tree

5 files changed

+563
-158
lines changed

5 files changed

+563
-158
lines changed

vpn/client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@ import (
55
"net/http"
66
"net/netip"
77
"net/url"
8+
"time"
89

910
"golang.org/x/xerrors"
1011
"nhooyr.io/websocket"
12+
"tailscale.com/ipn/ipnstate"
1113
"tailscale.com/net/dns"
1214
"tailscale.com/wgengine/router"
1315

16+
"github.com/google/uuid"
1417
"github.com/tailscale/wireguard-go/tun"
1518

1619
"cdr.dev/slog"
@@ -23,6 +26,8 @@ import (
2326

2427
type Conn interface {
2528
CurrentWorkspaceState() (tailnet.WorkspaceUpdate, error)
29+
GetPeerDiagnostics(peerID uuid.UUID) tailnet.PeerDiagnostics
30+
Ping(ctx context.Context, ip netip.Addr) (time.Duration, bool, *ipnstate.PingResult, error)
2631
Close() error
2732
}
2833

vpn/tunnel.go

Lines changed: 162 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,36 @@ import (
77
"fmt"
88
"io"
99
"net/http"
10+
"net/netip"
1011
"net/url"
1112
"reflect"
1213
"strconv"
1314
"sync"
15+
"time"
1416
"unicode"
1517

18+
"golang.org/x/exp/maps"
1619
"golang.org/x/xerrors"
20+
"google.golang.org/protobuf/types/known/durationpb"
21+
"google.golang.org/protobuf/types/known/timestamppb"
1722
"tailscale.com/net/dns"
23+
"tailscale.com/util/dnsname"
1824
"tailscale.com/wgengine/router"
1925

26+
"github.com/google/uuid"
27+
2028
"cdr.dev/slog"
2129
"github.com/coder/coder/v2/coderd/util/ptr"
2230
"github.com/coder/coder/v2/tailnet"
2331
)
2432

33+
// netStatusInterval is the interval at which the tunnel sends network status updates to the manager.
34+
const netStatusInterval = 30 * time.Second
35+
2536
type Tunnel struct {
2637
speaker[*TunnelMessage, *ManagerMessage, ManagerMessage]
2738
ctx context.Context
39+
netLoopDone chan struct{}
2840
requestLoopDone chan struct{}
2941

3042
logger slog.Logger
@@ -35,6 +47,10 @@ type Tunnel struct {
3547
client Client
3648
conn Conn
3749

50+
mu sync.Mutex
51+
// agents contains the agents that are currently connected to the tunnel.
52+
agents map[uuid.UUID]*tailnet.Agent
53+
3854
// router and dnsConfigurator may be nil
3955
router router.Router
4056
dnsConfigurator dns.OSConfigurator
@@ -61,14 +77,17 @@ func NewTunnel(
6177
ctx: ctx,
6278
logger: logger,
6379
requestLoopDone: make(chan struct{}),
80+
netLoopDone: make(chan struct{}),
6481
client: client,
82+
agents: make(map[uuid.UUID]*tailnet.Agent),
6583
}
6684

6785
for _, opt := range opts {
6886
opt(t)
6987
}
7088
t.speaker.start()
7189
go t.requestLoop()
90+
go t.netStatusLoop()
7291
return t, nil
7392
}
7493

@@ -97,6 +116,20 @@ func (t *Tunnel) requestLoop() {
97116
}
98117
}
99118

119+
func (t *Tunnel) netStatusLoop() {
120+
ticker := time.NewTicker(netStatusInterval)
121+
defer ticker.Stop()
122+
defer close(t.netLoopDone)
123+
for {
124+
select {
125+
case <-t.ctx.Done():
126+
return
127+
case <-ticker.C:
128+
t.sendAgentUpdate()
129+
}
130+
}
131+
}
132+
100133
// handleRPC handles unary RPCs from the manager.
101134
func (t *Tunnel) handleRPC(req *ManagerMessage, msgID uint64) *TunnelMessage {
102135
resp := &TunnelMessage{}
@@ -107,8 +140,12 @@ func (t *Tunnel) handleRPC(req *ManagerMessage, msgID uint64) *TunnelMessage {
107140
if err != nil {
108141
t.logger.Critical(t.ctx, "failed to get current workspace state", slog.Error(err))
109142
}
143+
update, err := t.createPeerUpdate(state)
144+
if err != nil {
145+
t.logger.Error(t.ctx, "failed to populate agent network info", slog.Error(err))
146+
}
110147
resp.Msg = &TunnelMessage_PeerUpdate{
111-
PeerUpdate: convertWorkspaceUpdate(state),
148+
PeerUpdate: update,
112149
}
113150
return resp
114151
case *ManagerMessage_Start:
@@ -189,9 +226,13 @@ func (t *Tunnel) ApplyNetworkSettings(ctx context.Context, ns *NetworkSettingsRe
189226
}
190227

191228
func (t *Tunnel) Update(update tailnet.WorkspaceUpdate) error {
229+
peerUpdate, err := t.createPeerUpdate(update)
230+
if err != nil {
231+
t.logger.Error(t.ctx, "failed to populate agent network info", slog.Error(err))
232+
}
192233
msg := &TunnelMessage{
193234
Msg: &TunnelMessage_PeerUpdate{
194-
PeerUpdate: convertWorkspaceUpdate(update),
235+
PeerUpdate: peerUpdate,
195236
},
196237
}
197238
select {
@@ -288,35 +329,30 @@ func sinkEntryToPb(e slog.SinkEntry) *Log {
288329
return l
289330
}
290331

291-
func convertWorkspaceUpdate(update tailnet.WorkspaceUpdate) *PeerUpdate {
332+
// createPeerUpdate creates a PeerUpdate message from a workspace update, populating
333+
// the network status of the agents.
334+
func (t *Tunnel) createPeerUpdate(update tailnet.WorkspaceUpdate) (*PeerUpdate, error) {
292335
out := &PeerUpdate{
293336
UpsertedWorkspaces: make([]*Workspace, len(update.UpsertedWorkspaces)),
294337
UpsertedAgents: make([]*Agent, len(update.UpsertedAgents)),
295338
DeletedWorkspaces: make([]*Workspace, len(update.DeletedWorkspaces)),
296339
DeletedAgents: make([]*Agent, len(update.DeletedAgents)),
297340
}
341+
342+
t.saveUpdate(update)
343+
298344
for i, ws := range update.UpsertedWorkspaces {
299345
out.UpsertedWorkspaces[i] = &Workspace{
300346
Id: tailnet.UUIDToByteSlice(ws.ID),
301347
Name: ws.Name,
302348
Status: Workspace_Status(ws.Status),
303349
}
304350
}
305-
for i, agent := range update.UpsertedAgents {
306-
fqdn := make([]string, 0, len(agent.Hosts))
307-
for name := range agent.Hosts {
308-
fqdn = append(fqdn, name.WithoutTrailingDot())
309-
}
310-
out.UpsertedAgents[i] = &Agent{
311-
Id: tailnet.UUIDToByteSlice(agent.ID),
312-
Name: agent.Name,
313-
WorkspaceId: tailnet.UUIDToByteSlice(agent.WorkspaceID),
314-
Fqdn: fqdn,
315-
IpAddrs: []string{tailnet.CoderServicePrefix.AddrFromUUID(agent.ID).String()},
316-
// TODO: Populate
317-
LastHandshake: nil,
318-
}
351+
upsertedAgents, err := t.populateAgents(update.UpsertedAgents)
352+
if err != nil {
353+
return nil, xerrors.Errorf("failed to populate agent network info: %w", err)
319354
}
355+
out.UpsertedAgents = upsertedAgents
320356
for i, ws := range update.DeletedWorkspaces {
321357
out.DeletedWorkspaces[i] = &Workspace{
322358
Id: tailnet.UUIDToByteSlice(ws.ID),
@@ -327,19 +363,109 @@ func convertWorkspaceUpdate(update tailnet.WorkspaceUpdate) *PeerUpdate {
327363
for i, agent := range update.DeletedAgents {
328364
fqdn := make([]string, 0, len(agent.Hosts))
329365
for name := range agent.Hosts {
330-
fqdn = append(fqdn, name.WithoutTrailingDot())
366+
fqdn = append(fqdn, name.WithTrailingDot())
331367
}
332368
out.DeletedAgents[i] = &Agent{
369+
Id: tailnet.UUIDToByteSlice(agent.ID),
370+
Name: agent.Name,
371+
WorkspaceId: tailnet.UUIDToByteSlice(agent.WorkspaceID),
372+
Fqdn: fqdn,
373+
IpAddrs: hostsToIPStrings(agent.Hosts),
374+
LastHandshake: nil,
375+
Latency: nil,
376+
}
377+
}
378+
return out, nil
379+
}
380+
381+
// Given a list of agents, populate their network info, and return them as proto agents.
382+
func (t *Tunnel) populateAgents(agents []*tailnet.Agent) ([]*Agent, error) {
383+
if t.conn == nil {
384+
return nil, xerrors.New("no active connection")
385+
}
386+
387+
out := make([]*Agent, 0, len(agents))
388+
var wg sync.WaitGroup
389+
pingCtx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
390+
defer cancelFunc()
391+
392+
for _, agent := range agents {
393+
fqdn := make([]string, 0, len(agent.Hosts))
394+
for name := range agent.Hosts {
395+
fqdn = append(fqdn, name.WithTrailingDot())
396+
}
397+
protoAgent := &Agent{
333398
Id: tailnet.UUIDToByteSlice(agent.ID),
334399
Name: agent.Name,
335400
WorkspaceId: tailnet.UUIDToByteSlice(agent.WorkspaceID),
336401
Fqdn: fqdn,
337-
IpAddrs: []string{tailnet.CoderServicePrefix.AddrFromUUID(agent.ID).String()},
338-
// TODO: Populate
339-
LastHandshake: nil,
402+
IpAddrs: hostsToIPStrings(agent.Hosts),
340403
}
404+
agentIP := tailnet.CoderServicePrefix.AddrFromUUID(agent.ID)
405+
wg.Add(1)
406+
go func() {
407+
defer wg.Done()
408+
duration, _, _, err := t.conn.Ping(pingCtx, agentIP)
409+
if err != nil {
410+
return
411+
}
412+
protoAgent.Latency = durationpb.New(duration)
413+
}()
414+
diags := t.conn.GetPeerDiagnostics(agent.ID)
415+
//nolint:revive // outdated rule
416+
protoAgent.LastHandshake = timestamppb.New(diags.LastWireguardHandshake)
417+
out = append(out, protoAgent)
418+
}
419+
wg.Wait()
420+
421+
return out, nil
422+
}
423+
424+
// saveUpdate saves the workspace update to the tunnel's state, such that it can
425+
// be used to populate automated peer updates.
426+
func (t *Tunnel) saveUpdate(update tailnet.WorkspaceUpdate) {
427+
t.mu.Lock()
428+
defer t.mu.Unlock()
429+
430+
for _, agent := range update.UpsertedAgents {
431+
t.agents[agent.ID] = agent
432+
}
433+
for _, agent := range update.DeletedAgents {
434+
delete(t.agents, agent.ID)
435+
}
436+
}
437+
438+
// sendAgentUpdate sends a peer update message to the manager with the current
439+
// state of the agents, including the latest network status.
440+
func (t *Tunnel) sendAgentUpdate() {
441+
// The lock must be held until we send the message,
442+
// else we risk upserting a deleted agent.
443+
t.mu.Lock()
444+
defer t.mu.Unlock()
445+
446+
upsertedAgents, err := t.populateAgents(maps.Values(t.agents))
447+
if err != nil {
448+
t.logger.Error(t.ctx, "failed to produce agent network status update", slog.Error(err))
449+
return
450+
}
451+
452+
if len(upsertedAgents) == 0 {
453+
return
454+
}
455+
456+
msg := &TunnelMessage{
457+
Msg: &TunnelMessage_PeerUpdate{
458+
PeerUpdate: &PeerUpdate{
459+
UpsertedAgents: upsertedAgents,
460+
},
461+
},
462+
}
463+
464+
select {
465+
case <-t.ctx.Done():
466+
return
467+
case t.sendCh <- msg:
341468
}
342-
return out
343469
}
344470

345471
// the following are taken from sloghuman:
@@ -399,3 +525,17 @@ func quote(key string) string {
399525
}
400526
return quoted
401527
}
528+
529+
func hostsToIPStrings(hosts map[dnsname.FQDN][]netip.Addr) []string {
530+
seen := make(map[netip.Addr]struct{})
531+
var result []string
532+
for _, inner := range hosts {
533+
for _, elem := range inner {
534+
if _, exists := seen[elem]; !exists {
535+
seen[elem] = struct{}{}
536+
result = append(result, elem.String())
537+
}
538+
}
539+
}
540+
return result
541+
}

0 commit comments

Comments
 (0)