@@ -56,8 +56,8 @@ type haCoordinator struct {
56
56
57
57
// Node returns an in-memory node by ID.
58
58
func (c * haCoordinator ) Node (id uuid.UUID ) * agpl.Node {
59
- c .mutex .RLock ()
60
- defer c .mutex .RUnlock ()
59
+ c .mutex .Lock ()
60
+ defer c .mutex .Unlock ()
61
61
node := c .nodes [id ]
62
62
return node
63
63
}
@@ -79,6 +79,11 @@ func (c *haCoordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID
79
79
if err != nil {
80
80
return xerrors .Errorf ("write nodes: %w" , err )
81
81
}
82
+ } else {
83
+ err := c .publishClientHello (agent )
84
+ if err != nil {
85
+ return xerrors .Errorf ("publish client hello: %w" , err )
86
+ }
82
87
}
83
88
84
89
c .mutex .Lock ()
@@ -205,7 +210,7 @@ func (c *haCoordinator) ServeAgent(conn net.Conn, id uuid.UUID) error {
205
210
206
211
decoder := json .NewDecoder (conn )
207
212
for {
208
- node , err := c .hangleAgentUpdate (id , decoder )
213
+ node , err := c .handleAgentUpdate (id , decoder )
209
214
if err != nil {
210
215
if errors .Is (err , io .EOF ) {
211
216
return nil
@@ -240,7 +245,17 @@ func (c *haCoordinator) nodesSubscribedToAgent(agentID uuid.UUID) []*agpl.Node {
240
245
return nodes
241
246
}
242
247
243
- func (c * haCoordinator ) hangleAgentUpdate (id uuid.UUID , decoder * json.Decoder ) (* agpl.Node , error ) {
248
+ func (c * haCoordinator ) handleClientHello (id uuid.UUID ) error {
249
+ c .mutex .Lock ()
250
+ node , ok := c .nodes [id ]
251
+ c .mutex .Unlock ()
252
+ if ! ok {
253
+ return nil
254
+ }
255
+ return c .publishAgentToNodes (id , node )
256
+ }
257
+
258
+ func (c * haCoordinator ) handleAgentUpdate (id uuid.UUID , decoder * json.Decoder ) (* agpl.Node , error ) {
244
259
var node agpl.Node
245
260
err := decoder .Decode (& node )
246
261
if err != nil {
@@ -343,6 +358,18 @@ func (c *haCoordinator) publishAgentHello(id uuid.UUID) error {
343
358
return nil
344
359
}
345
360
361
+ func (c * haCoordinator ) publishClientHello (id uuid.UUID ) error {
362
+ msg , err := c .formatClientHello (id )
363
+ if err != nil {
364
+ return xerrors .Errorf ("format client hello: %w" , err )
365
+ }
366
+ err = c .pubsub .Publish ("wireguard_peers" , msg )
367
+ if err != nil {
368
+ return xerrors .Errorf ("publish client hello: %w" , err )
369
+ }
370
+ return nil
371
+ }
372
+
346
373
func (c * haCoordinator ) publishAgentToNodes (id uuid.UUID , node * agpl.Node ) error {
347
374
msg , err := c .formatAgentUpdate (id , node )
348
375
if err != nil {
@@ -408,6 +435,18 @@ func (c *haCoordinator) runPubsub() error {
408
435
c .log .Error (ctx , "send callmemaybe to agent" , slog .Error (err ))
409
436
return
410
437
}
438
+ case "clienthello" :
439
+ agentUUID , err := uuid .ParseBytes (agentID )
440
+ if err != nil {
441
+ c .log .Error (ctx , "invalid agent id" , slog .F ("id" , string (agentID )))
442
+ return
443
+ }
444
+
445
+ err = c .handleClientHello (agentUUID )
446
+ if err != nil {
447
+ c .log .Error (ctx , "handle agent request node" , slog .Error (err ))
448
+ return
449
+ }
411
450
case "agenthello" :
412
451
agentUUID , err := uuid .ParseBytes (agentID )
413
452
if err != nil {
@@ -431,7 +470,7 @@ func (c *haCoordinator) runPubsub() error {
431
470
}
432
471
433
472
decoder := json .NewDecoder (bytes .NewReader (nodeJSON ))
434
- _ , err = c .hangleAgentUpdate (agentUUID , decoder )
473
+ _ , err = c .handleAgentUpdate (agentUUID , decoder )
435
474
if err != nil {
436
475
c .log .Error (ctx , "handle agent update" , slog .Error (err ))
437
476
return
@@ -478,6 +517,17 @@ func (c *haCoordinator) formatAgentHello(id uuid.UUID) ([]byte, error) {
478
517
return buf .Bytes (), nil
479
518
}
480
519
520
+ // format: <coordinator id>|clienthello|<agent id>|
521
+ func (c * haCoordinator ) formatClientHello (id uuid.UUID ) ([]byte , error ) {
522
+ buf := bytes.Buffer {}
523
+
524
+ buf .WriteString (c .id .String () + "|" )
525
+ buf .WriteString ("clienthello|" )
526
+ buf .WriteString (id .String () + "|" )
527
+
528
+ return buf .Bytes (), nil
529
+ }
530
+
481
531
// format: <coordinator id>|agentupdate|<node id>|<node json>
482
532
func (c * haCoordinator ) formatAgentUpdate (id uuid.UUID , node * agpl.Node ) ([]byte , error ) {
483
533
buf := bytes.Buffer {}
0 commit comments