@@ -13,6 +13,8 @@ import (
13
13
"sync/atomic"
14
14
"time"
15
15
16
+ "cdr.dev/slog"
17
+
16
18
"github.com/google/uuid"
17
19
lru "github.com/hashicorp/golang-lru/v2"
18
20
"golang.org/x/exp/slices"
@@ -111,16 +113,19 @@ func ServeCoordinator(conn net.Conn, updateNodes func(node []*Node) error) (func
111
113
}, errChan
112
114
}
113
115
116
+ const LoggerName = "coord"
117
+
114
118
// NewCoordinator constructs a new in-memory connection coordinator. This
115
119
// coordinator is incompatible with multiple Coder replicas as all node data is
116
120
// in-memory.
117
- func NewCoordinator () Coordinator {
121
+ func NewCoordinator (logger slog. Logger ) Coordinator {
118
122
nameCache , err := lru.New [uuid.UUID , string ](512 )
119
123
if err != nil {
120
124
panic ("make lru cache: " + err .Error ())
121
125
}
122
126
123
127
return & coordinator {
128
+ logger : logger .Named (LoggerName ),
124
129
closed : false ,
125
130
nodes : map [uuid.UUID ]* Node {},
126
131
agentSockets : map [uuid.UUID ]* TrackedConn {},
@@ -137,6 +142,7 @@ func NewCoordinator() Coordinator {
137
142
// This coordinator is incompatible with multiple Coder
138
143
// replicas as all node data is in-memory.
139
144
type coordinator struct {
145
+ logger slog.Logger
140
146
mutex sync.RWMutex
141
147
closed bool
142
148
@@ -194,6 +200,8 @@ func (c *coordinator) AgentCount() int {
194
200
// ServeClient accepts a WebSocket connection that wants to connect to an agent
195
201
// with the specified ID.
196
202
func (c * coordinator ) ServeClient (conn net.Conn , id uuid.UUID , agent uuid.UUID ) error {
203
+ logger := c .logger .With (slog .F ("client_id" , id ), slog .F ("agent_id" , agent ))
204
+ logger .Debug (context .TODO (), "coordinating client" )
197
205
c .mutex .Lock ()
198
206
if c .closed {
199
207
c .mutex .Unlock ()
@@ -210,6 +218,7 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID)
210
218
return xerrors .Errorf ("marshal node: %w" , err )
211
219
}
212
220
_ , err = conn .Write (data )
221
+ logger .Debug (context .TODO (), "wrote initial node" )
213
222
if err != nil {
214
223
return xerrors .Errorf ("write nodes: %w" , err )
215
224
}
@@ -230,7 +239,9 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID)
230
239
LastWrite : now ,
231
240
}
232
241
c .mutex .Unlock ()
242
+ logger .Debug (context .TODO (), "added tracked connection" )
233
243
defer func () {
244
+ logger .Debug (context .TODO (), "deleting tracked connection" )
234
245
c .mutex .Lock ()
235
246
defer c .mutex .Unlock ()
236
247
// Clean all traces of this connection from the map.
@@ -259,11 +270,13 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID)
259
270
}
260
271
261
272
func (c * coordinator ) handleNextClientMessage (id , agent uuid.UUID , decoder * json.Decoder ) error {
273
+ logger := c .logger .With (slog .F ("client_id" , id ), slog .F ("agent_id" , agent ))
262
274
var node Node
263
275
err := decoder .Decode (& node )
264
276
if err != nil {
265
277
return xerrors .Errorf ("read json: %w" , err )
266
278
}
279
+ logger .Debug (context .TODO (), "got client node update" , slog .F ("node" , node ))
267
280
268
281
c .mutex .Lock ()
269
282
// Update the node of this client in our in-memory map. If an agent entirely
@@ -274,6 +287,7 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json
274
287
agentSocket , ok := c .agentSockets [agent ]
275
288
if ! ok {
276
289
c .mutex .Unlock ()
290
+ logger .Debug (context .TODO (), "no agent socket" )
277
291
return nil
278
292
}
279
293
c .mutex .Unlock ()
@@ -291,13 +305,16 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json
291
305
}
292
306
return xerrors .Errorf ("write json: %w" , err )
293
307
}
308
+ logger .Debug (context .TODO (), "sent client node to agent" )
294
309
295
310
return nil
296
311
}
297
312
298
313
// ServeAgent accepts a WebSocket connection to an agent that
299
314
// listens to incoming connections and publishes node updates.
300
315
func (c * coordinator ) ServeAgent (conn net.Conn , id uuid.UUID , name string ) error {
316
+ logger := c .logger .With (slog .F ("agent_id" , id ))
317
+ logger .Debug (context .TODO (), "coordinating agent" )
301
318
c .mutex .Lock ()
302
319
if c .closed {
303
320
c .mutex .Unlock ()
@@ -324,6 +341,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error
324
341
return xerrors .Errorf ("marshal json: %w" , err )
325
342
}
326
343
_ , err = conn .Write (data )
344
+ logger .Debug (context .TODO (), "wrote initial client(s) to agent" , slog .F ("nodes" , nodes ))
327
345
if err != nil {
328
346
return xerrors .Errorf ("write nodes: %w" , err )
329
347
}
@@ -356,6 +374,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error
356
374
}
357
375
358
376
c .mutex .Unlock ()
377
+ logger .Debug (context .TODO (), "added agent socket" )
359
378
defer func () {
360
379
c .mutex .Lock ()
361
380
defer c .mutex .Unlock ()
@@ -365,6 +384,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error
365
384
if idConn , ok := c .agentSockets [id ]; ok && idConn .ID == unique {
366
385
delete (c .agentSockets , id )
367
386
delete (c .nodes , id )
387
+ logger .Debug (context .TODO (), "deleted agent socket" )
368
388
}
369
389
}()
370
390
@@ -381,17 +401,20 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error
381
401
}
382
402
383
403
func (c * coordinator ) handleNextAgentMessage (id uuid.UUID , decoder * json.Decoder ) error {
404
+ logger := c .logger .With (slog .F ("agent_id" , id ))
384
405
var node Node
385
406
err := decoder .Decode (& node )
386
407
if err != nil {
387
408
return xerrors .Errorf ("read json: %w" , err )
388
409
}
410
+ logger .Debug (context .TODO (), "decoded agent node" , slog .F ("node" , node ))
389
411
390
412
c .mutex .Lock ()
391
413
c .nodes [id ] = & node
392
414
connectionSockets , ok := c .agentToConnectionSockets [id ]
393
415
if ! ok {
394
416
c .mutex .Unlock ()
417
+ logger .Debug (context .TODO (), "no client sockets" )
395
418
return nil
396
419
}
397
420
data , err := json .Marshal ([]* Node {& node })
@@ -403,11 +426,14 @@ func (c *coordinator) handleNextAgentMessage(id uuid.UUID, decoder *json.Decoder
403
426
// Publish the new node to every listening socket.
404
427
var wg sync.WaitGroup
405
428
wg .Add (len (connectionSockets ))
406
- for _ , connectionSocket := range connectionSockets {
429
+ for clientID , connectionSocket := range connectionSockets {
430
+ clientID := clientID
407
431
connectionSocket := connectionSocket
408
432
go func () {
409
433
_ = connectionSocket .SetWriteDeadline (time .Now ().Add (5 * time .Second ))
410
- _ , _ = connectionSocket .Write (data )
434
+ _ , err := connectionSocket .Write (data )
435
+ logger .Debug (context .TODO (), "sent agent node to client" ,
436
+ slog .F ("client_id" , clientID ), slog .Error (err ))
411
437
wg .Done ()
412
438
}()
413
439
}
0 commit comments