Skip to content

feat: add single tailnet support to pgcoord #9351

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
a20d318
feat: add single tailnet support to pgcoord
coadler Aug 25, 2023
efa2071
fixup! feat: add single tailnet support to pgcoord
coadler Aug 25, 2023
2976c00
fixup! feat: add single tailnet support to pgcoord
coadler Aug 25, 2023
a7b39df
separate table + use channels
coadler Aug 31, 2023
618b0e0
fix migrations
coadler Sep 7, 2023
f50f929
fixup! fix migrations
coadler Sep 7, 2023
3a5bb76
fixup! fix migrations
coadler Sep 8, 2023
3ddc783
fixup! fix migrations
coadler Sep 8, 2023
c84626a
Merge branch 'main' into colin/single-pgcoord
coadler Sep 13, 2023
a0cb904
Merge branch 'main' into colin/single-pgcoord
coadler Sep 14, 2023
dcb007d
add subscriber subsystem
coadler Sep 15, 2023
bdd7ef1
fixup! add subscriber subsystem
coadler Sep 15, 2023
751b22f
fixup! add subscriber subsystem
coadler Sep 15, 2023
3af1af1
fixup! add subscriber subsystem
coadler Sep 15, 2023
7762a73
querier <- subscriber
coadler Sep 19, 2023
08501e6
Merge branch 'main' into colin/single-pgcoord
coadler Sep 19, 2023
eb681ff
fixup! Merge branch 'main' into colin/single-pgcoord
coadler Sep 19, 2023
390e837
fixup! Merge branch 'main' into colin/single-pgcoord
coadler Sep 19, 2023
a1c3acf
fixup! Merge branch 'main' into colin/single-pgcoord
coadler Sep 19, 2023
8256670
fixup! Merge branch 'main' into colin/single-pgcoord
coadler Sep 19, 2023
a75f6f1
add extensive multiagent tests
coadler Sep 20, 2023
d143d2d
use dedicated channels for querier subscribe and closing conns
coadler Sep 20, 2023
036094f
fixup! use dedicated channels for querier subscribe and closing conns
coadler Sep 20, 2023
e8a2b01
fixup! use dedicated channels for querier subscribe and closing conns
coadler Sep 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add subscriber subsystem
  • Loading branch information
coadler committed Sep 15, 2023
commit dcb007d87064449e95134dbebb9c7644077ee4cb
2 changes: 1 addition & 1 deletion coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func New(options *Options) *API {
api.DERPMap,
options.DeploymentValues.DERP.Config.ForceWebSockets.Value(),
func(context.Context) (tailnet.MultiAgentConn, error) {
return (*api.TailnetCoordinator.Load()).ServeMultiAgent(uuid.New())
return (*api.TailnetCoordinator.Load()).ServeMultiAgent(uuid.New()), nil
},
wsconncache.New(api._dialWorkspaceAgentTailnet, 0),
api.TracerProvider,
Expand Down
11 changes: 9 additions & 2 deletions coderd/database/dbauthz/dbauthz.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,13 +774,20 @@ func (q *querier) DeleteTailnetClient(ctx context.Context, arg database.DeleteTa
return q.db.DeleteTailnetClient(ctx, arg)
}

func (q *querier) DeleteTailnetClientSubscription(ctx context.Context, arg database.DeleteTailnetClientSubscriptionParams) (database.DeleteTailnetClientSubscriptionRow, error) {
func (q *querier) DeleteTailnetClientSubscription(ctx context.Context, arg database.DeleteTailnetClientSubscriptionParams) error {
if err := q.authorizeContext(ctx, rbac.ActionDelete, rbac.ResourceTailnetCoordinator); err != nil {
return database.DeleteTailnetClientSubscriptionRow{}, err
return err
}
return q.db.DeleteTailnetClientSubscription(ctx, arg)
}

func (q *querier) DeleteAllTailnetClientSubscriptions(ctx context.Context, arg database.DeleteAllTailnetClientSubscriptionsParams) error {
if err := q.authorizeContext(ctx, rbac.ActionDelete, rbac.ResourceTailnetCoordinator); err != nil {
return err
}
return q.db.DeleteAllTailnetClientSubscriptions(ctx, arg)
}

func (q *querier) GetAPIKeyByID(ctx context.Context, id string) (database.APIKey, error) {
return fetch(q.log, q.auth, q.db.GetAPIKeyByID)(ctx, id)
}
Expand Down
13 changes: 11 additions & 2 deletions coderd/database/dbfake/dbfake.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,15 @@ func (q *FakeQuerier) DeleteAPIKeysByUserID(_ context.Context, userID uuid.UUID)
return nil
}

func (q *FakeQuerier) DeleteAllTailnetClientSubscriptions(ctx context.Context, arg database.DeleteAllTailnetClientSubscriptionsParams) error {
err := validateDatabaseType(arg)
if err != nil {
return err
}

panic("not implemented")
}

func (q *FakeQuerier) DeleteApplicationConnectAPIKeysByUserID(_ context.Context, userID uuid.UUID) error {
q.mutex.Lock()
defer q.mutex.Unlock()
Expand Down Expand Up @@ -977,8 +986,8 @@ func (*FakeQuerier) DeleteTailnetClient(context.Context, database.DeleteTailnetC
return database.DeleteTailnetClientRow{}, ErrUnimplemented
}

func (*FakeQuerier) DeleteTailnetClientSubscription(context.Context, database.DeleteTailnetClientSubscriptionParams) (database.DeleteTailnetClientSubscriptionRow, error) {
return database.DeleteTailnetClientSubscriptionRow{}, ErrUnimplemented
func (*FakeQuerier) DeleteTailnetClientSubscription(context.Context, database.DeleteTailnetClientSubscriptionParams) error {
return ErrUnimplemented
}

func (q *FakeQuerier) GetAPIKeyByID(_ context.Context, id string) (database.APIKey, error) {
Expand Down
13 changes: 10 additions & 3 deletions coderd/database/dbmetrics/dbmetrics.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 17 additions & 4 deletions coderd/database/dbmock/dbmock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions coderd/database/foreign_key_constraint.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -40,47 +40,42 @@ CREATE OR REPLACE FUNCTION tailnet_notify_client_change() RETURNS trigger
AS $$
DECLARE
var_client_id uuid;
var_coordinator_id uuid;
var_agent_ids uuid[];
var_agent_id uuid;
BEGIN
IF (NEW.id IS NOT NULL) THEN
var_client_id = NEW.id;
SELECT
array_agg(agent_id)
INTO
var_agent_ids
FROM
tailnet_client_subscriptions subs
WHERE
subs.client_id = NEW.id AND
subs.coordinator_id = NEW.coordinator_id;
var_coordinator_id = NEW.coordinator_id;
ELSIF (OLD.id IS NOT NULL) THEN
-- if new is null and old is not null, that means the row was deleted.
-- simulate a foreign key by deleting all of the subscriptions.
var_client_id = OLD.id;
WITH agent_ids AS (
DELETE FROM
tailnet_client_subscriptions subs
WHERE
subs.client_id = OLD.id AND
subs.coordinator_id = OLD.coordinator_id
RETURNING
subs.agent_id
)
SELECT
array_agg(agent_id)
INTO
var_agent_ids
FROM
agent_ids;
var_coordinator_id = OLD.coordinator_id;
END IF;

-- Read all agents the client is subscribed to, so we can notify them.
SELECT
array_agg(agent_id)
INTO
var_agent_ids
FROM
tailnet_client_subscriptions subs
WHERE
subs.client_id = NEW.id AND
subs.coordinator_id = NEW.coordinator_id;

-- No agents to notify
if (var_agent_ids IS NULL) THEN
return NULL;
END IF;

PERFORM pg_notify('tailnet_client_update', var_client_id || ',' || array_to_string(var_agent_ids, ','));
-- pg_notify is limited to 8k bytes, which is approximately 221 UUIDs.
-- Instead of sending all agent ids in a single update, send one for each
-- agent id to prevent overflow.
FOREACH var_agent_id IN ARRAY var_agent_ids
LOOP
PERFORM pg_notify('tailnet_client_update', var_client_id || ',' || var_agent_id);
END LOOP;

return NULL;
END;
$$;
Expand Down
3 changes: 2 additions & 1 deletion coderd/database/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 20 additions & 13 deletions coderd/database/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions coderd/database/queries/tailnet.sql
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,15 @@ FROM tailnet_clients
WHERE id = $1 and coordinator_id = $2
RETURNING id, coordinator_id;

-- name: DeleteTailnetClientSubscription :one
-- name: DeleteTailnetClientSubscription :exec
DELETE
FROM tailnet_client_subscriptions
WHERE client_id = $1 and agent_id = $2 and coordinator_id = $3
RETURNING client_id, agent_id, coordinator_id;
WHERE client_id = $1 and agent_id = $2 and coordinator_id = $3;

-- name: DeleteAllTailnetClientSubscriptions :exec
DELETE
FROM tailnet_client_subscriptions
WHERE client_id = $1 and coordinator_id = $2;

-- name: DeleteTailnetAgent :one
DELETE
Expand Down
2 changes: 1 addition & 1 deletion coderd/tailnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func setupAgent(t *testing.T, agentAddresses []netip.Prefix) (uuid.UUID, agent.A
derpServer,
func() *tailcfg.DERPMap { return manifest.DERPMap },
false,
func(context.Context) (tailnet.MultiAgentConn, error) { return coord.ServeMultiAgent(uuid.New()) },
func(context.Context) (tailnet.MultiAgentConn, error) { return coord.ServeMultiAgent(uuid.New()), nil },
cache,
trace.NewNoopTracerProvider(),
)
Expand Down
9 changes: 1 addition & 8 deletions enterprise/coderd/workspaceproxycoordinate.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,7 @@ func (api *API) workspaceProxyCoordinate(rw http.ResponseWriter, r *http.Request
}

id := uuid.New()
sub, err := (*api.AGPL.TailnetCoordinator.Load()).ServeMultiAgent(id)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Failed to serve multi agent.",
Detail: err.Error(),
})
return
}
sub := (*api.AGPL.TailnetCoordinator.Load()).ServeMultiAgent(id)

ctx, nc := websocketNetConn(ctx, conn, websocket.MessageText)
defer nc.Close()
Expand Down
6 changes: 2 additions & 4 deletions enterprise/coderd/workspaceproxycoordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ func Test_agentIsLegacy(t *testing.T) {
defer cancel()

nodeID := uuid.New()
ma, err := coordinator.ServeMultiAgent(nodeID)
require.NoError(t, err)
ma := coordinator.ServeMultiAgent(nodeID)
defer ma.Close()
require.NoError(t, ma.UpdateSelf(&agpl.Node{
ID: 55,
Expand Down Expand Up @@ -124,8 +123,7 @@ func Test_agentIsLegacy(t *testing.T) {
defer cancel()

nodeID := uuid.New()
ma, err := coordinator.ServeMultiAgent(nodeID)
require.NoError(t, err)
ma := coordinator.ServeMultiAgent(nodeID)
defer ma.Close()
require.NoError(t, ma.UpdateSelf(&agpl.Node{
ID: 55,
Expand Down
4 changes: 2 additions & 2 deletions enterprise/tailnet/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewCoordinator(logger slog.Logger, ps pubsub.Pubsub) (agpl.Coordinator, err
return coord, nil
}

func (c *haCoordinator) ServeMultiAgent(id uuid.UUID) (agpl.MultiAgentConn, error) {
func (c *haCoordinator) ServeMultiAgent(id uuid.UUID) agpl.MultiAgentConn {
m := (&agpl.MultiAgent{
ID: id,
AgentIsLegacyFunc: c.agentIsLegacy,
Expand All @@ -61,7 +61,7 @@ func (c *haCoordinator) ServeMultiAgent(id uuid.UUID) (agpl.MultiAgentConn, erro
OnRemove: func(enq agpl.Queue) { c.clientDisconnected(enq.UniqueID()) },
}).Init()
c.addClient(id, m)
return m, nil
return m
}

func (c *haCoordinator) addClient(id uuid.UUID, q agpl.Queue) {
Expand Down
Loading