Skip to content

feat: add single tailnet support to pgcoord #9351

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into main from colin/single-pgcoord
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
a20d318
feat: add single tailnet support to pgcoord
coadler Aug 25, 2023
efa2071
fixup! feat: add single tailnet support to pgcoord
coadler Aug 25, 2023
2976c00
fixup! feat: add single tailnet support to pgcoord
coadler Aug 25, 2023
a7b39df
separate table + use channels
coadler Aug 31, 2023
618b0e0
fix migrations
coadler Sep 7, 2023
f50f929
fixup! fix migrations
coadler Sep 7, 2023
3a5bb76
fixup! fix migrations
coadler Sep 8, 2023
3ddc783
fixup! fix migrations
coadler Sep 8, 2023
c84626a
Merge branch 'main' into colin/single-pgcoord
coadler Sep 13, 2023
a0cb904
Merge branch 'main' into colin/single-pgcoord
coadler Sep 14, 2023
dcb007d
add subscriber subsystem
coadler Sep 15, 2023
bdd7ef1
fixup! add subscriber subsystem
coadler Sep 15, 2023
751b22f
fixup! add subscriber subsystem
coadler Sep 15, 2023
3af1af1
fixup! add subscriber subsystem
coadler Sep 15, 2023
7762a73
querier <- subscriber
coadler Sep 19, 2023
08501e6
Merge branch 'main' into colin/single-pgcoord
coadler Sep 19, 2023
eb681ff
fixup! Merge branch 'main' into colin/single-pgcoord
coadler Sep 19, 2023
390e837
fixup! Merge branch 'main' into colin/single-pgcoord
coadler Sep 19, 2023
a1c3acf
fixup! Merge branch 'main' into colin/single-pgcoord
coadler Sep 19, 2023
8256670
fixup! Merge branch 'main' into colin/single-pgcoord
coadler Sep 19, 2023
a75f6f1
add extensive multiagent tests
coadler Sep 20, 2023
d143d2d
use dedicated channels for querier subscribe and closing conns
coadler Sep 20, 2023
036094f
fixup! use dedicated channels for querier subscribe and closing conns
coadler Sep 20, 2023
e8a2b01
fixup! use dedicated channels for querier subscribe and closing conns
coadler Sep 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
use dedicated channels for querier subscribe and closing conns
  • Loading branch information
coadler committed Sep 20, 2023
commit d143d2d9930de26d9fd6aa72d36d0f2418d12bf2
2 changes: 1 addition & 1 deletion enterprise/tailnet/multiagent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func TestPGCoordinator_MultiAgent_TwoAgents(t *testing.T) {
agent2.sendNode(&agpl.Node{PreferredDERP: 6})

id := uuid.New()
ma1 := coord2.ServeMultiAgent(id)
ma1 := coord3.ServeMultiAgent(id)
defer ma1.Close()

err = ma1.SubscribeAgent(agent1.id)
Expand Down
77 changes: 45 additions & 32 deletions enterprise/tailnet/pgcoord.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ type pgCoord struct {

bindings chan binding
newConnections chan agpl.Queue
newSubscriptions chan subscribe
closeConnections chan agpl.Queue
subscriberCh chan subscribe
querierSubCh chan subscribe
id uuid.UUID

cancel context.CancelFunc
Expand Down Expand Up @@ -109,7 +111,10 @@ func NewPGCoord(ctx context.Context, logger slog.Logger, ps pubsub.Pubsub, store
id := uuid.New()
logger = logger.Named("pgcoord").With(slog.F("coordinator_id", id))
bCh := make(chan binding)
// used for opening connections
cCh := make(chan agpl.Queue)
// used for closing connections
ccCh := make(chan agpl.Queue)
// for communicating subscriptions with the subscriber
sCh := make(chan subscribe)
// for communicating subscriptions with the querier
Expand All @@ -126,10 +131,12 @@ func NewPGCoord(ctx context.Context, logger slog.Logger, ps pubsub.Pubsub, store
binder: newBinder(ctx, logger, id, store, bCh, fHB),
bindings: bCh,
newConnections: cCh,
subscriber: newSubscriber(ctx, logger, id, store, sCh, qsCh, fHB),
newSubscriptions: sCh,
closeConnections: ccCh,
subscriber: newSubscriber(ctx, logger, id, store, sCh, fHB),
subscriberCh: sCh,
querierSubCh: qsCh,
id: id,
querier: newQuerier(ctx, logger, id, ps, store, id, cCh, qsCh, numQuerierWorkers, fHB),
querier: newQuerier(ctx, logger, id, ps, store, id, cCh, ccCh, qsCh, numQuerierWorkers, fHB),
closed: make(chan struct{}),
}
logger.Info(ctx, "starting coordinator")
Expand All @@ -152,22 +159,18 @@ func (c *pgCoord) ServeMultiAgent(id uuid.UUID) agpl.MultiAgentConn {
})
},
OnRemove: func(enq agpl.Queue) {
b := binding{
_ = sendCtx(c.ctx, c.bindings, binding{
bKey: bKey{
id: enq.UniqueID(),
kind: enq.Kind(),
},
}
if err := sendCtx(c.ctx, c.bindings, b); err != nil {
c.logger.Debug(c.ctx, "parent context expired while withdrawing binding", slog.Error(err))
}
if err := sendCtx(c.ctx, c.newSubscriptions, subscribe{
})
_ = sendCtx(c.ctx, c.subscriberCh, subscribe{
sKey: sKey{clientID: id},
q: enq,
active: false,
}); err != nil {
c.logger.Debug(c.ctx, "parent context expired while withdrawing subscriptions", slog.Error(err))
}
})
_ = sendCtx(c.ctx, c.closeConnections, enq)
},
}).Init()

Expand All @@ -182,32 +185,44 @@ func (c *pgCoord) ServeMultiAgent(id uuid.UUID) agpl.MultiAgentConn {
}

func (c *pgCoord) addSubscription(q agpl.Queue, agentID uuid.UUID) error {
err := sendCtx(c.ctx, c.newSubscriptions, subscribe{
sub := subscribe{
sKey: sKey{
clientID: q.UniqueID(),
agentID: agentID,
},
q: q,
active: true,
})
if err != nil {
}
if err := sendCtx(c.ctx, c.subscriberCh, sub); err != nil {
return err
}
if err := sendCtx(c.ctx, c.querierSubCh, sub); err != nil {
// There's no need to clean up the sub sent to the subscriber if this
// fails, since it means the entire coordinator is being torn down.
return err
}

return nil
}

func (c *pgCoord) removeSubscription(q agpl.Queue, agentID uuid.UUID) error {
err := sendCtx(c.ctx, c.newSubscriptions, subscribe{
sub := subscribe{
sKey: sKey{
clientID: q.UniqueID(),
agentID: agentID,
},
q: q,
active: false,
})
if err != nil {
}
if err := sendCtx(c.ctx, c.subscriberCh, sub); err != nil {
return err
}
if err := sendCtx(c.ctx, c.querierSubCh, sub); err != nil {
// There's no need to clean up the sub sent to the subscriber if this
// fails, since it means the entire coordinator is being torn down.
return err
}

return nil
}

Expand Down Expand Up @@ -247,6 +262,7 @@ func (c *pgCoord) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) erro
// can only be a context error, no need to log here.
return err
}
defer func() { _ = sendCtx(c.ctx, c.closeConnections, agpl.Queue(cIO)) }()

if err := c.addSubscription(cIO, agent); err != nil {
return err
Expand All @@ -271,6 +287,8 @@ func (c *pgCoord) ServeAgent(conn net.Conn, id uuid.UUID, name string) error {
// can only be a context error, no need to log here.
return err
}
defer func() { _ = sendCtx(c.ctx, c.closeConnections, agpl.Queue(cIO)) }()

<-cIO.ctx.Done()
return nil
}
Expand Down Expand Up @@ -311,7 +329,6 @@ type subscriber struct {
coordinatorID uuid.UUID
store database.Store
subscriptions <-chan subscribe
querierCh chan<- subscribe

mu sync.Mutex
// map[clientID]map[agentID]subscribe
Expand All @@ -324,7 +341,6 @@ func newSubscriber(ctx context.Context,
id uuid.UUID,
store database.Store,
subscriptions <-chan subscribe,
querierCh chan<- subscribe,
startWorkers <-chan struct{},
) *subscriber {
s := &subscriber{
Expand All @@ -333,7 +349,6 @@ func newSubscriber(ctx context.Context,
coordinatorID: id,
store: store,
subscriptions: subscriptions,
querierCh: querierCh,
latest: make(map[uuid.UUID]map[uuid.UUID]subscribe),
workQ: newWorkQ[sKey](ctx),
}
Expand All @@ -356,7 +371,6 @@ func (s *subscriber) handleSubscriptions() {
case sub := <-s.subscriptions:
s.storeSubscription(sub)
s.workQ.enqueue(sub.sKey)
s.querierCh <- sub
}
}
}
Expand Down Expand Up @@ -780,8 +794,9 @@ type querier struct {
pubsub pubsub.Pubsub
store database.Store

newConnections chan agpl.Queue
subscriptions chan subscribe
newConnections chan agpl.Queue
closeConnections chan agpl.Queue
subscriptions chan subscribe

workQ *workQ[mKey]

Expand Down Expand Up @@ -810,6 +825,7 @@ func newQuerier(ctx context.Context,
store database.Store,
self uuid.UUID,
newConnections chan agpl.Queue,
closeConnections chan agpl.Queue,
subscriptions chan subscribe,
numWorkers int,
firstHeartbeat chan struct{},
Expand All @@ -822,6 +838,7 @@ func newQuerier(ctx context.Context,
pubsub: ps,
store: store,
newConnections: newConnections,
closeConnections: closeConnections,
subscriptions: subscriptions,
workQ: newWorkQ[mKey](ctx),
heartbeats: newHeartbeats(ctx, logger, ps, store, self, updates, firstHeartbeat),
Expand Down Expand Up @@ -860,6 +877,9 @@ func (q *querier) handleIncoming() {
panic(fmt.Sprint("unreachable: invalid queue kind ", c.Kind()))
}

case c := <-q.closeConnections:
q.cleanupConn(c)

case sub := <-q.subscriptions:
if sub.active {
q.newClientSubscription(sub.q, sub.agentID)
Expand Down Expand Up @@ -903,7 +923,6 @@ func (q *querier) newAgentConn(c agpl.Queue) {
}
cm.count++
q.conns[c.UniqueID()] = c
go q.waitCleanupConn(c)
}

func (q *querier) newClientSubscription(c agpl.Queue, agentID uuid.UUID) {
Expand Down Expand Up @@ -981,12 +1000,6 @@ func (q *querier) newClientConn(c agpl.Queue) {
}

q.conns[c.UniqueID()] = c
go q.waitCleanupConn(c)
}

func (q *querier) waitCleanupConn(c agpl.Queue) {
<-c.Done()
q.cleanupConn(c)
}

func (q *querier) cleanupConn(c agpl.Queue) {
Expand Down