@@ -178,12 +178,10 @@ func (c *haCoordinator) ServeAgent(conn net.Conn, id uuid.UUID) error {
178
178
if len (nodes ) > 0 {
179
179
data , err := json .Marshal (nodes )
180
180
if err != nil {
181
- c .mutex .Unlock ()
182
181
return xerrors .Errorf ("marshal json: %w" , err )
183
182
}
184
183
_ , err = conn .Write (data )
185
184
if err != nil {
186
- c .mutex .Unlock ()
187
185
return xerrors .Errorf ("write nodes: %w" , err )
188
186
}
189
187
}
@@ -250,17 +248,16 @@ func (c *haCoordinator) hangleAgentUpdate(id uuid.UUID, decoder *json.Decoder) (
250
248
}
251
249
252
250
c .mutex .Lock ()
253
- defer c .mutex .Unlock ()
254
-
255
251
c .nodes [id ] = & node
256
-
257
252
connectionSockets , ok := c .agentToConnectionSockets [id ]
258
253
if ! ok {
254
+ c .mutex .Unlock ()
259
255
return & node , nil
260
256
}
261
257
262
258
data , err := json .Marshal ([]* agpl.Node {& node })
263
259
if err != nil {
260
+ c .mutex .Unlock ()
264
261
return nil , xerrors .Errorf ("marshal nodes: %w" , err )
265
262
}
266
263
@@ -275,6 +272,7 @@ func (c *haCoordinator) hangleAgentUpdate(id uuid.UUID, decoder *json.Decoder) (
275
272
_ , _ = connectionSocket .Write (data )
276
273
}()
277
274
}
275
+ c .mutex .Unlock ()
278
276
wg .Wait ()
279
277
return & node , nil
280
278
}
@@ -394,12 +392,12 @@ func (c *haCoordinator) runPubsub() error {
394
392
}
395
393
396
394
c .mutex .Lock ()
397
- defer c .mutex .Unlock ()
398
-
399
395
agentSocket , ok := c .agentSockets [agentUUID ]
400
396
if ! ok {
397
+ c .mutex .Unlock ()
401
398
return
402
399
}
400
+ c .mutex .Unlock ()
403
401
404
402
// We get a single node over pubsub, so turn into an array.
405
403
_ , err = agentSocket .Write (nodeJSON )
@@ -410,7 +408,6 @@ func (c *haCoordinator) runPubsub() error {
410
408
c .log .Error (ctx , "send callmemaybe to agent" , slog .Error (err ))
411
409
return
412
410
}
413
-
414
411
case "agenthello" :
415
412
agentUUID , err := uuid .ParseBytes (agentID )
416
413
if err != nil {
@@ -426,7 +423,6 @@ func (c *haCoordinator) runPubsub() error {
426
423
return
427
424
}
428
425
}
429
-
430
426
case "agentupdate" :
431
427
agentUUID , err := uuid .ParseBytes (agentID )
432
428
if err != nil {
@@ -440,7 +436,6 @@ func (c *haCoordinator) runPubsub() error {
440
436
c .log .Error (ctx , "handle agent update" , slog .Error (err ))
441
437
return
442
438
}
443
-
444
439
default :
445
440
c .log .Error (ctx , "unknown peer event" , slog .F ("name" , string (eventType )))
446
441
}
0 commit comments