Skip to content

Commit cc694a5

Browse files
authored
feat: add debug info to HA coordinator (#5883)
1 parent 52ecd35 commit cc694a5

File tree

2 files changed

+188
-131
lines changed

2 files changed

+188
-131
lines changed

enterprise/tailnet/coordinator.go

+60-14
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"time"
1414

1515
"github.com/google/uuid"
16+
lru "github.com/hashicorp/golang-lru/v2"
1617
"golang.org/x/xerrors"
1718

1819
"cdr.dev/slog"
@@ -24,15 +25,22 @@ import (
2425
// that uses PostgreSQL pubsub to exchange handshakes.
2526
func NewCoordinator(logger slog.Logger, pubsub database.Pubsub) (agpl.Coordinator, error) {
2627
ctx, cancelFunc := context.WithCancel(context.Background())
28+
29+
nameCache, err := lru.New[uuid.UUID, string](512)
30+
if err != nil {
31+
panic("make lru cache: " + err.Error())
32+
}
33+
2734
coord := &haCoordinator{
2835
id: uuid.New(),
2936
log: logger,
3037
pubsub: pubsub,
3138
closeFunc: cancelFunc,
3239
close: make(chan struct{}),
3340
nodes: map[uuid.UUID]*agpl.Node{},
34-
agentSockets: map[uuid.UUID]net.Conn{},
35-
agentToConnectionSockets: map[uuid.UUID]map[uuid.UUID]net.Conn{},
41+
agentSockets: map[uuid.UUID]*agpl.TrackedConn{},
42+
agentToConnectionSockets: map[uuid.UUID]map[uuid.UUID]*agpl.TrackedConn{},
43+
agentNameCache: nameCache,
3644
}
3745

3846
if err := coord.runPubsub(ctx); err != nil {
@@ -53,10 +61,14 @@ type haCoordinator struct {
5361
// nodes maps agent and connection IDs their respective node.
5462
nodes map[uuid.UUID]*agpl.Node
5563
// agentSockets maps agent IDs to their open websocket.
56-
agentSockets map[uuid.UUID]net.Conn
64+
agentSockets map[uuid.UUID]*agpl.TrackedConn
5765
// agentToConnectionSockets maps agent IDs to connection IDs of conns that
5866
// are subscribed to updates for that agent.
59-
agentToConnectionSockets map[uuid.UUID]map[uuid.UUID]net.Conn
67+
agentToConnectionSockets map[uuid.UUID]map[uuid.UUID]*agpl.TrackedConn
68+
69+
// agentNameCache holds a cache of agent names. If one of them disappears,
70+
// it's helpful to have a name cached for debugging.
71+
agentNameCache *lru.Cache[uuid.UUID, string]
6072
}
6173

6274
// Node returns an in-memory node by ID.
@@ -94,12 +106,18 @@ func (c *haCoordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID
94106
c.mutex.Lock()
95107
connectionSockets, ok := c.agentToConnectionSockets[agent]
96108
if !ok {
97-
connectionSockets = map[uuid.UUID]net.Conn{}
109+
connectionSockets = map[uuid.UUID]*agpl.TrackedConn{}
98110
c.agentToConnectionSockets[agent] = connectionSockets
99111
}
100112

101-
// Insert this connection into a map so the agent can publish node updates.
102-
connectionSockets[id] = conn
113+
now := time.Now().Unix()
114+
// Insert this connection into a map so the agent
115+
// can publish node updates.
116+
connectionSockets[id] = &agpl.TrackedConn{
117+
Conn: conn,
118+
Start: now,
119+
LastWrite: now,
120+
}
103121
c.mutex.Unlock()
104122

105123
defer func() {
@@ -176,7 +194,9 @@ func (c *haCoordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *js
176194

177195
// ServeAgent accepts a WebSocket connection to an agent that listens to
178196
// incoming connections and publishes node updates.
179-
func (c *haCoordinator) ServeAgent(conn net.Conn, id uuid.UUID, _ string) error {
197+
func (c *haCoordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error {
198+
c.agentNameCache.Add(id, name)
199+
180200
// Tell clients on other instances to send a callmemaybe to us.
181201
err := c.publishAgentHello(id)
182202
if err != nil {
@@ -196,21 +216,41 @@ func (c *haCoordinator) ServeAgent(conn net.Conn, id uuid.UUID, _ string) error
196216
}
197217
}
198218

219+
// This uniquely identifies a connection that belongs to this goroutine.
220+
unique := uuid.New()
221+
now := time.Now().Unix()
222+
overwrites := int64(0)
223+
199224
// If an old agent socket is connected, we close it
200225
// to avoid any leaks. This shouldn't ever occur because
201226
// we expect one agent to be running.
202227
c.mutex.Lock()
203228
oldAgentSocket, ok := c.agentSockets[id]
204229
if ok {
230+
overwrites = oldAgentSocket.Overwrites + 1
205231
_ = oldAgentSocket.Close()
206232
}
207-
c.agentSockets[id] = conn
233+
c.agentSockets[id] = &agpl.TrackedConn{
234+
ID: unique,
235+
Conn: conn,
236+
237+
Name: name,
238+
Start: now,
239+
LastWrite: now,
240+
Overwrites: overwrites,
241+
}
208242
c.mutex.Unlock()
243+
209244
defer func() {
210245
c.mutex.Lock()
211246
defer c.mutex.Unlock()
212-
delete(c.agentSockets, id)
213-
delete(c.nodes, id)
247+
248+
// Only delete the connection if it's ours. It could have been
249+
// overwritten.
250+
if idConn, ok := c.agentSockets[id]; ok && idConn.ID == unique {
251+
delete(c.agentSockets, id)
252+
delete(c.nodes, id)
253+
}
214254
}()
215255

216256
decoder := json.NewDecoder(conn)
@@ -576,8 +616,14 @@ func (c *haCoordinator) formatAgentUpdate(id uuid.UUID, node *agpl.Node) ([]byte
576616
return buf.Bytes(), nil
577617
}
578618

579-
func (*haCoordinator) ServeHTTPDebug(w http.ResponseWriter, _ *http.Request) {
619+
func (c *haCoordinator) ServeHTTPDebug(w http.ResponseWriter, r *http.Request) {
580620
w.Header().Set("Content-Type", "text/html; charset=utf-8")
581-
fmt.Fprintf(w, "<h1>coordinator</h1>")
582-
fmt.Fprintf(w, "<h2>ha debug coming soon</h2>")
621+
622+
c.mutex.RLock()
623+
defer c.mutex.RUnlock()
624+
625+
fmt.Fprintln(w, "<h1>high-availability wireguard coordinator debug</h1>")
626+
fmt.Fprintln(w, "<h4 style=\"margin-top:-25px\">warning: this only provides info from the node that served the request, if there are multiple replicas this data may be incomplete</h4>")
627+
628+
agpl.CoordinatorHTTPDebug(c.agentSockets, c.agentToConnectionSockets, c.agentNameCache)(w, r)
583629
}

0 commit comments

Comments
 (0)