@@ -57,8 +57,9 @@ func (c *haCoordinator) ServeMultiAgent(id uuid.UUID) agpl.MultiAgentConn {
57
57
ID : id ,
58
58
AgentIsLegacyFunc : c .agentIsLegacy ,
59
59
OnSubscribe : c .clientSubscribeToAgent ,
60
+ OnUnsubscribe : c .clientUnsubscribeFromAgent ,
60
61
OnNodeUpdate : c .clientNodeUpdate ,
61
- OnRemove : func ( enq agpl. Queue ) { c .clientDisconnected ( enq . UniqueID ()) } ,
62
+ OnRemove : c .clientDisconnected ,
62
63
}).Init ()
63
64
c .addClient (id , m )
64
65
return m
@@ -101,6 +102,22 @@ func (c *haCoordinator) clientSubscribeToAgent(enq agpl.Queue, agentID uuid.UUID
101
102
return nil , nil
102
103
}
103
104
105
+ func (c * haCoordinator ) clientUnsubscribeFromAgent (enq agpl.Queue , agentID uuid.UUID ) error {
106
+ c .mutex .Lock ()
107
+ defer c .mutex .Unlock ()
108
+
109
+ connectionSockets , ok := c .agentToConnectionSockets [agentID ]
110
+ if ! ok {
111
+ return nil
112
+ }
113
+ delete (connectionSockets , enq .UniqueID ())
114
+ if len (connectionSockets ) == 0 {
115
+ delete (c .agentToConnectionSockets , agentID )
116
+ }
117
+
118
+ return nil
119
+ }
120
+
104
121
type haCoordinator struct {
105
122
id uuid.UUID
106
123
log slog.Logger
@@ -161,7 +178,7 @@ func (c *haCoordinator) ServeClient(conn net.Conn, id, agentID uuid.UUID) error
161
178
defer tc .Close ()
162
179
163
180
c .addClient (id , tc )
164
- defer c .clientDisconnected (id )
181
+ defer c .clientDisconnected (tc )
165
182
166
183
agentNode , err := c .clientSubscribeToAgent (tc , agentID )
167
184
if err != nil {
@@ -200,26 +217,24 @@ func (c *haCoordinator) initOrSetAgentConnectionSocketLocked(agentID uuid.UUID,
200
217
c .clientsToAgents [enq .UniqueID ()][agentID ] = c .agentSockets [agentID ]
201
218
}
202
219
203
- func (c * haCoordinator ) clientDisconnected (id uuid. UUID ) {
220
+ func (c * haCoordinator ) clientDisconnected (enq agpl. Queue ) {
204
221
c .mutex .Lock ()
205
222
defer c .mutex .Unlock ()
206
223
207
- for agentID := range c .clientsToAgents [id ] {
208
- // Clean all traces of this connection from the map.
209
- delete (c .nodes , id )
224
+ for agentID := range c .clientsToAgents [enq .UniqueID ()] {
210
225
connectionSockets , ok := c .agentToConnectionSockets [agentID ]
211
226
if ! ok {
212
- return
227
+ continue
213
228
}
214
- delete (connectionSockets , id )
215
- if len (connectionSockets ) ! = 0 {
216
- return
229
+ delete (connectionSockets , enq . UniqueID () )
230
+ if len (connectionSockets ) = = 0 {
231
+ delete ( c . agentToConnectionSockets , agentID )
217
232
}
218
- delete (c .agentToConnectionSockets , agentID )
219
233
}
220
234
221
- delete (c .clients , id )
222
- delete (c .clientsToAgents , id )
235
+ delete (c .nodes , enq .UniqueID ())
236
+ delete (c .clients , enq .UniqueID ())
237
+ delete (c .clientsToAgents , enq .UniqueID ())
223
238
}
224
239
225
240
func (c * haCoordinator ) handleNextClientMessage (id uuid.UUID , decoder * json.Decoder ) error {
0 commit comments