@@ -251,16 +251,6 @@ func (c *pgCoord) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) erro
251
251
if err := c .addSubscription (cIO , agent ); err != nil {
252
252
return err
253
253
}
254
- defer func () {
255
- err := c .removeSubscription (cIO , agent )
256
- if err != nil {
257
- c .logger .Debug (c .ctx , "remove client subscription" ,
258
- slog .F ("client_id" , id ),
259
- slog .F ("agent_id" , agent ),
260
- slog .Error (err ),
261
- )
262
- }
263
- }()
264
254
265
255
<- cIO .ctx .Done ()
266
256
return nil
@@ -924,12 +914,15 @@ func (q *querier) newClientSubscription(c agpl.Queue, agentID uuid.UUID) {
924
914
q .clientSubscriptions [c .UniqueID ()] = map [uuid.UUID ]struct {}{}
925
915
}
926
916
917
+ fmt .Println ("add sub" , c .UniqueID (), agentID )
918
+
927
919
mk := mKey {
928
920
agent : agentID ,
929
921
kind : agpl .QueueKindClient ,
930
922
}
931
923
cm , ok := q .mappers [mk ]
932
924
if ! ok {
925
+ fmt .Println ("new mapper" )
933
926
ctx , cancel := context .WithCancel (q .ctx )
934
927
mpr := newMapper (ctx , q .logger , mk , q .heartbeats )
935
928
cm = & countedMapper {
@@ -952,6 +945,8 @@ func (q *querier) removeClientSubscription(c agpl.Queue, agentID uuid.UUID) {
952
945
q .mu .Lock ()
953
946
defer q .mu .Unlock ()
954
947
948
+ fmt .Println ("remove sub" , c .UniqueID (), agentID )
949
+
955
950
// agentID: uuid.Nil indicates that a client is going away. The querier
956
951
// handles that in cleanupConn below instead.
957
952
if agentID == uuid .Nil {
0 commit comments