Skip to content

Commit d143d2d

Browse files
committed
use dedicated channels for querier subscribe and closing conns
1 parent a75f6f1 commit d143d2d

File tree

2 files changed

+46
-33
lines changed

2 files changed

+46
-33
lines changed

enterprise/tailnet/multiagent_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ func TestPGCoordinator_MultiAgent_TwoAgents(t *testing.T) {
323323
agent2.sendNode(&agpl.Node{PreferredDERP: 6})
324324

325325
id := uuid.New()
326-
ma1 := coord2.ServeMultiAgent(id)
326+
ma1 := coord3.ServeMultiAgent(id)
327327
defer ma1.Close()
328328

329329
err = ma1.SubscribeAgent(agent1.id)

enterprise/tailnet/pgcoord.go

Lines changed: 45 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,9 @@ type pgCoord struct {
7474

7575
bindings chan binding
7676
newConnections chan agpl.Queue
77-
newSubscriptions chan subscribe
77+
closeConnections chan agpl.Queue
78+
subscriberCh chan subscribe
79+
querierSubCh chan subscribe
7880
id uuid.UUID
7981

8082
cancel context.CancelFunc
@@ -109,7 +111,10 @@ func NewPGCoord(ctx context.Context, logger slog.Logger, ps pubsub.Pubsub, store
109111
id := uuid.New()
110112
logger = logger.Named("pgcoord").With(slog.F("coordinator_id", id))
111113
bCh := make(chan binding)
114+
// used for opening connections
112115
cCh := make(chan agpl.Queue)
116+
// used for closing connections
117+
ccCh := make(chan agpl.Queue)
113118
// for communicating subscriptions with the subscriber
114119
sCh := make(chan subscribe)
115120
// for communicating subscriptions with the querier
@@ -126,10 +131,12 @@ func NewPGCoord(ctx context.Context, logger slog.Logger, ps pubsub.Pubsub, store
126131
binder: newBinder(ctx, logger, id, store, bCh, fHB),
127132
bindings: bCh,
128133
newConnections: cCh,
129-
subscriber: newSubscriber(ctx, logger, id, store, sCh, qsCh, fHB),
130-
newSubscriptions: sCh,
134+
closeConnections: ccCh,
135+
subscriber: newSubscriber(ctx, logger, id, store, sCh, fHB),
136+
subscriberCh: sCh,
137+
querierSubCh: qsCh,
131138
id: id,
132-
querier: newQuerier(ctx, logger, id, ps, store, id, cCh, qsCh, numQuerierWorkers, fHB),
139+
querier: newQuerier(ctx, logger, id, ps, store, id, cCh, ccCh, qsCh, numQuerierWorkers, fHB),
133140
closed: make(chan struct{}),
134141
}
135142
logger.Info(ctx, "starting coordinator")
@@ -152,22 +159,18 @@ func (c *pgCoord) ServeMultiAgent(id uuid.UUID) agpl.MultiAgentConn {
152159
})
153160
},
154161
OnRemove: func(enq agpl.Queue) {
155-
b := binding{
162+
_ = sendCtx(c.ctx, c.bindings, binding{
156163
bKey: bKey{
157164
id: enq.UniqueID(),
158165
kind: enq.Kind(),
159166
},
160-
}
161-
if err := sendCtx(c.ctx, c.bindings, b); err != nil {
162-
c.logger.Debug(c.ctx, "parent context expired while withdrawing binding", slog.Error(err))
163-
}
164-
if err := sendCtx(c.ctx, c.newSubscriptions, subscribe{
167+
})
168+
_ = sendCtx(c.ctx, c.subscriberCh, subscribe{
165169
sKey: sKey{clientID: id},
166170
q: enq,
167171
active: false,
168-
}); err != nil {
169-
c.logger.Debug(c.ctx, "parent context expired while withdrawing subscriptions", slog.Error(err))
170-
}
172+
})
173+
_ = sendCtx(c.ctx, c.closeConnections, enq)
171174
},
172175
}).Init()
173176

@@ -182,32 +185,44 @@ func (c *pgCoord) ServeMultiAgent(id uuid.UUID) agpl.MultiAgentConn {
182185
}
183186

184187
func (c *pgCoord) addSubscription(q agpl.Queue, agentID uuid.UUID) error {
185-
err := sendCtx(c.ctx, c.newSubscriptions, subscribe{
188+
sub := subscribe{
186189
sKey: sKey{
187190
clientID: q.UniqueID(),
188191
agentID: agentID,
189192
},
190193
q: q,
191194
active: true,
192-
})
193-
if err != nil {
195+
}
196+
if err := sendCtx(c.ctx, c.subscriberCh, sub); err != nil {
194197
return err
195198
}
199+
if err := sendCtx(c.ctx, c.querierSubCh, sub); err != nil {
200+
// There's no need to clean up the sub sent to the subscriber if this
201+
// fails, since it means the entire coordinator is being torn down.
202+
return err
203+
}
204+
196205
return nil
197206
}
198207

199208
func (c *pgCoord) removeSubscription(q agpl.Queue, agentID uuid.UUID) error {
200-
err := sendCtx(c.ctx, c.newSubscriptions, subscribe{
209+
sub := subscribe{
201210
sKey: sKey{
202211
clientID: q.UniqueID(),
203212
agentID: agentID,
204213
},
205214
q: q,
206215
active: false,
207-
})
208-
if err != nil {
216+
}
217+
if err := sendCtx(c.ctx, c.subscriberCh, sub); err != nil {
209218
return err
210219
}
220+
if err := sendCtx(c.ctx, c.querierSubCh, sub); err != nil {
221+
// There's no need to clean up the sub sent to the subscriber if this
222+
// fails, since it means the entire coordinator is being torn down.
223+
return err
224+
}
225+
211226
return nil
212227
}
213228

@@ -247,6 +262,7 @@ func (c *pgCoord) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) erro
247262
// can only be a context error, no need to log here.
248263
return err
249264
}
265+
defer func() { _ = sendCtx(c.ctx, c.closeConnections, agpl.Queue(cIO)) }()
250266

251267
if err := c.addSubscription(cIO, agent); err != nil {
252268
return err
@@ -271,6 +287,8 @@ func (c *pgCoord) ServeAgent(conn net.Conn, id uuid.UUID, name string) error {
271287
// can only be a context error, no need to log here.
272288
return err
273289
}
290+
defer func() { _ = sendCtx(c.ctx, c.closeConnections, agpl.Queue(cIO)) }()
291+
274292
<-cIO.ctx.Done()
275293
return nil
276294
}
@@ -311,7 +329,6 @@ type subscriber struct {
311329
coordinatorID uuid.UUID
312330
store database.Store
313331
subscriptions <-chan subscribe
314-
querierCh chan<- subscribe
315332

316333
mu sync.Mutex
317334
// map[clientID]map[agentID]subscribe
@@ -324,7 +341,6 @@ func newSubscriber(ctx context.Context,
324341
id uuid.UUID,
325342
store database.Store,
326343
subscriptions <-chan subscribe,
327-
querierCh chan<- subscribe,
328344
startWorkers <-chan struct{},
329345
) *subscriber {
330346
s := &subscriber{
@@ -333,7 +349,6 @@ func newSubscriber(ctx context.Context,
333349
coordinatorID: id,
334350
store: store,
335351
subscriptions: subscriptions,
336-
querierCh: querierCh,
337352
latest: make(map[uuid.UUID]map[uuid.UUID]subscribe),
338353
workQ: newWorkQ[sKey](ctx),
339354
}
@@ -356,7 +371,6 @@ func (s *subscriber) handleSubscriptions() {
356371
case sub := <-s.subscriptions:
357372
s.storeSubscription(sub)
358373
s.workQ.enqueue(sub.sKey)
359-
s.querierCh <- sub
360374
}
361375
}
362376
}
@@ -780,8 +794,9 @@ type querier struct {
780794
pubsub pubsub.Pubsub
781795
store database.Store
782796

783-
newConnections chan agpl.Queue
784-
subscriptions chan subscribe
797+
newConnections chan agpl.Queue
798+
closeConnections chan agpl.Queue
799+
subscriptions chan subscribe
785800

786801
workQ *workQ[mKey]
787802

@@ -810,6 +825,7 @@ func newQuerier(ctx context.Context,
810825
store database.Store,
811826
self uuid.UUID,
812827
newConnections chan agpl.Queue,
828+
closeConnections chan agpl.Queue,
813829
subscriptions chan subscribe,
814830
numWorkers int,
815831
firstHeartbeat chan struct{},
@@ -822,6 +838,7 @@ func newQuerier(ctx context.Context,
822838
pubsub: ps,
823839
store: store,
824840
newConnections: newConnections,
841+
closeConnections: closeConnections,
825842
subscriptions: subscriptions,
826843
workQ: newWorkQ[mKey](ctx),
827844
heartbeats: newHeartbeats(ctx, logger, ps, store, self, updates, firstHeartbeat),
@@ -860,6 +877,9 @@ func (q *querier) handleIncoming() {
860877
panic(fmt.Sprint("unreachable: invalid queue kind ", c.Kind()))
861878
}
862879

880+
case c := <-q.closeConnections:
881+
q.cleanupConn(c)
882+
863883
case sub := <-q.subscriptions:
864884
if sub.active {
865885
q.newClientSubscription(sub.q, sub.agentID)
@@ -903,7 +923,6 @@ func (q *querier) newAgentConn(c agpl.Queue) {
903923
}
904924
cm.count++
905925
q.conns[c.UniqueID()] = c
906-
go q.waitCleanupConn(c)
907926
}
908927

909928
func (q *querier) newClientSubscription(c agpl.Queue, agentID uuid.UUID) {
@@ -981,12 +1000,6 @@ func (q *querier) newClientConn(c agpl.Queue) {
9811000
}
9821001

9831002
q.conns[c.UniqueID()] = c
984-
go q.waitCleanupConn(c)
985-
}
986-
987-
func (q *querier) waitCleanupConn(c agpl.Queue) {
988-
<-c.Done()
989-
q.cleanupConn(c)
9901003
}
9911004

9921005
func (q *querier) cleanupConn(c agpl.Queue) {

0 commit comments

Comments
 (0)