feat: add single tailnet support to pgcoord #9351
Changes from 1 commit
@@ -110,7 +110,10 @@ func NewPGCoord(ctx context.Context, logger slog.Logger, ps pubsub.Pubsub, store
 	logger = logger.Named("pgcoord").With(slog.F("coordinator_id", id))
 	bCh := make(chan binding)
 	cCh := make(chan agpl.Queue)
+	// for communicating subscriptions with the subscriber
 	sCh := make(chan subscribe)
+	// for communicating subscriptions with the querier
+	qsCh := make(chan subscribe)
 	// signals when first heartbeat has been sent, so it's safe to start binding.
 	fHB := make(chan struct{})
@@ -123,10 +126,10 @@ func NewPGCoord(ctx context.Context, logger slog.Logger, ps pubsub.Pubsub, store
 		binder: newBinder(ctx, logger, id, store, bCh, fHB),
 		bindings: bCh,
 		newConnections: cCh,
-		subscriber: newSubscriber(ctx, logger, id, store, sCh, fHB),
+		subscriber: newSubscriber(ctx, logger, id, store, sCh, qsCh, fHB),
 		newSubscriptions: sCh,
 		id: id,
-		querier: newQuerier(ctx, logger, id, ps, store, id, cCh, numQuerierWorkers, fHB),
+		querier: newQuerier(ctx, logger, id, ps, store, id, cCh, qsCh, numQuerierWorkers, fHB),
 		closed: make(chan struct{}),
 	}
 	logger.Info(ctx, "starting coordinator")
@@ -160,11 +163,11 @@ func (c *pgCoord) ServeMultiAgent(id uuid.UUID) agpl.MultiAgentConn {
 			}
 			if err := sendCtx(c.ctx, c.newSubscriptions, subscribe{
 				sKey: sKey{clientID: id},
+				q: enq,
 				active: false,
 			}); err != nil {
 				c.logger.Debug(c.ctx, "parent context expired while withdrawing subscriptions", slog.Error(err))
 			}
-			c.querier.cleanupConn(enq)
 		},
 	}).Init()
@@ -184,13 +187,12 @@ func (c *pgCoord) addSubscription(q agpl.Queue, agentID uuid.UUID) error {
 			clientID: q.UniqueID(),
 			agentID: agentID,
 		},
+		q: q,
 		active: true,
 	})
 	if err != nil {
 		return err
 	}
 
-	c.querier.newClientSubscription(q, agentID)
 	return nil
 }
@@ -200,13 +202,12 @@ func (c *pgCoord) removeSubscription(q agpl.Queue, agentID uuid.UUID) error {
 			clientID: q.UniqueID(),
 			agentID: agentID,
 		},
+		q: q,
 		active: false,
 	})
 	if err != nil {
 		return err
 	}
 
-	c.querier.removeClientSubscription(q, agentID)
 	return nil
 }
@@ -307,6 +308,8 @@ type sKey struct {
 
 type subscribe struct {
 	sKey
+
+	q agpl.Queue
 	// whether the subscription should be active. if true, the subscription is
 	// added. if false, the subscription is removed.
 	active bool
@@ -318,6 +321,7 @@ type subscriber struct {
 	coordinatorID uuid.UUID
 	store database.Store
 	subscriptions <-chan subscribe
+	querierCh chan<- subscribe
 
 	mu sync.Mutex
 	// map[clientID]map[agentID]subscribe
@@ -330,6 +334,7 @@ func newSubscriber(ctx context.Context,
 	id uuid.UUID,
 	store database.Store,
 	subscriptions <-chan subscribe,
+	querierCh chan<- subscribe,
 	startWorkers <-chan struct{},
 ) *subscriber {
 	s := &subscriber{
@@ -338,6 +343,7 @@ func newSubscriber(ctx context.Context,
 		coordinatorID: id,
 		store: store,
 		subscriptions: subscriptions,
+		querierCh: querierCh,
 		latest: make(map[uuid.UUID]map[uuid.UUID]subscribe),
 		workQ: newWorkQ[sKey](ctx),
 	}
@@ -360,6 +366,7 @@ func (s *subscriber) handleSubscriptions() {
 		case sub := <-s.subscriptions:
 			s.storeSubscription(sub)
 			s.workQ.enqueue(sub.sKey)
+			s.querierCh <- sub
 		}
 	}
 }
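The added `s.querierCh <- sub` line means every subscription event now has two consumers: the subscriber records it and then forwards it to the querier. Below is a minimal, hedged sketch of that fan-out; the `subscribe` shape and channel names are simplified stand-ins, not the PR's exact types:

```go
package main

import (
	"fmt"
	"sync"
)

// subscribe is a simplified stand-in for pgcoord's subscribe struct.
type subscribe struct {
	clientID, agentID string
	active            bool
}

func main() {
	sCh := make(chan subscribe)  // into the subscriber (DB bookkeeping)
	qsCh := make(chan subscribe) // forwarded to the querier (mapper bookkeeping)

	var wg sync.WaitGroup
	wg.Add(1)

	// Subscriber: store the event, then hand it to the querier,
	// mirroring handleSubscriptions above.
	go func() {
		sub := <-sCh
		fmt.Println("subscriber stored", sub.clientID, "->", sub.agentID)
		qsCh <- sub
	}()

	// Querier: reacts to the forwarded event.
	go func() {
		defer wg.Done()
		sub := <-qsCh
		fmt.Println("querier saw subscription, active =", sub.active)
	}()

	sCh <- subscribe{clientID: "client-1", agentID: "agent-1", active: true}
	wg.Wait()
}
```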
@@ -784,6 +791,7 @@ type querier struct {
 	store database.Store
 
 	newConnections chan agpl.Queue
+	subscriptions chan subscribe
 
 	workQ *workQ[mKey]
@@ -812,6 +820,7 @@ func newQuerier(ctx context.Context,
 	store database.Store,
 	self uuid.UUID,
 	newConnections chan agpl.Queue,
+	subscriptions chan subscribe,
 	numWorkers int,
 	firstHeartbeat chan struct{},
 ) *querier {
@@ -823,6 +832,7 @@ func newQuerier(ctx context.Context,
 		pubsub: ps,
 		store: store,
 		newConnections: newConnections,
+		subscriptions: subscriptions,
 		workQ: newWorkQ[mKey](ctx),
 		heartbeats: newHeartbeats(ctx, logger, ps, store, self, updates, firstHeartbeat),
 		mappers: make(map[mKey]*countedMapper),
@@ -835,7 +845,7 @@ func newQuerier(ctx context.Context,
 	go func() {
 		<-firstHeartbeat
-		go q.handleNewConnections()
+		go q.handleIncoming()
 		for i := 0; i < numWorkers; i++ {
 			go q.worker()
 		}
@@ -844,11 +854,12 @@ func newQuerier(ctx context.Context,
 	return q
 }
 
-func (q *querier) handleNewConnections() {
+func (q *querier) handleIncoming() {
 	for {
 		select {
 		case <-q.ctx.Done():
 			return
 
 		case c := <-q.newConnections:
 			switch c.Kind() {
 			case agpl.QueueKindAgent:
@@ -858,6 +869,13 @@ func (q *querier) handleNewConnections() {
 			default:
 				panic(fmt.Sprint("unreachable: invalid queue kind ", c.Kind()))
 			}
 
+		case sub := <-q.subscriptions:
+			if sub.active {
+				q.newClientSubscription(sub.q, sub.agentID)
+			} else {
+				q.removeClientSubscription(sub.q, sub.agentID)
+			}
 		}
 	}
 }

Review thread on the `q.removeClientSubscription(sub.q, sub.agentID)` line:
- Reviewer: you send a subscription with agentID unset on remove. The subscriber handles this case, but then it forwards it to the querier, which I don't think handles the case.
- Author: It's technically handled by checking the …
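To make the case in that thread concrete: on the remove path, `active` is false and the agent ID may be left as `uuid.Nil` to mean "the whole client is going away". A hedged, standalone sketch of that sentinel convention follows; the `subscribe` shape is simplified, and only the `active` flag and the `uuid.Nil` check are taken from the diff:

```go
package main

import (
	"fmt"

	"github.com/google/uuid"
)

// subscribe is a simplified stand-in for pgcoord's subscribe struct.
type subscribe struct {
	clientID uuid.UUID
	agentID  uuid.UUID
	active   bool
}

func dispatch(sub subscribe) {
	switch {
	case sub.active:
		fmt.Println("add subscription", sub.clientID, "->", sub.agentID)
	case sub.agentID == uuid.Nil:
		// Mirrors removeClientSubscription's early return below: cleanup of
		// the whole connection is handled elsewhere (cleanupConn in the PR).
		fmt.Println("client", sub.clientID, "going away; defer to cleanupConn")
	default:
		fmt.Println("remove subscription", sub.clientID, "->", sub.agentID)
	}
}

func main() {
	c := uuid.New()
	dispatch(subscribe{clientID: c, agentID: uuid.New(), active: true})
	dispatch(subscribe{clientID: c, active: false}) // agentID left as uuid.Nil
}
```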
@@ -905,6 +923,11 @@ func (q *querier) newClientSubscription(c agpl.Queue, agentID uuid.UUID) {
 	if _, ok := q.clientSubscriptions[c.UniqueID()]; !ok {
 		q.clientSubscriptions[c.UniqueID()] = map[uuid.UUID]struct{}{}
 	}
+	fmt.Println("CREATEDC SUBSCRIPTION", c.UniqueID(), agentID)
+	fmt.Println("CREATEDC SUBSCRIPTION", c.UniqueID(), agentID)
+	fmt.Println("CREATEDC SUBSCRIPTION", c.UniqueID(), agentID)
+	fmt.Println("CREATEDC SUBSCRIPTION", c.UniqueID(), agentID)
+	fmt.Println("CREATEDC SUBSCRIPTION", c.UniqueID(), agentID)
 
 	mk := mKey{
 		agent: agentID,

Review thread on the `q.clientSubscriptions[c.UniqueID()]` initialization:
- Reviewer: this is a memory leak: you add to …
- Reviewer: You still never delete anything from …
- Author: Whoops, I think I got this now.
@@ -934,6 +957,12 @@ func (q *querier) removeClientSubscription(c agpl.Queue, agentID uuid.UUID) {
 	q.mu.Lock()
 	defer q.mu.Unlock()
 
+	// agentID: uuid.Nil indicates that a client is going away. The querier
+	// handles that in cleanupConn below instead.
+	if agentID == uuid.Nil {
+		return
+	}
+
 	mk := mKey{
 		agent: agentID,
 		kind: agpl.QueueKindClient,
@@ -948,6 +977,9 @@ func (q *querier) removeClientSubscription(c agpl.Queue, agentID uuid.UUID) {
 		cm.cancel()
 		delete(q.mappers, mk)
 	}
+	if len(q.clientSubscriptions[c.UniqueID()]) == 0 {
+		delete(q.clientSubscriptions, c.UniqueID())
+	}
 }
 
 func (q *querier) newClientConn(c agpl.Queue) {
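These three added lines are the fix for the leak flagged in the earlier thread: when a client's set of agent subscriptions empties out, its key has to be dropped from `clientSubscriptions` as well. A simplified, self-contained sketch of that bookkeeping, assuming the `map[clientID]map[agentID]struct{}` shape shown above:

```go
package main

import (
	"fmt"

	"github.com/google/uuid"
)

// subs mimics querier.clientSubscriptions: clientID -> set of agentIDs.
type subs map[uuid.UUID]map[uuid.UUID]struct{}

func (s subs) add(client, agent uuid.UUID) {
	if _, ok := s[client]; !ok {
		s[client] = map[uuid.UUID]struct{}{}
	}
	s[client][agent] = struct{}{}
}

func (s subs) remove(client, agent uuid.UUID) {
	delete(s[client], agent)
	// The fix: drop the outer key once its set is empty, or entries
	// accumulate for every client that ever subscribed.
	if len(s[client]) == 0 {
		delete(s, client)
	}
}

func main() {
	s := subs{}
	client, agent := uuid.New(), uuid.New()
	s.add(client, agent)
	s.remove(client, agent)
	fmt.Println(len(s)) // 0: nothing leaked for the departed client
}
```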
@@ -982,18 +1014,17 @@ func (q *querier) cleanupConn(c agpl.Queue) {
 			agent: agentID,
 			kind: c.Kind(),
 		}
-		cm, ok := q.mappers[mk]
-		if ok {
-			if err := sendCtx(cm.ctx, cm.del, c); err != nil {
-				continue
-			}
-			cm.count--
-			if cm.count == 0 {
-				cm.cancel()
-				delete(q.mappers, mk)
-			}
-		}
+		cm := q.mappers[mk]
+		if err := sendCtx(cm.ctx, cm.del, c); err != nil {
+			continue
+		}
+		cm.count--
+		if cm.count == 0 {
+			cm.cancel()
+			delete(q.mappers, mk)
+		}
 	}
 
+	delete(q.clientSubscriptions, c.UniqueID())
+
 	mk := mKey{
 		agent: c.UniqueID(),
@@ -1190,28 +1221,26 @@ func (q *querier) listenClient(_ context.Context, msg []byte, err error) {
 		q.logger.Warn(q.ctx, "unhandled pubsub error", slog.Error(err))
 		return
 	}
-	client, agents, err := parseClientUpdate(string(msg))
+	client, agent, err := parseClientUpdate(string(msg))
 	if err != nil {
 		q.logger.Error(q.ctx, "failed to parse client update", slog.F("msg", string(msg)), slog.Error(err))
 		return
 	}
-	logger := q.logger.With(slog.F("client_id", client))
+	logger := q.logger.With(slog.F("client_id", client), slog.F("agent_id", agent))
 	logger.Debug(q.ctx, "got client update")
-	for _, agentID := range agents {
-		logger := q.logger.With(slog.F("agent_id", agentID))
-		mk := mKey{
-			agent: agentID,
-			kind: agpl.QueueKindAgent,
-		}
-		q.mu.Lock()
-		_, ok := q.mappers[mk]
-		q.mu.Unlock()
-		if !ok {
-			logger.Debug(q.ctx, "ignoring update because we have no mapper")
-			return
-		}
-		q.workQ.enqueue(mk)
-	}
+
+	mk := mKey{
+		agent: agent,
+		kind: agpl.QueueKindAgent,
+	}
+	q.mu.Lock()
+	_, ok := q.mappers[mk]
+	q.mu.Unlock()
+	if !ok {
+		logger.Debug(q.ctx, "ignoring update because we have no mapper")
+		return
+	}
+	q.workQ.enqueue(mk)
 }
 
 func (q *querier) listenAgent(_ context.Context, msg []byte, err error) {
@@ -1348,27 +1377,22 @@ func (q *querier) getAll(ctx context.Context) (map[uuid.UUID]database.TailnetAge
 	return agentsMap, clientsMap, nil
 }
 
-func parseClientUpdate(msg string) (client uuid.UUID, agents []uuid.UUID, err error) {
+func parseClientUpdate(msg string) (client, agent uuid.UUID, err error) {
 	parts := strings.Split(msg, ",")
 	if len(parts) != 2 {
-		return uuid.Nil, nil, xerrors.Errorf("expected 2 parts separated by comma")
+		return uuid.Nil, uuid.Nil, xerrors.Errorf("expected 2 parts separated by comma")
 	}
 	client, err = uuid.Parse(parts[0])
 	if err != nil {
-		return uuid.Nil, nil, xerrors.Errorf("failed to parse client UUID: %w", err)
+		return uuid.Nil, uuid.Nil, xerrors.Errorf("failed to parse client UUID: %w", err)
 	}
 
-	agents = []uuid.UUID{}
-	for _, agentStr := range parts[1:] {
-		agent, err := uuid.Parse(agentStr)
-		if err != nil {
-			return uuid.Nil, nil, xerrors.Errorf("failed to parse agent UUID: %w", err)
-		}
-
-		agents = append(agents, agent)
+	agent, err = uuid.Parse(parts[1])
+	if err != nil {
+		return uuid.Nil, uuid.Nil, xerrors.Errorf("failed to parse agent UUID: %w", err)
 	}
 
-	return client, agents, nil
+	return client, agent, nil
 }
 
 func parseUpdateMessage(msg string) (agent uuid.UUID, err error) {
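For reference, after this change a client update message carries exactly one agent, i.e. `<client-uuid>,<agent-uuid>`. The sketch below re-implements the parser standalone so it runs outside the repo, with `xerrors` swapped for the standard library's `fmt.Errorf`; everything else mirrors the diff:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/google/uuid"
)

func parseClientUpdate(msg string) (client, agent uuid.UUID, err error) {
	parts := strings.Split(msg, ",")
	if len(parts) != 2 {
		return uuid.Nil, uuid.Nil, fmt.Errorf("expected 2 parts separated by comma")
	}
	if client, err = uuid.Parse(parts[0]); err != nil {
		return uuid.Nil, uuid.Nil, fmt.Errorf("failed to parse client UUID: %w", err)
	}
	if agent, err = uuid.Parse(parts[1]); err != nil {
		return uuid.Nil, uuid.Nil, fmt.Errorf("failed to parse agent UUID: %w", err)
	}
	return client, agent, nil
}

func main() {
	c, a := uuid.New(), uuid.New()
	client, agent, err := parseClientUpdate(c.String() + "," + a.String())
	fmt.Println(client == c, agent == a, err) // true true <nil>
}
```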