
feat: add single tailnet support to pgcoord #9351


Merged
merged 24 commits into from Sep 21, 2023
Changes from 1 commit

Commits (24)
a20d318  feat: add single tailnet support to pgcoord (coadler, Aug 25, 2023)
efa2071  fixup! feat: add single tailnet support to pgcoord (coadler, Aug 25, 2023)
2976c00  fixup! feat: add single tailnet support to pgcoord (coadler, Aug 25, 2023)
a7b39df  separate table + use channels (coadler, Aug 31, 2023)
618b0e0  fix migrations (coadler, Sep 7, 2023)
f50f929  fixup! fix migrations (coadler, Sep 7, 2023)
3a5bb76  fixup! fix migrations (coadler, Sep 8, 2023)
3ddc783  fixup! fix migrations (coadler, Sep 8, 2023)
c84626a  Merge branch 'main' into colin/single-pgcoord (coadler, Sep 13, 2023)
a0cb904  Merge branch 'main' into colin/single-pgcoord (coadler, Sep 14, 2023)
dcb007d  add subscriber subsystem (coadler, Sep 15, 2023)
bdd7ef1  fixup! add subscriber subsystem (coadler, Sep 15, 2023)
751b22f  fixup! add subscriber subsystem (coadler, Sep 15, 2023)
3af1af1  fixup! add subscriber subsystem (coadler, Sep 15, 2023)
7762a73  querier <- subscriber (coadler, Sep 19, 2023)
08501e6  Merge branch 'main' into colin/single-pgcoord (coadler, Sep 19, 2023)
eb681ff  fixup! Merge branch 'main' into colin/single-pgcoord (coadler, Sep 19, 2023)
390e837  fixup! Merge branch 'main' into colin/single-pgcoord (coadler, Sep 19, 2023)
a1c3acf  fixup! Merge branch 'main' into colin/single-pgcoord (coadler, Sep 19, 2023)
8256670  fixup! Merge branch 'main' into colin/single-pgcoord (coadler, Sep 19, 2023)
a75f6f1  add extensive multiagent tests (coadler, Sep 20, 2023)
d143d2d  use dedicated channels for querier subscribe and closing conns (coadler, Sep 20, 2023)
036094f  fixup! use dedicated channels for querier subscribe and closing conns (coadler, Sep 20, 2023)
e8a2b01  fixup! use dedicated channels for querier subscribe and closing conns (coadler, Sep 21, 2023)
querier <- subscriber
coadler committed Sep 19, 2023
commit 7762a7393eb80d5e60dcae639dcb736adc18bc02
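
For orientation before the diff, here is a simplified sketch (hypothetical types and channel names, not the PR's actual code) of the data flow this commit sets up: the coordinator hands subscriptions to the subscriber, which records them and forwards each one to the querier over a dedicated channel, rather than the coordinator calling the querier directly.

```go
package main

import (
	"fmt"

	"github.com/google/uuid"
)

type subscribe struct {
	clientID, agentID uuid.UUID
	active            bool
}

// subscriber: records the subscription (elided here, roughly where
// storeSubscription and workQ.enqueue happen in the diff) and forwards it on.
func runSubscriber(in <-chan subscribe, toQuerier chan<- subscribe) {
	for sub := range in {
		toQuerier <- sub
	}
	close(toQuerier)
}

// querier: reacts to forwarded subscriptions by wiring clients to agents.
func runQuerier(in <-chan subscribe, done chan<- struct{}) {
	for sub := range in {
		if sub.active {
			fmt.Println("add subscription", sub.clientID, "->", sub.agentID)
		} else {
			fmt.Println("remove subscription", sub.clientID, "->", sub.agentID)
		}
	}
	close(done)
}

func main() {
	sCh := make(chan subscribe)  // coordinator -> subscriber
	qsCh := make(chan subscribe) // subscriber -> querier
	done := make(chan struct{})

	go runSubscriber(sCh, qsCh)
	go runQuerier(qsCh, done)

	client, agent := uuid.New(), uuid.New()
	sCh <- subscribe{clientID: client, agentID: agent, active: true}
	sCh <- subscribe{clientID: client, agentID: agent, active: false}
	close(sCh)
	<-done
}
```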
118 changes: 71 additions & 47 deletions enterprise/tailnet/pgcoord.go
@@ -110,7 +110,10 @@ func NewPGCoord(ctx context.Context, logger slog.Logger, ps pubsub.Pubsub, store
logger = logger.Named("pgcoord").With(slog.F("coordinator_id", id))
bCh := make(chan binding)
cCh := make(chan agpl.Queue)
// for communicating subscriptions with the subscriber
sCh := make(chan subscribe)
// for communicating subscriptions with the querier
qsCh := make(chan subscribe)
// signals when first heartbeat has been sent, so it's safe to start binding.
fHB := make(chan struct{})

@@ -123,10 +126,10 @@ func NewPGCoord(ctx context.Context, logger slog.Logger, ps pubsub.Pubsub, store
binder: newBinder(ctx, logger, id, store, bCh, fHB),
bindings: bCh,
newConnections: cCh,
subscriber: newSubscriber(ctx, logger, id, store, sCh, fHB),
subscriber: newSubscriber(ctx, logger, id, store, sCh, qsCh, fHB),
newSubscriptions: sCh,
id: id,
querier: newQuerier(ctx, logger, id, ps, store, id, cCh, numQuerierWorkers, fHB),
querier: newQuerier(ctx, logger, id, ps, store, id, cCh, qsCh, numQuerierWorkers, fHB),
closed: make(chan struct{}),
}
logger.Info(ctx, "starting coordinator")
@@ -160,11 +163,11 @@ func (c *pgCoord) ServeMultiAgent(id uuid.UUID) agpl.MultiAgentConn {
}
if err := sendCtx(c.ctx, c.newSubscriptions, subscribe{
sKey: sKey{clientID: id},
q: enq,
active: false,
}); err != nil {
c.logger.Debug(c.ctx, "parent context expired while withdrawing subscriptions", slog.Error(err))
}
c.querier.cleanupConn(enq)
},
}).Init()

@@ -184,13 +187,12 @@ func (c *pgCoord) addSubscription(q agpl.Queue, agentID uuid.UUID) error {
clientID: q.UniqueID(),
agentID: agentID,
},
q: q,
active: true,
})
if err != nil {
return err
}

c.querier.newClientSubscription(q, agentID)
return nil
}

@@ -200,13 +202,12 @@ func (c *pgCoord) removeSubscription(q agpl.Queue, agentID uuid.UUID) error {
clientID: q.UniqueID(),
agentID: agentID,
},
q: q,
active: false,
})
if err != nil {
return err
}

c.querier.removeClientSubscription(q, agentID)
return nil
}

@@ -307,6 +308,8 @@ type sKey struct {

type subscribe struct {
sKey

q agpl.Queue
// whether the subscription should be active. if true, the subscription is
// added. if false, the subscription is removed.
active bool
@@ -318,6 +321,7 @@ type subscriber struct {
coordinatorID uuid.UUID
store database.Store
subscriptions <-chan subscribe
querierCh chan<- subscribe

mu sync.Mutex
// map[clientID]map[agentID]subscribe
@@ -330,6 +334,7 @@ func newSubscriber(ctx context.Context,
id uuid.UUID,
store database.Store,
subscriptions <-chan subscribe,
querierCh chan<- subscribe,
startWorkers <-chan struct{},
) *subscriber {
s := &subscriber{
@@ -338,6 +343,7 @@
coordinatorID: id,
store: store,
subscriptions: subscriptions,
querierCh: querierCh,
latest: make(map[uuid.UUID]map[uuid.UUID]subscribe),
workQ: newWorkQ[sKey](ctx),
}
@@ -360,6 +366,7 @@ func (s *subscriber) handleSubscriptions() {
case sub := <-s.subscriptions:
s.storeSubscription(sub)
s.workQ.enqueue(sub.sKey)
s.querierCh <- sub
}
}
}
@@ -784,6 +791,7 @@ type querier struct {
store database.Store

newConnections chan agpl.Queue
subscriptions chan subscribe

workQ *workQ[mKey]

@@ -812,6 +820,7 @@ func newQuerier(ctx context.Context,
store database.Store,
self uuid.UUID,
newConnections chan agpl.Queue,
subscriptions chan subscribe,
numWorkers int,
firstHeartbeat chan struct{},
) *querier {
@@ -823,6 +832,7 @@
pubsub: ps,
store: store,
newConnections: newConnections,
subscriptions: subscriptions,
workQ: newWorkQ[mKey](ctx),
heartbeats: newHeartbeats(ctx, logger, ps, store, self, updates, firstHeartbeat),
mappers: make(map[mKey]*countedMapper),
@@ -835,7 +845,7 @@

go func() {
<-firstHeartbeat
go q.handleNewConnections()
go q.handleIncoming()
for i := 0; i < numWorkers; i++ {
go q.worker()
}
@@ -844,11 +854,12 @@
return q
}

func (q *querier) handleNewConnections() {
func (q *querier) handleIncoming() {
for {
select {
case <-q.ctx.Done():
return

case c := <-q.newConnections:
switch c.Kind() {
case agpl.QueueKindAgent:
@@ -858,6 +869,13 @@
default:
panic(fmt.Sprint("unreachable: invalid queue kind ", c.Kind()))
}

case sub := <-q.subscriptions:
if sub.active {
q.newClientSubscription(sub.q, sub.agentID)
} else {
q.removeClientSubscription(sub.q, sub.agentID)
Contributor
you send a subscription with agentID unset on remove. The subscriber handles this case, but then it forwards it to the querier, which I don't think handles the case.

Contributor Author
It's technically handled by checking the clientSubscriptions map, since there will never be an agent subscription with uuid.Nil. I had an explicit check but just replaced it with the map lookup, since it essentially does the same thing. Might be good to comment.
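
A minimal sketch, with hypothetical types rather than the coordinator's real ones, of the map-lookup argument above: uuid.Nil is never stored as an agent key, so a removal that arrives with an unset agent ID finds nothing and becomes a no-op.

```go
package main

import (
	"fmt"

	"github.com/google/uuid"
)

// clientSubscriptions maps clientID -> set of subscribed agentIDs.
type clientSubscriptions map[uuid.UUID]map[uuid.UUID]struct{}

func (s clientSubscriptions) remove(clientID, agentID uuid.UUID) {
	agents, ok := s[clientID]
	if !ok {
		return // unknown client: nothing to do
	}
	// If agentID is uuid.Nil (the client is going away rather than
	// unsubscribing from a specific agent), it was never inserted as a key,
	// so this delete is a harmless no-op.
	delete(agents, agentID)
}

func main() {
	client, agent := uuid.New(), uuid.New()
	subs := clientSubscriptions{client: {agent: {}}}

	subs.remove(client, uuid.Nil)  // unset agent ID: no effect
	fmt.Println(len(subs[client])) // still 1

	subs.remove(client, agent)
	fmt.Println(len(subs[client])) // 0
}
```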

}
}
}
}
@@ -905,6 +923,11 @@ func (q *querier) newClientSubscription(c agpl.Queue, agentID uuid.UUID) {
if _, ok := q.clientSubscriptions[c.UniqueID()]; !ok {
q.clientSubscriptions[c.UniqueID()] = map[uuid.UUID]struct{}{}
Contributor
this is a memory leak: you add to q.clientSubscriptions but never remove.

Contributor

You still never delete anything from q.clientSubscriptions

Contributor Author
Whoops, I think I got this now.
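
A short sketch (hypothetical types, not the PR's code) of the leak being pointed out and the shape of the fix: per-client entries must be dropped when the connection goes away, otherwise the outer map grows without bound as clients connect and disconnect.

```go
package main

import (
	"fmt"

	"github.com/google/uuid"
)

type querierState struct {
	// clientID -> set of agentIDs the client is subscribed to
	clientSubscriptions map[uuid.UUID]map[uuid.UUID]struct{}
}

func (q *querierState) subscribe(clientID, agentID uuid.UUID) {
	if _, ok := q.clientSubscriptions[clientID]; !ok {
		q.clientSubscriptions[clientID] = map[uuid.UUID]struct{}{}
	}
	q.clientSubscriptions[clientID][agentID] = struct{}{}
}

// cleanupConn is the missing piece: without this delete, one entry per
// disconnected client is retained indefinitely.
func (q *querierState) cleanupConn(clientID uuid.UUID) {
	delete(q.clientSubscriptions, clientID)
}

func main() {
	q := &querierState{clientSubscriptions: map[uuid.UUID]map[uuid.UUID]struct{}{}}
	for i := 0; i < 3; i++ {
		client := uuid.New()
		q.subscribe(client, uuid.New())
		q.cleanupConn(client) // drop all state for the departing client
	}
	fmt.Println(len(q.clientSubscriptions)) // 0: no leaked entries
}
```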

}
fmt.Println("CREATEDC SUBSCRIPTION", c.UniqueID(), agentID)
fmt.Println("CREATEDC SUBSCRIPTION", c.UniqueID(), agentID)
fmt.Println("CREATEDC SUBSCRIPTION", c.UniqueID(), agentID)
fmt.Println("CREATEDC SUBSCRIPTION", c.UniqueID(), agentID)
fmt.Println("CREATEDC SUBSCRIPTION", c.UniqueID(), agentID)

mk := mKey{
agent: agentID,
@@ -934,6 +957,12 @@ func (q *querier) removeClientSubscription(c agpl.Queue, agentID uuid.UUID) {
q.mu.Lock()
defer q.mu.Unlock()

// agentID: uuid.Nil indicates that a client is going away. The querier
// handles that in cleanupConn below instead.
if agentID == uuid.Nil {
return
}

mk := mKey{
agent: agentID,
kind: agpl.QueueKindClient,
@@ -948,6 +977,9 @@
cm.cancel()
delete(q.mappers, mk)
}
if len(q.clientSubscriptions[c.UniqueID()]) == 0 {
delete(q.clientSubscriptions, c.UniqueID())
}
}

func (q *querier) newClientConn(c agpl.Queue) {
@@ -982,18 +1014,17 @@ func (q *querier) cleanupConn(c agpl.Queue) {
agent: agentID,
kind: c.Kind(),
}
cm, ok := q.mappers[mk]
if ok {
if err := sendCtx(cm.ctx, cm.del, c); err != nil {
continue
}
cm.count--
if cm.count == 0 {
cm.cancel()
delete(q.mappers, mk)
}
cm := q.mappers[mk]
if err := sendCtx(cm.ctx, cm.del, c); err != nil {
continue
}
cm.count--
if cm.count == 0 {
cm.cancel()
delete(q.mappers, mk)
}
}
delete(q.clientSubscriptions, c.UniqueID())

mk := mKey{
agent: c.UniqueID(),
@@ -1190,28 +1221,26 @@ func (q *querier) listenClient(_ context.Context, msg []byte, err error) {
q.logger.Warn(q.ctx, "unhandled pubsub error", slog.Error(err))
return
}
client, agents, err := parseClientUpdate(string(msg))
client, agent, err := parseClientUpdate(string(msg))
if err != nil {
q.logger.Error(q.ctx, "failed to parse client update", slog.F("msg", string(msg)), slog.Error(err))
return
}
logger := q.logger.With(slog.F("client_id", client))
logger := q.logger.With(slog.F("client_id", client), slog.F("agent_id", agent))
logger.Debug(q.ctx, "got client update")
for _, agentID := range agents {
logger := q.logger.With(slog.F("agent_id", agentID))
mk := mKey{
agent: agentID,
kind: agpl.QueueKindAgent,
}
q.mu.Lock()
_, ok := q.mappers[mk]
q.mu.Unlock()
if !ok {
logger.Debug(q.ctx, "ignoring update because we have no mapper")
return
}
q.workQ.enqueue(mk)

mk := mKey{
agent: agent,
kind: agpl.QueueKindAgent,
}
q.mu.Lock()
_, ok := q.mappers[mk]
q.mu.Unlock()
if !ok {
logger.Debug(q.ctx, "ignoring update because we have no mapper")
return
}
q.workQ.enqueue(mk)
}

func (q *querier) listenAgent(_ context.Context, msg []byte, err error) {
@@ -1348,27 +1377,22 @@ func (q *querier) getAll(ctx context.Context) (map[uuid.UUID]database.TailnetAge
return agentsMap, clientsMap, nil
}

func parseClientUpdate(msg string) (client uuid.UUID, agents []uuid.UUID, err error) {
func parseClientUpdate(msg string) (client, agent uuid.UUID, err error) {
parts := strings.Split(msg, ",")
if len(parts) != 2 {
return uuid.Nil, nil, xerrors.Errorf("expected 2 parts separated by comma")
return uuid.Nil, uuid.Nil, xerrors.Errorf("expected 2 parts separated by comma")
}
client, err = uuid.Parse(parts[0])
if err != nil {
return uuid.Nil, nil, xerrors.Errorf("failed to parse client UUID: %w", err)
return uuid.Nil, uuid.Nil, xerrors.Errorf("failed to parse client UUID: %w", err)
}

agents = []uuid.UUID{}
for _, agentStr := range parts[1:] {
agent, err := uuid.Parse(agentStr)
if err != nil {
return uuid.Nil, nil, xerrors.Errorf("failed to parse agent UUID: %w", err)
}

agents = append(agents, agent)
agent, err = uuid.Parse(parts[1])
if err != nil {
return uuid.Nil, uuid.Nil, xerrors.Errorf("failed to parse agent UUID: %w", err)
}

return client, agents, nil
return client, agent, nil
}

func parseUpdateMessage(msg string) (agent uuid.UUID, err error) {