From 10629c712fd7cbe901d774b47b95b4518b7f7364 Mon Sep 17 00:00:00 2001 From: Spike Curtis Date: Tue, 2 Jan 2024 16:26:12 +0400 Subject: [PATCH 1/3] feat: promote PG Coordinator out of experimental --- coderd/apidoc/docs.go | 2 - coderd/apidoc/swagger.json | 7 +- codersdk/deployment.go | 4 - docs/api/schemas.md | 1 - enterprise/coderd/coderd.go | 7 +- enterprise/coderd/coderd_test.go | 4 + enterprise/tailnet/coordinator.go | 951 ------------------------- enterprise/tailnet/coordinator_test.go | 261 ------- site/src/api/typesGenerated.ts | 6 +- site/src/testHelpers/entities.ts | 2 +- 10 files changed, 8 insertions(+), 1237 deletions(-) delete mode 100644 enterprise/tailnet/coordinator.go delete mode 100644 enterprise/tailnet/coordinator_test.go diff --git a/coderd/apidoc/docs.go b/coderd/apidoc/docs.go index 3778f590e4f8a..a1f2553e25407 100644 --- a/coderd/apidoc/docs.go +++ b/coderd/apidoc/docs.go @@ -9109,12 +9109,10 @@ const docTemplate = `{ "type": "string", "enum": [ "workspace_actions", - "tailnet_pg_coordinator", "deployment_health_page" ], "x-enum-varnames": [ "ExperimentWorkspaceActions", - "ExperimentTailnetPGCoordinator", "ExperimentDeploymentHealthPage" ] }, diff --git a/coderd/apidoc/swagger.json b/coderd/apidoc/swagger.json index 0bfff1c165371..d4fe6ffd558db 100644 --- a/coderd/apidoc/swagger.json +++ b/coderd/apidoc/swagger.json @@ -8153,14 +8153,9 @@ }, "codersdk.Experiment": { "type": "string", - "enum": [ - "workspace_actions", - "tailnet_pg_coordinator", - "deployment_health_page" - ], + "enum": ["workspace_actions", "deployment_health_page"], "x-enum-varnames": [ "ExperimentWorkspaceActions", - "ExperimentTailnetPGCoordinator", "ExperimentDeploymentHealthPage" ] }, diff --git a/codersdk/deployment.go b/codersdk/deployment.go index 831ac91291c2b..ce80e32622167 100644 --- a/codersdk/deployment.go +++ b/codersdk/deployment.go @@ -2080,10 +2080,6 @@ const ( // https://github.com/coder/coder/milestone/19 ExperimentWorkspaceActions Experiment = "workspace_actions" - // ExperimentTailnetPGCoordinator enables the PGCoord in favor of the pubsub- - // only Coordinator - ExperimentTailnetPGCoordinator Experiment = "tailnet_pg_coordinator" - // Deployment health page ExperimentDeploymentHealthPage Experiment = "deployment_health_page" diff --git a/docs/api/schemas.md b/docs/api/schemas.md index 0f2072a1a2f57..c8ccc1fba5be7 100644 --- a/docs/api/schemas.md +++ b/docs/api/schemas.md @@ -2879,7 +2879,6 @@ AuthorizationObject can represent a "set" of objects, such as: all workspaces in | Value | | ------------------------ | | `workspace_actions` | -| `tailnet_pg_coordinator` | | `deployment_health_page` | ## codersdk.ExternalAuth diff --git a/enterprise/coderd/coderd.go b/enterprise/coderd/coderd.go index bd6997506a32e..4134e591dd313 100644 --- a/enterprise/coderd/coderd.go +++ b/enterprise/coderd/coderd.go @@ -613,12 +613,7 @@ func (api *API) updateEntitlements(ctx context.Context) error { if initial, changed, enabled := featureChanged(codersdk.FeatureHighAvailability); shouldUpdate(initial, changed, enabled) { var coordinator agpltailnet.Coordinator if enabled { - var haCoordinator agpltailnet.Coordinator - if api.AGPL.Experiments.Enabled(codersdk.ExperimentTailnetPGCoordinator) { - haCoordinator, err = tailnet.NewPGCoord(api.ctx, api.Logger, api.Pubsub, api.Database) - } else { - haCoordinator, err = tailnet.NewCoordinator(api.Logger, api.Pubsub) - } + haCoordinator, err := tailnet.NewPGCoord(api.ctx, api.Logger, api.Pubsub, api.Database) if err != nil { api.Logger.Error(ctx, "unable 
to set up high availability coordinator", slog.Error(err)) // If we try to setup the HA coordinator and it fails, nothing diff --git a/enterprise/coderd/coderd_test.go b/enterprise/coderd/coderd_test.go index 59fbe1818c781..f69fbff8d49cd 100644 --- a/enterprise/coderd/coderd_test.go +++ b/enterprise/coderd/coderd_test.go @@ -48,6 +48,10 @@ func TestEntitlements(t *testing.T) { require.Empty(t, res.Warnings) }) t.Run("FullLicense", func(t *testing.T) { + // PGCoordinator requires a real postgres + if !dbtestutil.WillUsePostgres() { + t.Skip("test only with postgres") + } t.Parallel() adminClient, _ := coderdenttest.New(t, &coderdenttest.Options{ AuditLogging: true, diff --git a/enterprise/tailnet/coordinator.go b/enterprise/tailnet/coordinator.go deleted file mode 100644 index 687ec236b4a44..0000000000000 --- a/enterprise/tailnet/coordinator.go +++ /dev/null @@ -1,951 +0,0 @@ -package tailnet - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "html/template" - "io" - "net" - "net/http" - "sync" - "time" - - "github.com/google/uuid" - lru "github.com/hashicorp/golang-lru/v2" - "golang.org/x/exp/slices" - "golang.org/x/xerrors" - - "cdr.dev/slog" - "github.com/coder/coder/v2/coderd/database/pubsub" - "github.com/coder/coder/v2/coderd/util/slice" - "github.com/coder/coder/v2/codersdk" - agpl "github.com/coder/coder/v2/tailnet" - "github.com/coder/coder/v2/tailnet/proto" -) - -// NewCoordinator creates a new high availability coordinator -// that uses PostgreSQL pubsub to exchange handshakes. -func NewCoordinator(logger slog.Logger, ps pubsub.Pubsub) (agpl.Coordinator, error) { - ctx, cancelFunc := context.WithCancel(context.Background()) - - nameCache, err := lru.New[uuid.UUID, string](512) - if err != nil { - panic("make lru cache: " + err.Error()) - } - - coord := &haCoordinator{ - id: uuid.New(), - log: logger, - pubsub: ps, - closeFunc: cancelFunc, - close: make(chan struct{}), - nodes: map[uuid.UUID]*agpl.Node{}, - agentSockets: map[uuid.UUID]agpl.Queue{}, - agentToConnectionSockets: map[uuid.UUID]map[uuid.UUID]agpl.Queue{}, - agentNameCache: nameCache, - clients: map[uuid.UUID]agpl.Queue{}, - clientsToAgents: map[uuid.UUID]map[uuid.UUID]agpl.Queue{}, - legacyAgents: map[uuid.UUID]struct{}{}, - } - - if err := coord.runPubsub(ctx); err != nil { - return nil, xerrors.Errorf("run coordinator pubsub: %w", err) - } - - return coord, nil -} - -func (c *haCoordinator) ServeMultiAgent(id uuid.UUID) agpl.MultiAgentConn { - m := (&agpl.MultiAgent{ - ID: id, - AgentIsLegacyFunc: c.agentIsLegacy, - OnSubscribe: c.clientSubscribeToAgent, - OnUnsubscribe: c.clientUnsubscribeFromAgent, - OnNodeUpdate: c.clientNodeUpdate, - OnRemove: c.clientDisconnected, - }).Init() - c.addClient(id, m) - return m -} - -func (c *haCoordinator) addClient(id uuid.UUID, q agpl.Queue) { - c.mutex.Lock() - c.clients[id] = q - c.clientsToAgents[id] = map[uuid.UUID]agpl.Queue{} - c.mutex.Unlock() -} - -func (c *haCoordinator) clientSubscribeToAgent(enq agpl.Queue, agentID uuid.UUID) (*agpl.Node, error) { - c.mutex.Lock() - defer c.mutex.Unlock() - - c.initOrSetAgentConnectionSocketLocked(agentID, enq) - - node := c.nodes[enq.UniqueID()] - if node != nil { - err := c.sendNodeToAgentLocked(agentID, node) - if err != nil { - return nil, xerrors.Errorf("handle client update: %w", err) - } - } - - agentNode, ok := c.nodes[agentID] - // If we have the node locally, give it back to the multiagent. - if ok { - return agentNode, nil - } - - // If we don't have the node locally, notify other coordinators. 
-	err := c.publishClientHello(agentID)
-	if err != nil {
-		return nil, xerrors.Errorf("publish client hello: %w", err)
-	}
-
-	// nolint:nilnil
-	return nil, nil
-}
-
-func (c *haCoordinator) clientUnsubscribeFromAgent(enq agpl.Queue, agentID uuid.UUID) error {
-	c.mutex.Lock()
-	defer c.mutex.Unlock()
-
-	connectionSockets, ok := c.agentToConnectionSockets[agentID]
-	if !ok {
-		return nil
-	}
-	delete(connectionSockets, enq.UniqueID())
-	if len(connectionSockets) == 0 {
-		delete(c.agentToConnectionSockets, agentID)
-	}
-
-	return nil
-}
-
-type haCoordinator struct {
-	id        uuid.UUID
-	log       slog.Logger
-	mutex     sync.RWMutex
-	pubsub    pubsub.Pubsub
-	close     chan struct{}
-	closeFunc context.CancelFunc
-
-	// nodes maps agent and connection IDs to their respective node.
-	nodes map[uuid.UUID]*agpl.Node
-	// agentSockets maps agent IDs to their open websocket.
-	agentSockets map[uuid.UUID]agpl.Queue
-	// agentToConnectionSockets maps agent IDs to connection IDs of conns that
-	// are subscribed to updates for that agent.
-	agentToConnectionSockets map[uuid.UUID]map[uuid.UUID]agpl.Queue
-
-	// clients holds a map of all clients connected to the coordinator. This is
-	// necessary because a client may not be subscribed to any agents.
-	clients map[uuid.UUID]agpl.Queue
-	// clientsToAgents is an index of clients to all of their subscribed agents.
-	clientsToAgents map[uuid.UUID]map[uuid.UUID]agpl.Queue
-
-	// agentNameCache holds a cache of agent names. If one of them disappears,
-	// it's helpful to have a name cached for debugging.
-	agentNameCache *lru.Cache[uuid.UUID, string]
-
-	// legacyAgents holds a mapping of all agents detected as legacy, meaning
-	// they only listen on codersdk.WorkspaceAgentIP. They aren't compatible
-	// with the new ServerTailnet, so they must be connected through
-	// wsconncache.
-	legacyAgents map[uuid.UUID]struct{}
-}
-
-func (c *haCoordinator) Coordinate(ctx context.Context, _ uuid.UUID, _ string, _ agpl.TunnelAuth) (chan<- *proto.CoordinateRequest, <-chan *proto.CoordinateResponse) {
-	// The HA Coordinator does NOT support the v2 API; this is just here to appease the compiler and prevent
-	// panics while we build out v2 support elsewhere. We will retire the HA Coordinator in favor of
-	// the PG Coordinator before we turn on the v2 API.
-	c.log.Warn(ctx, "v2 API invoked but unimplemented")
-	resp := make(chan *proto.CoordinateResponse)
-	close(resp)
-	req := make(chan *proto.CoordinateRequest)
-	go func() {
-		for {
-			if _, ok := <-req; !ok {
-				return
-			}
-		}
-	}()
-	return req, resp
-}
-
-// Node returns an in-memory node by ID.
-func (c *haCoordinator) Node(id uuid.UUID) *agpl.Node {
-	c.mutex.Lock()
-	defer c.mutex.Unlock()
-	node := c.nodes[id]
-	return node
-}
-
-func (c *haCoordinator) clientLogger(id, agent uuid.UUID) slog.Logger {
-	return c.log.With(slog.F("client_id", id), slog.F("agent_id", agent))
-}
-
-func (c *haCoordinator) agentLogger(agent uuid.UUID) slog.Logger {
-	return c.log.With(slog.F("agent_id", agent))
-}
-
-// ServeClient accepts a WebSocket connection that wants to connect to an agent
-// with the specified ID.
-func (c *haCoordinator) ServeClient(conn net.Conn, id, agentID uuid.UUID) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - logger := c.clientLogger(id, agentID) - - tc := agpl.NewTrackedConn(ctx, cancel, conn, id, logger, id.String(), 0, agpl.QueueKindClient) - defer tc.Close() - - c.addClient(id, tc) - defer c.clientDisconnected(tc) - - agentNode, err := c.clientSubscribeToAgent(tc, agentID) - if err != nil { - return xerrors.Errorf("subscribe agent: %w", err) - } - - if agentNode != nil { - err := tc.Enqueue([]*agpl.Node{agentNode}) - if err != nil { - logger.Debug(ctx, "enqueue initial node", slog.Error(err)) - } - } - - go tc.SendUpdates() - - decoder := json.NewDecoder(conn) - // Indefinitely handle messages from the client websocket. - for { - err := c.handleNextClientMessage(id, decoder) - if err != nil { - if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) { - return nil - } - return xerrors.Errorf("handle next client message: %w", err) - } - } -} - -func (c *haCoordinator) initOrSetAgentConnectionSocketLocked(agentID uuid.UUID, enq agpl.Queue) { - connectionSockets, ok := c.agentToConnectionSockets[agentID] - if !ok { - connectionSockets = map[uuid.UUID]agpl.Queue{} - c.agentToConnectionSockets[agentID] = connectionSockets - } - connectionSockets[enq.UniqueID()] = enq - c.clientsToAgents[enq.UniqueID()][agentID] = c.agentSockets[agentID] -} - -func (c *haCoordinator) clientDisconnected(enq agpl.Queue) { - c.mutex.Lock() - defer c.mutex.Unlock() - - for agentID := range c.clientsToAgents[enq.UniqueID()] { - connectionSockets, ok := c.agentToConnectionSockets[agentID] - if !ok { - continue - } - delete(connectionSockets, enq.UniqueID()) - if len(connectionSockets) == 0 { - delete(c.agentToConnectionSockets, agentID) - } - } - - delete(c.nodes, enq.UniqueID()) - delete(c.clients, enq.UniqueID()) - delete(c.clientsToAgents, enq.UniqueID()) -} - -func (c *haCoordinator) handleNextClientMessage(id uuid.UUID, decoder *json.Decoder) error { - var node agpl.Node - err := decoder.Decode(&node) - if err != nil { - return xerrors.Errorf("read json: %w", err) - } - - return c.clientNodeUpdate(id, &node) -} - -func (c *haCoordinator) clientNodeUpdate(id uuid.UUID, node *agpl.Node) error { - c.mutex.Lock() - defer c.mutex.Unlock() - // Update the node of this client in our in-memory map. If an agent entirely - // shuts down and reconnects, it needs to be aware of all clients attempting - // to establish connections. - c.nodes[id] = node - - for agentID, agentSocket := range c.clientsToAgents[id] { - if agentSocket == nil { - // If we don't own the agent locally, send it over pubsub to a node that - // owns the agent. - err := c.publishNodesToAgent(agentID, []*agpl.Node{node}) - if err != nil { - c.log.Error(context.Background(), "publish node to agent", slog.Error(err), slog.F("agent_id", agentID)) - } - } else { - // Write the new node from this client to the actively connected agent. - err := agentSocket.Enqueue([]*agpl.Node{node}) - if err != nil { - c.log.Error(context.Background(), "enqueue node to agent", slog.Error(err), slog.F("agent_id", agentID)) - } - } - } - - return nil -} - -func (c *haCoordinator) sendNodeToAgentLocked(agentID uuid.UUID, node *agpl.Node) error { - agentSocket, ok := c.agentSockets[agentID] - if !ok { - // If we don't own the agent locally, send it over pubsub to a node that - // owns the agent. 
- err := c.publishNodesToAgent(agentID, []*agpl.Node{node}) - if err != nil { - return xerrors.Errorf("publish node to agent") - } - return nil - } - err := agentSocket.Enqueue([]*agpl.Node{node}) - if err != nil { - return xerrors.Errorf("enqueue node: %w", err) - } - return nil -} - -// ServeAgent accepts a WebSocket connection to an agent that listens to -// incoming connections and publishes node updates. -func (c *haCoordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - logger := c.agentLogger(id) - c.agentNameCache.Add(id, name) - - c.mutex.Lock() - overwrites := int64(0) - // If an old agent socket is connected, we Close it to avoid any leaks. This - // shouldn't ever occur because we expect one agent to be running, but it's - // possible for a race condition to happen when an agent is disconnected and - // attempts to reconnect before the server realizes the old connection is - // dead. - oldAgentSocket, ok := c.agentSockets[id] - if ok { - overwrites = oldAgentSocket.Overwrites() + 1 - _ = oldAgentSocket.Close() - } - // This uniquely identifies a connection that belongs to this goroutine. - unique := uuid.New() - tc := agpl.NewTrackedConn(ctx, cancel, conn, unique, logger, name, overwrites, agpl.QueueKindAgent) - - // Publish all nodes on this instance that want to connect to this agent. - nodes := c.nodesSubscribedToAgent(id) - if len(nodes) > 0 { - err := tc.Enqueue(nodes) - if err != nil { - c.mutex.Unlock() - return xerrors.Errorf("enqueue nodes: %w", err) - } - } - c.agentSockets[id] = tc - for clientID := range c.agentToConnectionSockets[id] { - c.clientsToAgents[clientID][id] = tc - } - c.mutex.Unlock() - go tc.SendUpdates() - - // Tell clients on other instances to send a callmemaybe to us. - err := c.publishAgentHello(id) - if err != nil { - return xerrors.Errorf("publish agent hello: %w", err) - } - - defer func() { - c.mutex.Lock() - defer c.mutex.Unlock() - - // Only delete the connection if it's ours. It could have been - // overwritten. 
- if idConn, ok := c.agentSockets[id]; ok && idConn.UniqueID() == unique { - delete(c.agentSockets, id) - delete(c.nodes, id) - } - for clientID := range c.agentToConnectionSockets[id] { - c.clientsToAgents[clientID][id] = nil - } - }() - - decoder := json.NewDecoder(conn) - for { - node, err := c.handleAgentUpdate(id, decoder) - if err != nil { - if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) || errors.Is(err, context.Canceled) { - return nil - } - return xerrors.Errorf("handle next agent message: %w", err) - } - - err = c.publishAgentToNodes(id, node) - if err != nil { - return xerrors.Errorf("publish agent to nodes: %w", err) - } - } -} - -func (c *haCoordinator) nodesSubscribedToAgent(agentID uuid.UUID) []*agpl.Node { - sockets, ok := c.agentToConnectionSockets[agentID] - if !ok { - return nil - } - - nodes := make([]*agpl.Node, 0, len(sockets)) - for targetID := range sockets { - node, ok := c.nodes[targetID] - if !ok { - continue - } - nodes = append(nodes, node) - } - - return nodes -} - -func (c *haCoordinator) handleClientHello(id uuid.UUID) error { - c.mutex.Lock() - node, ok := c.nodes[id] - c.mutex.Unlock() - if !ok { - return nil - } - return c.publishAgentToNodes(id, node) -} - -func (c *haCoordinator) agentIsLegacy(agentID uuid.UUID) bool { - c.mutex.RLock() - _, ok := c.legacyAgents[agentID] - c.mutex.RUnlock() - return ok -} - -func (c *haCoordinator) handleAgentUpdate(id uuid.UUID, decoder *json.Decoder) (*agpl.Node, error) { - var node agpl.Node - err := decoder.Decode(&node) - if err != nil { - return nil, xerrors.Errorf("read json: %w", err) - } - - c.mutex.Lock() - // Keep a cache of all legacy agents. - if len(node.Addresses) > 0 && node.Addresses[0].Addr() == codersdk.WorkspaceAgentIP { - c.legacyAgents[id] = struct{}{} - } - - oldNode := c.nodes[id] - if oldNode != nil { - if oldNode.AsOf.After(node.AsOf) { - c.mutex.Unlock() - return oldNode, nil - } - } - c.nodes[id] = &node - connectionSockets, ok := c.agentToConnectionSockets[id] - if !ok { - c.mutex.Unlock() - return &node, nil - } - - // Publish the new node to every listening socket. - for _, connectionSocket := range connectionSockets { - _ = connectionSocket.Enqueue([]*agpl.Node{&node}) - } - - c.mutex.Unlock() - - return &node, nil -} - -// Close closes all of the open connections in the coordinator and stops the -// coordinator from accepting new connections. 
-func (c *haCoordinator) Close() error { - c.mutex.Lock() - defer c.mutex.Unlock() - select { - case <-c.close: - return nil - default: - } - close(c.close) - c.closeFunc() - - wg := sync.WaitGroup{} - - wg.Add(len(c.agentSockets)) - for _, socket := range c.agentSockets { - socket := socket - go func() { - _ = socket.CoordinatorClose() - wg.Done() - }() - } - - wg.Add(len(c.clients)) - for _, client := range c.clients { - client := client - go func() { - _ = client.CoordinatorClose() - wg.Done() - }() - } - - wg.Wait() - return nil -} - -func (c *haCoordinator) publishNodesToAgent(recipient uuid.UUID, nodes []*agpl.Node) error { - msg, err := c.formatCallMeMaybe(recipient, nodes) - if err != nil { - return xerrors.Errorf("format publish message: %w", err) - } - - err = c.pubsub.Publish("wireguard_peers", msg) - if err != nil { - return xerrors.Errorf("publish message: %w", err) - } - - return nil -} - -func (c *haCoordinator) publishAgentHello(id uuid.UUID) error { - msg, err := c.formatAgentHello(id) - if err != nil { - return xerrors.Errorf("format publish message: %w", err) - } - - err = c.pubsub.Publish("wireguard_peers", msg) - if err != nil { - return xerrors.Errorf("publish message: %w", err) - } - - return nil -} - -func (c *haCoordinator) publishClientHello(id uuid.UUID) error { - msg, err := c.formatClientHello(id) - if err != nil { - return xerrors.Errorf("format client hello: %w", err) - } - err = c.pubsub.Publish("wireguard_peers", msg) - if err != nil { - return xerrors.Errorf("publish client hello: %w", err) - } - return nil -} - -func (c *haCoordinator) publishAgentToNodes(id uuid.UUID, node *agpl.Node) error { - msg, err := c.formatAgentUpdate(id, node) - if err != nil { - return xerrors.Errorf("format publish message: %w", err) - } - - err = c.pubsub.Publish("wireguard_peers", msg) - if err != nil { - return xerrors.Errorf("publish message: %w", err) - } - - return nil -} - -func (c *haCoordinator) runPubsub(ctx context.Context) error { - messageQueue := make(chan []byte, 64) - cancelSub, err := c.pubsub.Subscribe("wireguard_peers", func(ctx context.Context, message []byte) { - select { - case messageQueue <- message: - case <-ctx.Done(): - return - } - }) - if err != nil { - return xerrors.Errorf("subscribe wireguard peers") - } - go func() { - for { - select { - case <-ctx.Done(): - return - case message := <-messageQueue: - c.handlePubsubMessage(ctx, message) - } - } - }() - - go func() { - defer cancelSub() - <-c.close - }() - - return nil -} - -func (c *haCoordinator) handlePubsubMessage(ctx context.Context, message []byte) { - sp := bytes.Split(message, []byte("|")) - if len(sp) != 4 { - c.log.Error(ctx, "invalid wireguard peer message", slog.F("msg", string(message))) - return - } - - var ( - coordinatorID = sp[0] - eventType = sp[1] - agentID = sp[2] - nodeJSON = sp[3] - ) - - sender, err := uuid.ParseBytes(coordinatorID) - if err != nil { - c.log.Error(ctx, "invalid sender id", slog.F("id", string(coordinatorID)), slog.F("msg", string(message))) - return - } - - // We sent this message! - if sender == c.id { - return - } - - switch string(eventType) { - case "callmemaybe": - agentUUID, err := uuid.ParseBytes(agentID) - if err != nil { - c.log.Error(ctx, "invalid agent id", slog.F("id", string(agentID))) - return - } - - c.mutex.Lock() - agentSocket, ok := c.agentSockets[agentUUID] - c.mutex.Unlock() - if !ok { - return - } - - // Socket takes a slice of Nodes, so we need to parse the JSON here. 
-		var nodes []*agpl.Node
-		err = json.Unmarshal(nodeJSON, &nodes)
-		if err != nil {
-			c.log.Error(ctx, "invalid nodes JSON", slog.F("id", agentID), slog.Error(err), slog.F("node", string(nodeJSON)))
-		}
-		err = agentSocket.Enqueue(nodes)
-		if err != nil {
-			c.log.Error(ctx, "send callmemaybe to agent", slog.Error(err))
-			return
-		}
-	case "clienthello":
-		agentUUID, err := uuid.ParseBytes(agentID)
-		if err != nil {
-			c.log.Error(ctx, "invalid agent id", slog.F("id", string(agentID)))
-			return
-		}
-
-		err = c.handleClientHello(agentUUID)
-		if err != nil {
-			c.log.Error(ctx, "handle agent request node", slog.Error(err))
-			return
-		}
-	case "agenthello":
-		agentUUID, err := uuid.ParseBytes(agentID)
-		if err != nil {
-			c.log.Error(ctx, "invalid agent id", slog.F("id", string(agentID)))
-			return
-		}
-
-		c.mutex.RLock()
-		nodes := c.nodesSubscribedToAgent(agentUUID)
-		c.mutex.RUnlock()
-		if len(nodes) > 0 {
-			err := c.publishNodesToAgent(agentUUID, nodes)
-			if err != nil {
-				c.log.Error(ctx, "publish nodes to agent", slog.Error(err))
-				return
-			}
-		}
-	case "agentupdate":
-		agentUUID, err := uuid.ParseBytes(agentID)
-		if err != nil {
-			c.log.Error(ctx, "invalid agent id", slog.F("id", string(agentID)))
-			return
-		}
-
-		decoder := json.NewDecoder(bytes.NewReader(nodeJSON))
-		_, err = c.handleAgentUpdate(agentUUID, decoder)
-		if err != nil {
-			c.log.Error(ctx, "handle agent update", slog.Error(err))
-			return
-		}
-	default:
-		c.log.Error(ctx, "unknown peer event", slog.F("name", string(eventType)))
-	}
-}
-
-// format: <coordinator id>|callmemaybe|<recipient id>|<nodes json>
-func (c *haCoordinator) formatCallMeMaybe(recipient uuid.UUID, nodes []*agpl.Node) ([]byte, error) {
-	buf := bytes.Buffer{}
-
-	_, _ = buf.WriteString(c.id.String() + "|")
-	_, _ = buf.WriteString("callmemaybe|")
-	_, _ = buf.WriteString(recipient.String() + "|")
-	err := json.NewEncoder(&buf).Encode(nodes)
-	if err != nil {
-		return nil, xerrors.Errorf("encode node: %w", err)
-	}
-
-	return buf.Bytes(), nil
-}
-
-// format: <coordinator id>|agenthello|<agent id>|
-func (c *haCoordinator) formatAgentHello(id uuid.UUID) ([]byte, error) {
-	buf := bytes.Buffer{}
-
-	_, _ = buf.WriteString(c.id.String() + "|")
-	_, _ = buf.WriteString("agenthello|")
-	_, _ = buf.WriteString(id.String() + "|")
-
-	return buf.Bytes(), nil
-}
-
-// format: <coordinator id>|clienthello|<agent id>|
-func (c *haCoordinator) formatClientHello(id uuid.UUID) ([]byte, error) {
-	buf := bytes.Buffer{}
-
-	_, _ = buf.WriteString(c.id.String() + "|")
-	_, _ = buf.WriteString("clienthello|")
-	_, _ = buf.WriteString(id.String() + "|")
-
-	return buf.Bytes(), nil
-}
-
-// format: <coordinator id>|agentupdate|<agent id>|<node json>
-func (c *haCoordinator) formatAgentUpdate(id uuid.UUID, node *agpl.Node) ([]byte, error) {
-	buf := bytes.Buffer{}
-
-	_, _ = buf.WriteString(c.id.String() + "|")
-	_, _ = buf.WriteString("agentupdate|")
-	_, _ = buf.WriteString(id.String() + "|")
-	err := json.NewEncoder(&buf).Encode(node)
-	if err != nil {
-		return nil, xerrors.Errorf("encode node: %w", err)
-	}
-
-	return buf.Bytes(), nil
-}
-
-func (c *haCoordinator) ServeHTTPDebug(w http.ResponseWriter, r *http.Request) {
-	c.mutex.RLock()
-	defer c.mutex.RUnlock()
-
-	CoordinatorHTTPDebug(
-		HTTPDebugFromLocal(true, c.agentSockets, c.agentToConnectionSockets, c.nodes, c.agentNameCache),
-	)(w, r)
-}
-
-func HTTPDebugFromLocal(
-	ha bool,
-	agentSocketsMap map[uuid.UUID]agpl.Queue,
-	agentToConnectionSocketsMap map[uuid.UUID]map[uuid.UUID]agpl.Queue,
-	nodesMap map[uuid.UUID]*agpl.Node,
-	agentNameCache *lru.Cache[uuid.UUID, string],
-) HTMLDebugHA {
-	now := time.Now()
-	data := HTMLDebugHA{HA: ha}
-	for id, conn := range agentSocketsMap {
-		start, lastWrite := conn.Stats()
-		agent := &HTMLAgent{
-			Name:         conn.Name(),
-			ID:           id,
-			CreatedAge:   now.Sub(time.Unix(start, 0)).Round(time.Second),
-			LastWriteAge: now.Sub(time.Unix(lastWrite, 0)).Round(time.Second),
-			Overwrites:   int(conn.Overwrites()),
-		}
-
-		for id, conn := range agentToConnectionSocketsMap[id] {
-			start, lastWrite := conn.Stats()
-			agent.Connections = append(agent.Connections, &HTMLClient{
-				Name:         conn.Name(),
-				ID:           id,
-				CreatedAge:   now.Sub(time.Unix(start, 0)).Round(time.Second),
-				LastWriteAge: now.Sub(time.Unix(lastWrite, 0)).Round(time.Second),
-			})
-		}
-		slices.SortFunc(agent.Connections, func(a, b *HTMLClient) int {
-			return slice.Ascending(a.Name, b.Name)
-		})
-
-		data.Agents = append(data.Agents, agent)
-	}
-	slices.SortFunc(data.Agents, func(a, b *HTMLAgent) int {
-		return slice.Ascending(a.Name, b.Name)
-	})
-
-	for agentID, conns := range agentToConnectionSocketsMap {
-		if len(conns) == 0 {
-			continue
-		}
-
-		if _, ok := agentSocketsMap[agentID]; ok {
-			continue
-		}
-
-		agentName, ok := agentNameCache.Get(agentID)
-		if !ok {
-			agentName = "unknown"
-		}
-		agent := &HTMLAgent{
-			Name: agentName,
-			ID:   agentID,
-		}
-		for id, conn := range conns {
-			start, lastWrite := conn.Stats()
-			agent.Connections = append(agent.Connections, &HTMLClient{
-				Name:         conn.Name(),
-				ID:           id,
-				CreatedAge:   now.Sub(time.Unix(start, 0)).Round(time.Second),
-				LastWriteAge: now.Sub(time.Unix(lastWrite, 0)).Round(time.Second),
-			})
-		}
-		slices.SortFunc(agent.Connections, func(a, b *HTMLClient) int {
-			return slice.Ascending(a.Name, b.Name)
-		})
-
-		data.MissingAgents = append(data.MissingAgents, agent)
-	}
-	slices.SortFunc(data.MissingAgents, func(a, b *HTMLAgent) int {
-		return slice.Ascending(a.Name, b.Name)
-	})
-
-	for id, node := range nodesMap {
-		name, _ := agentNameCache.Get(id)
-		data.Nodes = append(data.Nodes, &HTMLNode{
-			ID:   id,
-			Name: name,
-			Node: node,
-		})
-	}
-	slices.SortFunc(data.Nodes, func(a, b *HTMLNode) int {
-		return slice.Ascending(a.Name+a.ID.String(), b.Name+b.ID.String())
-	})
-
-	return data
-}
-
-func CoordinatorHTTPDebug(data HTMLDebugHA) func(w http.ResponseWriter, _ *http.Request) {
-	return func(w http.ResponseWriter, _ *http.Request) {
-		w.Header().Set("Content-Type", "text/html; charset=utf-8")
-
-		tmpl, err := template.New("coordinator_debug").Funcs(template.FuncMap{
-			"marshal": func(v any) template.JS {
-				a, err := json.MarshalIndent(v, "", " ")
-				if err != nil {
-					//nolint:gosec
-					return template.JS(fmt.Sprintf(`{"err": %q}`, err))
-				}
-				//nolint:gosec
-				return template.JS(a)
-			},
-		}).Parse(haCoordinatorDebugTmpl)
-		if err != nil {
-			w.WriteHeader(http.StatusInternalServerError)
-			_, _ = w.Write([]byte(err.Error()))
-			return
-		}
-
-		err = tmpl.Execute(w, data)
-		if err != nil {
-			w.WriteHeader(http.StatusInternalServerError)
-			_, _ = w.Write([]byte(err.Error()))
-			return
-		}
-	}
-}
-
-type HTMLDebugHA struct {
-	HA            bool
-	Agents        []*HTMLAgent
-	MissingAgents []*HTMLAgent
-	Nodes         []*HTMLNode
-}
-
-type HTMLAgent struct {
-	Name         string
-	ID           uuid.UUID
-	CreatedAge   time.Duration
-	LastWriteAge time.Duration
-	Overwrites   int
-	Connections  []*HTMLClient
-}
-
-type HTMLClient struct {
-	Name         string
-	ID           uuid.UUID
-	CreatedAge   time.Duration
-	LastWriteAge time.Duration
-}
-
-type HTMLNode struct {
-	ID   uuid.UUID
-	Name string
-	Node any
-}
-
-var haCoordinatorDebugTmpl = `
-<!DOCTYPE html>
-<html>
-	<head>
-		<meta charset="UTF-8">
-	</head>
-	<body>
-		{{- if .HA }}
-			<h1>high-availability wireguard coordinator debug</h1>
-			<h4>warning: this only provides info from the node that served the request, if there are multiple replicas this data may be incomplete</h4>
-		{{- else }}
-			<h1>in-memory wireguard coordinator debug</h1>
-		{{- end }}
-
-		<h2># agents: total {{ len .Agents }}</h2>
-		<ul>
-		{{- range .Agents }}
-			<li>
-				<b>{{ .Name }}</b> (<code>{{ .ID }}</code>): created {{ .CreatedAge }} ago, write {{ .LastWriteAge }} ago, overwrites {{ .Overwrites }}
-				<h3>connections: total {{ len .Connections}}</h3>
-				<ul>
-				{{- range .Connections }}
-					<li><b>{{ .Name }}</b> (<code>{{ .ID }}</code>): created {{ .CreatedAge }} ago, write {{ .LastWriteAge }} ago</li>
-				{{- end }}
-				</ul>
-			</li>
-		{{- end }}
-		</ul>
-
-		<h2># missing agents: total {{ len .MissingAgents }}</h2>
-		<ul>
-		{{- range .MissingAgents}}
-			<li><b>{{ .Name }}</b> (<code>{{ .ID }}</code>): created ? ago, write ? ago, overwrites ?</li>
-
-			<h3>connections: total {{ len .Connections }}</h3>
-			<ul>
-			{{- range .Connections }}
-				<li><b>{{ .Name }}</b> (<code>{{ .ID }}</code>): created {{ .CreatedAge }} ago, write {{ .LastWriteAge }} ago</li>
-			{{- end }}
-			</ul>
-		{{- end }}
-		</ul>
-
-		<h2># nodes: total {{ len .Nodes }}</h2>
-		<ul>
-		{{- range .Nodes }}
-			<li><b>{{ .Name }}</b> (<code>{{ .ID }}</code>):
-				<code>{{ marshal .Node }}</code></li>
-		{{- end }}
-		</ul>
- - -` diff --git a/enterprise/tailnet/coordinator_test.go b/enterprise/tailnet/coordinator_test.go deleted file mode 100644 index 367b07c586faa..0000000000000 --- a/enterprise/tailnet/coordinator_test.go +++ /dev/null @@ -1,261 +0,0 @@ -package tailnet_test - -import ( - "net" - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "cdr.dev/slog/sloggers/slogtest" - - "github.com/coder/coder/v2/coderd/database/dbtestutil" - "github.com/coder/coder/v2/coderd/database/pubsub" - "github.com/coder/coder/v2/enterprise/tailnet" - agpl "github.com/coder/coder/v2/tailnet" - "github.com/coder/coder/v2/testutil" -) - -func TestCoordinatorSingle(t *testing.T) { - t.Parallel() - t.Run("ClientWithoutAgent", func(t *testing.T) { - t.Parallel() - coordinator, err := tailnet.NewCoordinator(slogtest.Make(t, nil), pubsub.NewInMemory()) - require.NoError(t, err) - defer coordinator.Close() - - client, server := net.Pipe() - sendNode, errChan := agpl.ServeCoordinator(client, func(node []*agpl.Node) error { - return nil - }) - id := uuid.New() - closeChan := make(chan struct{}) - go func() { - err := coordinator.ServeClient(server, id, uuid.New()) - assert.NoError(t, err) - close(closeChan) - }() - sendNode(&agpl.Node{}) - require.Eventually(t, func() bool { - return coordinator.Node(id) != nil - }, testutil.WaitShort, testutil.IntervalFast) - - err = client.Close() - require.NoError(t, err) - <-errChan - <-closeChan - }) - - t.Run("AgentWithoutClients", func(t *testing.T) { - t.Parallel() - coordinator, err := tailnet.NewCoordinator(slogtest.Make(t, nil), pubsub.NewInMemory()) - require.NoError(t, err) - defer coordinator.Close() - - client, server := net.Pipe() - sendNode, errChan := agpl.ServeCoordinator(client, func(node []*agpl.Node) error { - return nil - }) - id := uuid.New() - closeChan := make(chan struct{}) - go func() { - err := coordinator.ServeAgent(server, id, "") - assert.NoError(t, err) - close(closeChan) - }() - sendNode(&agpl.Node{}) - require.Eventually(t, func() bool { - return coordinator.Node(id) != nil - }, testutil.WaitShort, testutil.IntervalFast) - err = client.Close() - require.NoError(t, err) - <-errChan - <-closeChan - }) - - t.Run("AgentWithClient", func(t *testing.T) { - t.Parallel() - - coordinator, err := tailnet.NewCoordinator(slogtest.Make(t, nil), pubsub.NewInMemory()) - require.NoError(t, err) - defer coordinator.Close() - - agentWS, agentServerWS := net.Pipe() - defer agentWS.Close() - agentNodeChan := make(chan []*agpl.Node) - sendAgentNode, agentErrChan := agpl.ServeCoordinator(agentWS, func(nodes []*agpl.Node) error { - agentNodeChan <- nodes - return nil - }) - agentID := uuid.New() - closeAgentChan := make(chan struct{}) - go func() { - err := coordinator.ServeAgent(agentServerWS, agentID, "") - assert.NoError(t, err) - close(closeAgentChan) - }() - sendAgentNode(&agpl.Node{PreferredDERP: 1}) - require.Eventually(t, func() bool { - return coordinator.Node(agentID) != nil - }, testutil.WaitShort, testutil.IntervalFast) - - clientWS, clientServerWS := net.Pipe() - defer clientWS.Close() - defer clientServerWS.Close() - clientNodeChan := make(chan []*agpl.Node) - sendClientNode, clientErrChan := agpl.ServeCoordinator(clientWS, func(nodes []*agpl.Node) error { - clientNodeChan <- nodes - return nil - }) - clientID := uuid.New() - closeClientChan := make(chan struct{}) - go func() { - err := coordinator.ServeClient(clientServerWS, clientID, agentID) - assert.NoError(t, err) - close(closeClientChan) - }() 
- agentNodes := <-clientNodeChan - require.Len(t, agentNodes, 1) - sendClientNode(&agpl.Node{PreferredDERP: 2}) - clientNodes := <-agentNodeChan - require.Len(t, clientNodes, 1) - - // Ensure an update to the agent node reaches the client! - sendAgentNode(&agpl.Node{PreferredDERP: 3}) - agentNodes = <-clientNodeChan - require.Len(t, agentNodes, 1) - - // Close the agent WebSocket so a new one can connect. - err = agentWS.Close() - require.NoError(t, err) - <-agentErrChan - <-closeAgentChan - - // Create a new agent connection. This is to simulate a reconnect! - agentWS, agentServerWS = net.Pipe() - defer agentWS.Close() - agentNodeChan = make(chan []*agpl.Node) - _, agentErrChan = agpl.ServeCoordinator(agentWS, func(nodes []*agpl.Node) error { - agentNodeChan <- nodes - return nil - }) - closeAgentChan = make(chan struct{}) - go func() { - err := coordinator.ServeAgent(agentServerWS, agentID, "") - assert.NoError(t, err) - close(closeAgentChan) - }() - // Ensure the existing listening client sends it's node immediately! - clientNodes = <-agentNodeChan - require.Len(t, clientNodes, 1) - - err = agentWS.Close() - require.NoError(t, err) - <-agentErrChan - <-closeAgentChan - - err = clientWS.Close() - require.NoError(t, err) - <-clientErrChan - <-closeClientChan - }) -} - -func TestCoordinatorHA(t *testing.T) { - t.Parallel() - - t.Run("AgentWithClient", func(t *testing.T) { - t.Parallel() - - _, pubsub := dbtestutil.NewDB(t) - - coordinator1, err := tailnet.NewCoordinator(slogtest.Make(t, nil), pubsub) - require.NoError(t, err) - defer coordinator1.Close() - - agentWS, agentServerWS := net.Pipe() - defer agentWS.Close() - agentNodeChan := make(chan []*agpl.Node) - sendAgentNode, agentErrChan := agpl.ServeCoordinator(agentWS, func(nodes []*agpl.Node) error { - agentNodeChan <- nodes - return nil - }) - agentID := uuid.New() - closeAgentChan := make(chan struct{}) - go func() { - err := coordinator1.ServeAgent(agentServerWS, agentID, "") - assert.NoError(t, err) - close(closeAgentChan) - }() - sendAgentNode(&agpl.Node{PreferredDERP: 1}) - require.Eventually(t, func() bool { - return coordinator1.Node(agentID) != nil - }, testutil.WaitShort, testutil.IntervalFast) - - coordinator2, err := tailnet.NewCoordinator(slogtest.Make(t, nil), pubsub) - require.NoError(t, err) - defer coordinator2.Close() - - clientWS, clientServerWS := net.Pipe() - defer clientWS.Close() - defer clientServerWS.Close() - clientNodeChan := make(chan []*agpl.Node) - sendClientNode, clientErrChan := agpl.ServeCoordinator(clientWS, func(nodes []*agpl.Node) error { - clientNodeChan <- nodes - return nil - }) - clientID := uuid.New() - closeClientChan := make(chan struct{}) - go func() { - err := coordinator2.ServeClient(clientServerWS, clientID, agentID) - assert.NoError(t, err) - close(closeClientChan) - }() - agentNodes := <-clientNodeChan - require.Len(t, agentNodes, 1) - sendClientNode(&agpl.Node{PreferredDERP: 2}) - _ = sendClientNode - clientNodes := <-agentNodeChan - require.Len(t, clientNodes, 1) - - // Ensure an update to the agent node reaches the client! - sendAgentNode(&agpl.Node{PreferredDERP: 3}) - agentNodes = <-clientNodeChan - require.Len(t, agentNodes, 1) - - // Close the agent WebSocket so a new one can connect. - require.NoError(t, agentWS.Close()) - require.NoError(t, agentServerWS.Close()) - <-agentErrChan - <-closeAgentChan - - // Create a new agent connection. This is to simulate a reconnect! 
- agentWS, agentServerWS = net.Pipe() - defer agentWS.Close() - agentNodeChan = make(chan []*agpl.Node) - _, agentErrChan = agpl.ServeCoordinator(agentWS, func(nodes []*agpl.Node) error { - agentNodeChan <- nodes - return nil - }) - closeAgentChan = make(chan struct{}) - go func() { - err := coordinator1.ServeAgent(agentServerWS, agentID, "") - assert.NoError(t, err) - close(closeAgentChan) - }() - // Ensure the existing listening client sends it's node immediately! - clientNodes = <-agentNodeChan - require.Len(t, clientNodes, 1) - - err = agentWS.Close() - require.NoError(t, err) - <-agentErrChan - <-closeAgentChan - - err = clientWS.Close() - require.NoError(t, err) - <-clientErrChan - <-closeClientChan - }) -} diff --git a/site/src/api/typesGenerated.ts b/site/src/api/typesGenerated.ts index 572f6d4996a39..17b3091cfe2a5 100644 --- a/site/src/api/typesGenerated.ts +++ b/site/src/api/typesGenerated.ts @@ -1812,13 +1812,9 @@ export const Entitlements: Entitlement[] = [ ]; // From codersdk/deployment.go -export type Experiment = - | "deployment_health_page" - | "tailnet_pg_coordinator" - | "workspace_actions"; +export type Experiment = "deployment_health_page" | "workspace_actions"; export const Experiments: Experiment[] = [ "deployment_health_page", - "tailnet_pg_coordinator", "workspace_actions", ]; diff --git a/site/src/testHelpers/entities.ts b/site/src/testHelpers/entities.ts index f12cd6763a4c2..3940984541e3b 100644 --- a/site/src/testHelpers/entities.ts +++ b/site/src/testHelpers/entities.ts @@ -2048,7 +2048,7 @@ export const MockEntitlementsWithUserLimit: TypesGen.Entitlements = { }; export const MockExperiments: TypesGen.Experiment[] = [ - "tailnet_pg_coordinator", + "workspace_actions", ]; export const MockAuditLog: TypesGen.AuditLog = { From 58cecd44adc22a7e1e714618a0b29ce32ef7bd35 Mon Sep 17 00:00:00 2001 From: Spike Curtis Date: Thu, 4 Jan 2024 10:36:29 +0000 Subject: [PATCH 2/3] prettier on entities.ts --- site/src/testHelpers/entities.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/site/src/testHelpers/entities.ts b/site/src/testHelpers/entities.ts index 3940984541e3b..3edf538eab4b1 100644 --- a/site/src/testHelpers/entities.ts +++ b/site/src/testHelpers/entities.ts @@ -2047,9 +2047,7 @@ export const MockEntitlementsWithUserLimit: TypesGen.Entitlements = { }), }; -export const MockExperiments: TypesGen.Experiment[] = [ - "workspace_actions", -]; +export const MockExperiments: TypesGen.Experiment[] = ["workspace_actions"]; export const MockAuditLog: TypesGen.AuditLog = { id: "fbd2116a-8961-4954-87ae-e4575bd29ce0", From 9af1797be2c68541513e2bba1441496211de7084 Mon Sep 17 00:00:00 2001 From: Spike Curtis Date: Wed, 3 Jan 2024 11:47:06 +0400 Subject: [PATCH 3/3] chore: remove unused context/cancel in tailnet Conn --- tailnet/conn.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tailnet/conn.go b/tailnet/conn.go index 3620cc5244390..34712ee0ffb9f 100644 --- a/tailnet/conn.go +++ b/tailnet/conn.go @@ -282,12 +282,9 @@ func NewConn(options *Options) (conn *Conn, err error) { Logger(options.Logger.Named("net.packet-filter")), )) - dialContext, dialCancel := context.WithCancel(context.Background()) server := &Conn{ blockEndpoints: options.BlockEndpoints, derpForceWebSockets: options.DERPForceWebSockets, - dialContext: dialContext, - dialCancel: dialCancel, closed: make(chan struct{}), logger: options.Logger, magicConn: magicConn, @@ -392,8 +389,6 @@ func IPFromUUID(uid uuid.UUID) netip.Addr { // Conn is an actively listening Wireguard connection. 
type Conn struct { - dialContext context.Context - dialCancel context.CancelFunc mutex sync.Mutex closed chan struct{} logger slog.Logger @@ -789,7 +784,6 @@ func (c *Conn) Close() error { _ = c.netStack.Close() c.logger.Debug(context.Background(), "closed netstack") - c.dialCancel() _ = c.wireguardMonitor.Close() _ = c.dialer.Close() // Stops internals, e.g. tunDevice, magicConn and dnsManager.
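
The deleted HA coordinator's pubsub protocol is easiest to see end-to-end in isolation. Below is a minimal, self-contained sketch of the `wireguard_peers` framing it used (`<coordinator id>|<event>|<agent or recipient id>|<json payload>`); the `frame` helper name and the `preferred_derp` payload are illustrative assumptions, not code from this patch:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"

	"github.com/google/uuid"
)

// frame mirrors the deleted formatCallMeMaybe/formatAgentUpdate helpers:
// four "|"-separated fields, with a JSON-encoded payload in the last one.
func frame(sender, target uuid.UUID, event string, payload any) ([]byte, error) {
	buf := bytes.Buffer{}
	_, _ = buf.WriteString(sender.String() + "|")
	_, _ = buf.WriteString(event + "|")
	_, _ = buf.WriteString(target.String() + "|")
	if err := json.NewEncoder(&buf).Encode(payload); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	msg, err := frame(uuid.New(), uuid.New(), "callmemaybe", []map[string]int{{"preferred_derp": 1}})
	if err != nil {
		panic(err)
	}

	// Parsing mirrors the deleted handlePubsubMessage: a message must yield
	// exactly four fields. SplitN is used in this sketch so a "|" inside the
	// JSON payload cannot inflate the count; the deleted code used
	// bytes.Split plus a length check instead.
	sp := bytes.SplitN(msg, []byte("|"), 4)
	if len(sp) != 4 {
		panic("invalid wireguard peer message")
	}
	sender, err := uuid.ParseBytes(sp[0])
	if err != nil {
		panic(err)
	}
	fmt.Printf("sender=%s event=%s target=%s payload=%s", sender, sp[1], sp[2], sp[3])
}
```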