@@ -74,7 +74,9 @@ type pgCoord struct {
74
74
75
75
bindings chan binding
76
76
newConnections chan agpl.Queue
77
- newSubscriptions chan subscribe
77
+ closeConnections chan agpl.Queue
78
+ subscriberCh chan subscribe
79
+ querierSubCh chan subscribe
78
80
id uuid.UUID
79
81
80
82
cancel context.CancelFunc
@@ -109,7 +111,10 @@ func NewPGCoord(ctx context.Context, logger slog.Logger, ps pubsub.Pubsub, store
109
111
id := uuid .New ()
110
112
logger = logger .Named ("pgcoord" ).With (slog .F ("coordinator_id" , id ))
111
113
bCh := make (chan binding )
114
+ // used for opening connections
112
115
cCh := make (chan agpl.Queue )
116
+ // used for closing connections
117
+ ccCh := make (chan agpl.Queue )
113
118
// for communicating subscriptions with the subscriber
114
119
sCh := make (chan subscribe )
115
120
// for communicating subscriptions with the querier
@@ -126,10 +131,12 @@ func NewPGCoord(ctx context.Context, logger slog.Logger, ps pubsub.Pubsub, store
126
131
binder : newBinder (ctx , logger , id , store , bCh , fHB ),
127
132
bindings : bCh ,
128
133
newConnections : cCh ,
129
- subscriber : newSubscriber (ctx , logger , id , store , sCh , qsCh , fHB ),
130
- newSubscriptions : sCh ,
134
+ closeConnections : ccCh ,
135
+ subscriber : newSubscriber (ctx , logger , id , store , sCh , fHB ),
136
+ subscriberCh : sCh ,
137
+ querierSubCh : qsCh ,
131
138
id : id ,
132
- querier : newQuerier (ctx , logger , id , ps , store , id , cCh , qsCh , numQuerierWorkers , fHB ),
139
+ querier : newQuerier (ctx , logger , id , ps , store , id , cCh , ccCh , qsCh , numQuerierWorkers , fHB ),
133
140
closed : make (chan struct {}),
134
141
}
135
142
logger .Info (ctx , "starting coordinator" )
@@ -152,22 +159,18 @@ func (c *pgCoord) ServeMultiAgent(id uuid.UUID) agpl.MultiAgentConn {
152
159
})
153
160
},
154
161
OnRemove : func (enq agpl.Queue ) {
155
- b := binding {
162
+ _ = sendCtx ( c . ctx , c . bindings , binding {
156
163
bKey : bKey {
157
164
id : enq .UniqueID (),
158
165
kind : enq .Kind (),
159
166
},
160
- }
161
- if err := sendCtx (c .ctx , c .bindings , b ); err != nil {
162
- c .logger .Debug (c .ctx , "parent context expired while withdrawing binding" , slog .Error (err ))
163
- }
164
- if err := sendCtx (c .ctx , c .newSubscriptions , subscribe {
167
+ })
168
+ _ = sendCtx (c .ctx , c .subscriberCh , subscribe {
165
169
sKey : sKey {clientID : id },
166
170
q : enq ,
167
171
active : false ,
168
- }); err != nil {
169
- c .logger .Debug (c .ctx , "parent context expired while withdrawing subscriptions" , slog .Error (err ))
170
- }
172
+ })
173
+ _ = sendCtx (c .ctx , c .closeConnections , enq )
171
174
},
172
175
}).Init ()
173
176
@@ -182,32 +185,44 @@ func (c *pgCoord) ServeMultiAgent(id uuid.UUID) agpl.MultiAgentConn {
182
185
}
183
186
184
187
func (c * pgCoord ) addSubscription (q agpl.Queue , agentID uuid.UUID ) error {
185
- err := sendCtx ( c . ctx , c . newSubscriptions , subscribe {
188
+ sub := subscribe {
186
189
sKey : sKey {
187
190
clientID : q .UniqueID (),
188
191
agentID : agentID ,
189
192
},
190
193
q : q ,
191
194
active : true ,
192
- })
193
- if err != nil {
195
+ }
196
+ if err := sendCtx ( c . ctx , c . subscriberCh , sub ); err != nil {
194
197
return err
195
198
}
199
+ if err := sendCtx (c .ctx , c .querierSubCh , sub ); err != nil {
200
+ // There's no need to clean up the sub sent to the subscriber if this
201
+ // fails, since it means the entire coordinator is being torn down.
202
+ return err
203
+ }
204
+
196
205
return nil
197
206
}
198
207
199
208
func (c * pgCoord ) removeSubscription (q agpl.Queue , agentID uuid.UUID ) error {
200
- err := sendCtx ( c . ctx , c . newSubscriptions , subscribe {
209
+ sub := subscribe {
201
210
sKey : sKey {
202
211
clientID : q .UniqueID (),
203
212
agentID : agentID ,
204
213
},
205
214
q : q ,
206
215
active : false ,
207
- })
208
- if err != nil {
216
+ }
217
+ if err := sendCtx ( c . ctx , c . subscriberCh , sub ); err != nil {
209
218
return err
210
219
}
220
+ if err := sendCtx (c .ctx , c .querierSubCh , sub ); err != nil {
221
+ // There's no need to clean up the sub sent to the subscriber if this
222
+ // fails, since it means the entire coordinator is being torn down.
223
+ return err
224
+ }
225
+
211
226
return nil
212
227
}
213
228
@@ -247,6 +262,7 @@ func (c *pgCoord) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) erro
247
262
// can only be a context error, no need to log here.
248
263
return err
249
264
}
265
+ defer func () { _ = sendCtx (c .ctx , c .closeConnections , agpl .Queue (cIO )) }()
250
266
251
267
if err := c .addSubscription (cIO , agent ); err != nil {
252
268
return err
@@ -271,6 +287,8 @@ func (c *pgCoord) ServeAgent(conn net.Conn, id uuid.UUID, name string) error {
271
287
// can only be a context error, no need to log here.
272
288
return err
273
289
}
290
+ defer func () { _ = sendCtx (c .ctx , c .closeConnections , agpl .Queue (cIO )) }()
291
+
274
292
<- cIO .ctx .Done ()
275
293
return nil
276
294
}
@@ -311,7 +329,6 @@ type subscriber struct {
311
329
coordinatorID uuid.UUID
312
330
store database.Store
313
331
subscriptions <- chan subscribe
314
- querierCh chan <- subscribe
315
332
316
333
mu sync.Mutex
317
334
// map[clientID]map[agentID]subscribe
@@ -324,7 +341,6 @@ func newSubscriber(ctx context.Context,
324
341
id uuid.UUID ,
325
342
store database.Store ,
326
343
subscriptions <- chan subscribe ,
327
- querierCh chan <- subscribe ,
328
344
startWorkers <- chan struct {},
329
345
) * subscriber {
330
346
s := & subscriber {
@@ -333,7 +349,6 @@ func newSubscriber(ctx context.Context,
333
349
coordinatorID : id ,
334
350
store : store ,
335
351
subscriptions : subscriptions ,
336
- querierCh : querierCh ,
337
352
latest : make (map [uuid.UUID ]map [uuid.UUID ]subscribe ),
338
353
workQ : newWorkQ [sKey ](ctx ),
339
354
}
@@ -356,7 +371,6 @@ func (s *subscriber) handleSubscriptions() {
356
371
case sub := <- s .subscriptions :
357
372
s .storeSubscription (sub )
358
373
s .workQ .enqueue (sub .sKey )
359
- s .querierCh <- sub
360
374
}
361
375
}
362
376
}
@@ -780,8 +794,9 @@ type querier struct {
780
794
pubsub pubsub.Pubsub
781
795
store database.Store
782
796
783
- newConnections chan agpl.Queue
784
- subscriptions chan subscribe
797
+ newConnections chan agpl.Queue
798
+ closeConnections chan agpl.Queue
799
+ subscriptions chan subscribe
785
800
786
801
workQ * workQ [mKey ]
787
802
@@ -810,6 +825,7 @@ func newQuerier(ctx context.Context,
810
825
store database.Store ,
811
826
self uuid.UUID ,
812
827
newConnections chan agpl.Queue ,
828
+ closeConnections chan agpl.Queue ,
813
829
subscriptions chan subscribe ,
814
830
numWorkers int ,
815
831
firstHeartbeat chan struct {},
@@ -822,6 +838,7 @@ func newQuerier(ctx context.Context,
822
838
pubsub : ps ,
823
839
store : store ,
824
840
newConnections : newConnections ,
841
+ closeConnections : closeConnections ,
825
842
subscriptions : subscriptions ,
826
843
workQ : newWorkQ [mKey ](ctx ),
827
844
heartbeats : newHeartbeats (ctx , logger , ps , store , self , updates , firstHeartbeat ),
@@ -860,6 +877,9 @@ func (q *querier) handleIncoming() {
860
877
panic (fmt .Sprint ("unreachable: invalid queue kind " , c .Kind ()))
861
878
}
862
879
880
+ case c := <- q .closeConnections :
881
+ q .cleanupConn (c )
882
+
863
883
case sub := <- q .subscriptions :
864
884
if sub .active {
865
885
q .newClientSubscription (sub .q , sub .agentID )
@@ -903,7 +923,6 @@ func (q *querier) newAgentConn(c agpl.Queue) {
903
923
}
904
924
cm .count ++
905
925
q .conns [c .UniqueID ()] = c
906
- go q .waitCleanupConn (c )
907
926
}
908
927
909
928
func (q * querier ) newClientSubscription (c agpl.Queue , agentID uuid.UUID ) {
@@ -981,12 +1000,6 @@ func (q *querier) newClientConn(c agpl.Queue) {
981
1000
}
982
1001
983
1002
q .conns [c .UniqueID ()] = c
984
- go q .waitCleanupConn (c )
985
- }
986
-
987
- func (q * querier ) waitCleanupConn (c agpl.Queue ) {
988
- <- c .Done ()
989
- q .cleanupConn (c )
990
1003
}
991
1004
992
1005
func (q * querier ) cleanupConn (c agpl.Queue ) {
0 commit comments