Skip to content

Commit 1b9f328

Browse files
committed
Add logging to coordinator
Signed-off-by: Spike Curtis <spike@coder.com>
1 parent 62f3155 commit 1b9f328

File tree

7 files changed: +67 -29 lines changed

agent/agent_test.go

+20-16
Original file line numberDiff line numberDiff line change
@@ -714,7 +714,7 @@ func TestAgent_UnixRemoteForwarding(t *testing.T) {
714714
var err error
715715
conn, err = net.Dial("unix", remoteSocketPath)
716716
return err == nil
717-
}, testutil.WaitLong, testutil.IntervalFast)
717+
}, testutil.WaitShort, testutil.IntervalFast)
718718
defer conn.Close()
719719
_, err = conn.Write([]byte("test"))
720720
require.NoError(t, err)
@@ -879,6 +879,7 @@ func TestAgent_StartupScript(t *testing.T) {
879879
}
880880
t.Run("Success", func(t *testing.T) {
881881
t.Parallel()
882+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
882883
client := &client{
883884
t: t,
884885
agentID: uuid.New(),
@@ -887,12 +888,12 @@ func TestAgent_StartupScript(t *testing.T) {
887888
DERPMap: &tailcfg.DERPMap{},
888889
},
889890
statsChan: make(chan *agentsdk.Stats),
890-
coordinator: tailnet.NewCoordinator(),
891+
coordinator: tailnet.NewCoordinator(logger),
891892
}
892893
closer := agent.New(agent.Options{
893894
Client: client,
894895
Filesystem: afero.NewMemMapFs(),
895-
Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug),
896+
Logger: logger.Named("agent"),
896897
ReconnectingPTYTimeout: 0,
897898
})
898899
t.Cleanup(func() {
@@ -910,6 +911,7 @@ func TestAgent_StartupScript(t *testing.T) {
910911
// script has written too many lines it will still succeed!
911912
t.Run("OverflowsAndSkips", func(t *testing.T) {
912913
t.Parallel()
914+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
913915
client := &client{
914916
t: t,
915917
agentID: uuid.New(),
@@ -927,12 +929,12 @@ func TestAgent_StartupScript(t *testing.T) {
927929
return codersdk.ReadBodyAsError(res)
928930
},
929931
statsChan: make(chan *agentsdk.Stats),
930-
coordinator: tailnet.NewCoordinator(),
932+
coordinator: tailnet.NewCoordinator(logger),
931933
}
932934
closer := agent.New(agent.Options{
933935
Client: client,
934936
Filesystem: afero.NewMemMapFs(),
935-
Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug),
937+
Logger: logger.Named("agent"),
936938
ReconnectingPTYTimeout: 0,
937939
})
938940
t.Cleanup(func() {
@@ -1282,7 +1284,7 @@ func TestAgent_Lifecycle(t *testing.T) {
12821284

12831285
t.Run("ShutdownScriptOnce", func(t *testing.T) {
12841286
t.Parallel()
1285-
1287+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
12861288
expected := "this-is-shutdown"
12871289
client := &client{
12881290
t: t,
@@ -1293,13 +1295,13 @@ func TestAgent_Lifecycle(t *testing.T) {
12931295
ShutdownScript: "echo " + expected,
12941296
},
12951297
statsChan: make(chan *agentsdk.Stats),
1296-
coordinator: tailnet.NewCoordinator(),
1298+
coordinator: tailnet.NewCoordinator(logger),
12971299
}
12981300

12991301
fs := afero.NewMemMapFs()
13001302
agent := agent.New(agent.Options{
13011303
Client: client,
1302-
Logger: slogtest.Make(t, nil).Leveled(slog.LevelInfo),
1304+
Logger: logger.Named("agent"),
13031305
Filesystem: fs,
13041306
})
13051307

@@ -1548,9 +1550,10 @@ func TestAgent_Speedtest(t *testing.T) {
15481550

15491551
func TestAgent_Reconnect(t *testing.T) {
15501552
t.Parallel()
1553+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
15511554
// After the agent is disconnected from a coordinator, it's supposed
15521555
// to reconnect!
1553-
coordinator := tailnet.NewCoordinator()
1556+
coordinator := tailnet.NewCoordinator(logger)
15541557
defer coordinator.Close()
15551558

15561559
agentID := uuid.New()
@@ -1572,7 +1575,7 @@ func TestAgent_Reconnect(t *testing.T) {
15721575
return "", nil
15731576
},
15741577
Client: client,
1575-
Logger: slogtest.Make(t, nil).Leveled(slog.LevelInfo),
1578+
Logger: logger.Named("agent"),
15761579
})
15771580
defer closer.Close()
15781581

@@ -1587,8 +1590,8 @@ func TestAgent_Reconnect(t *testing.T) {
15871590

15881591
func TestAgent_WriteVSCodeConfigs(t *testing.T) {
15891592
t.Parallel()
1590-
1591-
coordinator := tailnet.NewCoordinator()
1593+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
1594+
coordinator := tailnet.NewCoordinator(logger)
15921595
defer coordinator.Close()
15931596

15941597
client := &client{
@@ -1607,7 +1610,7 @@ func TestAgent_WriteVSCodeConfigs(t *testing.T) {
16071610
return "", nil
16081611
},
16091612
Client: client,
1610-
Logger: slogtest.Make(t, nil).Leveled(slog.LevelInfo),
1613+
Logger: logger.Named("agent"),
16111614
Filesystem: filesystem,
16121615
})
16131616
defer closer.Close()
@@ -1698,10 +1701,11 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
16981701
afero.Fs,
16991702
io.Closer,
17001703
) {
1704+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
17011705
if metadata.DERPMap == nil {
17021706
metadata.DERPMap = tailnettest.RunDERPAndSTUN(t)
17031707
}
1704-
coordinator := tailnet.NewCoordinator()
1708+
coordinator := tailnet.NewCoordinator(logger)
17051709
t.Cleanup(func() {
17061710
_ = coordinator.Close()
17071711
})
@@ -1718,7 +1722,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
17181722
closer := agent.New(agent.Options{
17191723
Client: c,
17201724
Filesystem: fs,
1721-
Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug),
1725+
Logger: logger.Named("agent"),
17221726
ReconnectingPTYTimeout: ptyTimeout,
17231727
})
17241728
t.Cleanup(func() {
@@ -1727,7 +1731,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
17271731
conn, err := tailnet.NewConn(&tailnet.Options{
17281732
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(), 128)},
17291733
DERPMap: metadata.DERPMap,
1730-
Logger: slogtest.Make(t, nil).Named("client").Leveled(slog.LevelDebug),
1734+
Logger: logger.Named("client"),
17311735
})
17321736
require.NoError(t, err)
17331737
clientConn, serverConn := net.Pipe()

coderd/coderd.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ func New(options *Options) *API {
221221
options.PrometheusRegistry = prometheus.NewRegistry()
222222
}
223223
if options.TailnetCoordinator == nil {
224-
options.TailnetCoordinator = tailnet.NewCoordinator()
224+
options.TailnetCoordinator = tailnet.NewCoordinator(options.Logger)
225225
}
226226
if options.DERPServer == nil {
227227
options.DERPServer = derp.NewServer(key.NewNode(), tailnet.Logger(options.Logger.Named("derp")))

coderd/prometheusmetrics/prometheusmetrics_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/stretchr/testify/assert"
1717
"github.com/stretchr/testify/require"
1818

19+
"cdr.dev/slog"
1920
"cdr.dev/slog/sloggers/slogtest"
2021

2122
"github.com/coder/coder/coderd/coderdtest"
@@ -298,7 +299,7 @@ func TestAgents(t *testing.T) {
298299
coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)
299300

300301
// given
301-
coordinator := tailnet.NewCoordinator()
302+
coordinator := tailnet.NewCoordinator(slogtest.Make(t, nil).Leveled(slog.LevelDebug))
302303
coordinatorPtr := atomic.Pointer[tailnet.Coordinator]{}
303304
coordinatorPtr.Store(&coordinator)
304305
derpMap := tailnettest.RunDERPAndSTUN(t)

coderd/wsconncache/wsconncache_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -156,10 +156,10 @@ func TestCache(t *testing.T) {
156156

157157
func setupAgent(t *testing.T, manifest agentsdk.Manifest, ptyTimeout time.Duration) *codersdk.WorkspaceAgentConn {
158158
t.Helper()
159-
159+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
160160
manifest.DERPMap = tailnettest.RunDERPAndSTUN(t)
161161

162-
coordinator := tailnet.NewCoordinator()
162+
coordinator := tailnet.NewCoordinator(logger)
163163
t.Cleanup(func() {
164164
_ = coordinator.Close()
165165
})
@@ -171,7 +171,7 @@ func setupAgent(t *testing.T, manifest agentsdk.Manifest, ptyTimeout time.Durati
171171
manifest: manifest,
172172
coordinator: coordinator,
173173
},
174-
Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelInfo),
174+
Logger: logger.Named("agent"),
175175
ReconnectingPTYTimeout: ptyTimeout,
176176
})
177177
t.Cleanup(func() {

enterprise/coderd/coderd.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ func (api *API) updateEntitlements(ctx context.Context) error {
390390
}
391391

392392
if changed, enabled := featureChanged(codersdk.FeatureHighAvailability); changed {
393-
coordinator := agpltailnet.NewCoordinator()
393+
coordinator := agpltailnet.NewCoordinator(api.Logger)
394394
if enabled {
395395
haCoordinator, err := tailnet.NewCoordinator(api.Logger, api.Pubsub)
396396
if err != nil {

tailnet/coordinator.go

+29-3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"sync/atomic"
1414
"time"
1515

16+
"cdr.dev/slog"
17+
1618
"github.com/google/uuid"
1719
lru "github.com/hashicorp/golang-lru/v2"
1820
"golang.org/x/exp/slices"
@@ -111,16 +113,19 @@ func ServeCoordinator(conn net.Conn, updateNodes func(node []*Node) error) (func
111113
}, errChan
112114
}
113115

116+
// LoggerName is the name applied to the coordinator's logger: NewCoordinator
// calls logger.Named(LoggerName) on the logger it is given.
const LoggerName = "coord"
117+
114118
// NewCoordinator constructs a new in-memory connection coordinator. This
115119
// coordinator is incompatible with multiple Coder replicas as all node data is
116120
// in-memory.
117-
func NewCoordinator() Coordinator {
121+
func NewCoordinator(logger slog.Logger) Coordinator {
118122
nameCache, err := lru.New[uuid.UUID, string](512)
119123
if err != nil {
120124
panic("make lru cache: " + err.Error())
121125
}
122126

123127
return &coordinator{
128+
logger: logger.Named(LoggerName),
124129
closed: false,
125130
nodes: map[uuid.UUID]*Node{},
126131
agentSockets: map[uuid.UUID]*TrackedConn{},
@@ -137,6 +142,7 @@ func NewCoordinator() Coordinator {
137142
// This coordinator is incompatible with multiple Coder
138143
// replicas as all node data is in-memory.
139144
type coordinator struct {
145+
logger slog.Logger
140146
mutex sync.RWMutex
141147
closed bool
142148

@@ -194,6 +200,8 @@ func (c *coordinator) AgentCount() int {
194200
// ServeClient accepts a WebSocket connection that wants to connect to an agent
195201
// with the specified ID.
196202
func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) error {
203+
logger := c.logger.With(slog.F("client_id", id), slog.F("agent_id", agent))
204+
logger.Debug(context.TODO(), "coordinating client")
197205
c.mutex.Lock()
198206
if c.closed {
199207
c.mutex.Unlock()
@@ -210,6 +218,7 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID)
210218
return xerrors.Errorf("marshal node: %w", err)
211219
}
212220
_, err = conn.Write(data)
221+
logger.Debug(context.TODO(), "wrote initial node")
213222
if err != nil {
214223
return xerrors.Errorf("write nodes: %w", err)
215224
}
@@ -230,7 +239,9 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID)
230239
LastWrite: now,
231240
}
232241
c.mutex.Unlock()
242+
logger.Debug(context.TODO(), "added tracked connection")
233243
defer func() {
244+
logger.Debug(context.TODO(), "deleting tracked connection")
234245
c.mutex.Lock()
235246
defer c.mutex.Unlock()
236247
// Clean all traces of this connection from the map.
@@ -259,11 +270,13 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID)
259270
}
260271

261272
func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json.Decoder) error {
273+
logger := c.logger.With(slog.F("client_id", id), slog.F("agent_id", agent))
262274
var node Node
263275
err := decoder.Decode(&node)
264276
if err != nil {
265277
return xerrors.Errorf("read json: %w", err)
266278
}
279+
logger.Debug(context.TODO(), "got client node update", slog.F("node", node))
267280

268281
c.mutex.Lock()
269282
// Update the node of this client in our in-memory map. If an agent entirely
@@ -274,6 +287,7 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json
274287
agentSocket, ok := c.agentSockets[agent]
275288
if !ok {
276289
c.mutex.Unlock()
290+
logger.Debug(context.TODO(), "no agent socket")
277291
return nil
278292
}
279293
c.mutex.Unlock()
@@ -291,13 +305,16 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json
291305
}
292306
return xerrors.Errorf("write json: %w", err)
293307
}
308+
logger.Debug(context.TODO(), "sent client node to agent")
294309

295310
return nil
296311
}
297312

298313
// ServeAgent accepts a WebSocket connection to an agent that
299314
// listens to incoming connections and publishes node updates.
300315
func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error {
316+
logger := c.logger.With(slog.F("agent_id", id))
317+
logger.Debug(context.TODO(), "coordinating agent")
301318
c.mutex.Lock()
302319
if c.closed {
303320
c.mutex.Unlock()
@@ -324,6 +341,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error
324341
return xerrors.Errorf("marshal json: %w", err)
325342
}
326343
_, err = conn.Write(data)
344+
logger.Debug(context.TODO(), "wrote initial client(s) to agent", slog.F("nodes", nodes))
327345
if err != nil {
328346
return xerrors.Errorf("write nodes: %w", err)
329347
}
@@ -356,6 +374,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error
356374
}
357375

358376
c.mutex.Unlock()
377+
logger.Debug(context.TODO(), "added agent socket")
359378
defer func() {
360379
c.mutex.Lock()
361380
defer c.mutex.Unlock()
@@ -365,6 +384,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error
365384
if idConn, ok := c.agentSockets[id]; ok && idConn.ID == unique {
366385
delete(c.agentSockets, id)
367386
delete(c.nodes, id)
387+
logger.Debug(context.TODO(), "deleted agent socket")
368388
}
369389
}()
370390

@@ -381,17 +401,20 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error
381401
}
382402

383403
func (c *coordinator) handleNextAgentMessage(id uuid.UUID, decoder *json.Decoder) error {
404+
logger := c.logger.With(slog.F("agent_id", id))
384405
var node Node
385406
err := decoder.Decode(&node)
386407
if err != nil {
387408
return xerrors.Errorf("read json: %w", err)
388409
}
410+
logger.Debug(context.TODO(), "decoded agent node", slog.F("node", node))
389411

390412
c.mutex.Lock()
391413
c.nodes[id] = &node
392414
connectionSockets, ok := c.agentToConnectionSockets[id]
393415
if !ok {
394416
c.mutex.Unlock()
417+
logger.Debug(context.TODO(), "no client sockets")
395418
return nil
396419
}
397420
data, err := json.Marshal([]*Node{&node})
@@ -403,11 +426,14 @@ func (c *coordinator) handleNextAgentMessage(id uuid.UUID, decoder *json.Decoder
403426
// Publish the new node to every listening socket.
404427
var wg sync.WaitGroup
405428
wg.Add(len(connectionSockets))
406-
for _, connectionSocket := range connectionSockets {
429+
for clientID, connectionSocket := range connectionSockets {
430+
clientID := clientID
407431
connectionSocket := connectionSocket
408432
go func() {
409433
_ = connectionSocket.SetWriteDeadline(time.Now().Add(5 * time.Second))
410-
_, _ = connectionSocket.Write(data)
434+
_, err := connectionSocket.Write(data)
435+
logger.Debug(context.TODO(), "sent agent node to client",
436+
slog.F("client_id", clientID), slog.Error(err))
411437
wg.Done()
412438
}()
413439
}

0 commit comments

Comments
 (0)