diff --git a/agent/agent.go b/agent/agent.go index 31b48edd4dc83..4cefcfa9f8616 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -70,6 +70,47 @@ const ( EnvProcOOMScore = "CODER_PROC_OOM_SCORE" ) +// agentImmortalDialer is a custom dialer for immortal streams that can +// connect to the agent's own services via tailnet addresses. +type agentImmortalDialer struct { + agent *agent + standardDialer *net.Dialer +} + +func (d *agentImmortalDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) { + host, portStr, err := net.SplitHostPort(address) + if err != nil { + return nil, xerrors.Errorf("split host port %q: %w", address, err) + } + + port, err := strconv.Atoi(portStr) + if err != nil { + return nil, xerrors.Errorf("parse port %q: %w", portStr, err) + } + + // Check if this is a connection to one of the agent's own services + isLocalhost := host == "localhost" || host == "127.0.0.1" || host == "::1" + isAgentPort := port == int(workspacesdk.AgentSSHPort) || port == int(workspacesdk.AgentHTTPAPIServerPort) || + port == int(workspacesdk.AgentReconnectingPTYPort) || port == int(workspacesdk.AgentSpeedtestPort) + + if isLocalhost && isAgentPort { + // Get the agent ID from the current manifest + manifest := d.agent.manifest.Load() + if manifest == nil || manifest.AgentID == uuid.Nil { + // Fallback to standard dialing if no manifest available yet + return d.standardDialer.DialContext(ctx, network, address) + } + + // Connect to the agent's own tailnet address instead of localhost + agentAddr := tailnet.TailscaleServicePrefix.AddrFromUUID(manifest.AgentID) + agentAddress := net.JoinHostPort(agentAddr.String(), portStr) + return d.standardDialer.DialContext(ctx, network, agentAddress) + } + + // For other addresses, use standard dialing + return d.standardDialer.DialContext(ctx, network, address) +} + type Options struct { Filesystem afero.Fs LogDir string @@ -351,8 +392,13 @@ func (a *agent) init() { a.containerAPI = agentcontainers.NewAPI(a.logger.Named("containers"), containerAPIOpts...) - // Initialize immortal streams manager - a.immortalStreamsManager = immortalstreams.New(a.logger.Named("immortal-streams"), &net.Dialer{}) + // Initialize immortal streams manager with a custom dialer + // that can connect to the agent's own services + immortalDialer := &agentImmortalDialer{ + agent: a, + standardDialer: &net.Dialer{}, + } + a.immortalStreamsManager = immortalstreams.New(a.logger.Named("immortal-streams"), immortalDialer) a.reconnectingPTYServer = reconnectingpty.NewServer( a.logger.Named("reconnecting-pty"), diff --git a/cli/exp.go b/cli/exp.go index dafd85402663e..a4d4640bf6057 100644 --- a/cli/exp.go +++ b/cli/exp.go @@ -16,6 +16,7 @@ func (r *RootCmd) expCmd() *serpent.Command { r.mcpCommand(), r.promptExample(), r.rptyCommand(), + r.immortalStreamCmd(), }, } return cmd diff --git a/cli/immortalstreams.go b/cli/immortalstreams.go new file mode 100644 index 0000000000000..7dc3e0300d7ab --- /dev/null +++ b/cli/immortalstreams.go @@ -0,0 +1,188 @@ +package cli + +import ( + "context" + "fmt" + + "github.com/google/uuid" + "golang.org/x/xerrors" + + "cdr.dev/slog" + "github.com/coder/coder/v2/cli/cliui" + "github.com/coder/coder/v2/codersdk" + "github.com/coder/serpent" +) + +// immortalStreamClient provides methods to interact with immortal streams API +// This uses the main codersdk.Client to make server-proxied requests to agents +type immortalStreamClient struct { + client *codersdk.Client + agentID uuid.UUID + logger slog.Logger +} + +// newImmortalStreamClient creates a new client for immortal streams +func newImmortalStreamClient(client *codersdk.Client, agentID uuid.UUID, logger slog.Logger) *immortalStreamClient { + return &immortalStreamClient{ + client: client, + agentID: agentID, + logger: logger, + } +} + +// createStream creates a new immortal stream +func (c *immortalStreamClient) createStream(ctx context.Context, port int) (*codersdk.ImmortalStream, error) { + stream, err := c.client.WorkspaceAgentCreateImmortalStream(ctx, c.agentID, codersdk.CreateImmortalStreamRequest{ + TCPPort: port, + }) + if err != nil { + return nil, err + } + return &stream, nil +} + +// listStreams lists all immortal streams +func (c *immortalStreamClient) listStreams(ctx context.Context) ([]codersdk.ImmortalStream, error) { + return c.client.WorkspaceAgentImmortalStreams(ctx, c.agentID) +} + +// deleteStream deletes an immortal stream +func (c *immortalStreamClient) deleteStream(ctx context.Context, streamID uuid.UUID) error { + return c.client.WorkspaceAgentDeleteImmortalStream(ctx, c.agentID, streamID) +} + +// CLI Commands + +func (r *RootCmd) immortalStreamCmd() *serpent.Command { + client := new(codersdk.Client) + cmd := &serpent.Command{ + Use: "immortal-stream", + Short: "Manage immortal streams in workspaces", + Long: "Immortal streams provide persistent TCP connections to workspace services that automatically reconnect when interrupted.", + Middleware: serpent.Chain( + r.InitClient(client), + ), + Handler: func(inv *serpent.Invocation) error { + return inv.Command.HelpHandler(inv) + }, + Children: []*serpent.Command{ + r.immortalStreamListCmd(), + r.immortalStreamDeleteCmd(), + }, + } + return cmd +} + +func (r *RootCmd) immortalStreamListCmd() *serpent.Command { + client := new(codersdk.Client) + cmd := &serpent.Command{ + Use: "list ", + Short: "List active immortal streams in a workspace", + Middleware: serpent.Chain( + serpent.RequireNArgs(1), + r.InitClient(client), + ), + Handler: func(inv *serpent.Invocation) error { + ctx := inv.Context() + workspaceName := inv.Args[0] + + workspace, workspaceAgent, _, err := getWorkspaceAndAgent(ctx, inv, client, false, workspaceName) + if err != nil { + return err + } + + if workspace.LatestBuild.Transition != codersdk.WorkspaceTransitionStart { + return xerrors.New("workspace must be running to list immortal streams") + } + + // Create immortal stream client + // Note: We don't need to dial the agent for management operations + // as these go through the server's proxy endpoints + streamClient := newImmortalStreamClient(client, workspaceAgent.ID, inv.Logger) + streams, err := streamClient.listStreams(ctx) + if err != nil { + return xerrors.Errorf("list immortal streams: %w", err) + } + + if len(streams) == 0 { + cliui.Info(inv.Stderr, "No active immortal streams found.") + return nil + } + + // Display the streams in a table + displayImmortalStreams(inv, streams) + return nil + }, + } + return cmd +} + +func (r *RootCmd) immortalStreamDeleteCmd() *serpent.Command { + client := new(codersdk.Client) + cmd := &serpent.Command{ + Use: "delete ", + Short: "Delete an active immortal stream", + Middleware: serpent.Chain( + serpent.RequireNArgs(2), + r.InitClient(client), + ), + Handler: func(inv *serpent.Invocation) error { + ctx := inv.Context() + workspaceName := inv.Args[0] + streamName := inv.Args[1] + + workspace, workspaceAgent, _, err := getWorkspaceAndAgent(ctx, inv, client, false, workspaceName) + if err != nil { + return err + } + + if workspace.LatestBuild.Transition != codersdk.WorkspaceTransitionStart { + return xerrors.New("workspace must be running to delete immortal streams") + } + + // Create immortal stream client + streamClient := newImmortalStreamClient(client, workspaceAgent.ID, inv.Logger) + streams, err := streamClient.listStreams(ctx) + if err != nil { + return xerrors.Errorf("list immortal streams: %w", err) + } + + var targetStream *codersdk.ImmortalStream + for _, stream := range streams { + if stream.Name == streamName { + targetStream = &stream + break + } + } + + if targetStream == nil { + return xerrors.Errorf("immortal stream %q not found", streamName) + } + + // Delete the stream + err = streamClient.deleteStream(ctx, targetStream.ID) + if err != nil { + return xerrors.Errorf("delete immortal stream: %w", err) + } + + cliui.Info(inv.Stderr, fmt.Sprintf("Deleted immortal stream %q (ID: %s)", streamName, targetStream.ID)) + return nil + }, + } + return cmd +} + +func displayImmortalStreams(inv *serpent.Invocation, streams []codersdk.ImmortalStream) { + _, _ = fmt.Fprintf(inv.Stderr, "Active Immortal Streams:\n\n") + _, _ = fmt.Fprintf(inv.Stderr, "%-20s %-6s %-20s %-20s\n", "NAME", "PORT", "CREATED", "LAST CONNECTED") + _, _ = fmt.Fprintf(inv.Stderr, "%-20s %-6s %-20s %-20s\n", "----", "----", "-------", "--------------") + + for _, stream := range streams { + createdTime := stream.CreatedAt.Format("2006-01-02 15:04:05") + lastConnTime := stream.LastConnectionAt.Format("2006-01-02 15:04:05") + + _, _ = fmt.Fprintf(inv.Stderr, "%-20s %-6d %-20s %-20s\n", + stream.Name, stream.TCPPort, createdTime, lastConnTime) + } + _, _ = fmt.Fprintf(inv.Stderr, "\n") +} diff --git a/cli/portforward.go b/cli/portforward.go index 7a7723213f760..3afb834f9c902 100644 --- a/cli/portforward.go +++ b/cli/portforward.go @@ -39,6 +39,10 @@ func (r *RootCmd) portForward() *serpent.Command { udpForwards []string // : disableAutostart bool appearanceConfig codersdk.AppearanceConfig + + // Immortal streams flags + immortal bool + immortalFallback bool = true // Default to true for port-forward ) client := new(codersdk.Client) cmd := &serpent.Command{ @@ -212,6 +216,19 @@ func (r *RootCmd) portForward() *serpent.Command { Description: "Forward UDP port(s) from the workspace to the local machine. The UDP connection has TCP-like semantics to support stateful UDP protocols.", Value: serpent.StringArrayOf(&udpForwards), }, + { + Flag: "immortal", + Description: "Use immortal streams for port forwarding connections, providing automatic reconnection when interrupted.", + Value: serpent.BoolOf(&immortal), + Hidden: true, + }, + { + Flag: "immortal-fallback", + Description: "If immortal streams are unavailable due to connection limits, fall back to regular TCP connection.", + Default: "true", + Value: serpent.BoolOf(&immortalFallback), + Hidden: true, + }, sshDisableAutostartOption(serpent.BoolOf(&disableAutostart)), } diff --git a/cli/ssh.go b/cli/ssh.go index a2bca46c72f32..feb255cdc5f12 100644 --- a/cli/ssh.go +++ b/cli/ssh.go @@ -48,6 +48,7 @@ import ( "github.com/coder/quartz" "github.com/coder/retry" "github.com/coder/serpent" + "github.com/coder/websocket" ) const ( @@ -85,6 +86,10 @@ func (r *RootCmd) ssh() *serpent.Command { containerName string containerUser string + + // Immortal streams flags + immortal bool + immortalFallback bool // Default to false for SSH ) client := new(codersdk.Client) wsClient := workspacesdk.New(client) @@ -387,11 +392,83 @@ func (r *RootCmd) ssh() *serpent.Command { } if stdio { - rawSSH, err := conn.SSH(ctx) - if err != nil { - return xerrors.Errorf("connect SSH: %w", err) + var rawSSH net.Conn + var immortalStreamClient *immortalStreamClient + var streamID *uuid.UUID + + if immortal { + // Use immortal stream for SSH connection + immortalStreamClient = newImmortalStreamClient(client, workspaceAgent.ID, logger) + + // Create immortal stream to agent SSH port (1) + stream, err := immortalStreamClient.createStream(ctx, 1) + if err != nil { + logger.Error(ctx, "failed to create immortal stream for SSH", + slog.Error(err), + slog.F("agent_id", workspaceAgent.ID), + slog.F("target_port", 1), + slog.F("workspace", workspace.Name), + slog.F("agent_status", workspaceAgent.Status), + slog.F("immortal_fallback_enabled", immortalFallback)) + + shouldFallback := immortalFallback && (strings.Contains(err.Error(), "too many immortal streams") || + strings.Contains(err.Error(), "The connection was refused")) + + if shouldFallback { + if strings.Contains(err.Error(), "too many immortal streams") { + logger.Warn(ctx, "too many immortal streams, falling back to regular SSH connection", + slog.F("max_streams", "32")) + } else { + logger.Warn(ctx, "Agent SSH service not available on port 1, falling back to regular SSH connection", + slog.F("reason", "connection_refused"), + slog.F("suggestion", "agent SSH server may not be running")) + } + logger.Info(ctx, "attempting fallback to regular SSH connection") + rawSSH, err = conn.SSH(ctx) + if err != nil { + logger.Error(ctx, "fallback SSH connection also failed", slog.Error(err)) + return xerrors.Errorf("connect SSH (fallback): %w", err) + } + logger.Info(ctx, "successfully connected via regular SSH fallback") + } else { + return xerrors.Errorf("create immortal stream for SSH: %w", err) + } + } else { + streamID = &stream.ID + logger.Info(ctx, "created immortal stream for SSH", slog.F("stream_name", stream.Name), slog.F("stream_id", stream.ID)) + + // Connect to the immortal stream via WebSocket + rawSSH, err = connectToImmortalStreamWebSocket(ctx, conn, stream.ID, logger) + if err != nil { + // Clean up the stream if connection fails + _ = immortalStreamClient.deleteStream(ctx, stream.ID) + return xerrors.Errorf("connect to immortal stream: %w", err) + } + } + } else { + // Use regular SSH connection + rawSSH, err = conn.SSH(ctx) + if err != nil { + return xerrors.Errorf("connect SSH: %w", err) + } } - copier := newRawSSHCopier(logger, rawSSH, stdioReader, stdioWriter) + + var copier io.Closer + + if tcpConn, ok := rawSSH.(*gonet.TCPConn); ok { + // Use specialized raw SSH copier for regular TCP connections + rawCopier := newRawSSHCopier(logger, tcpConn, stdioReader, stdioWriter) + copier = rawCopier + // Start copying in the background for rawSSHCopier + go rawCopier.copy(&wg) + } else { + // Use generic copier for immortal stream connections + genericCopier := newGenericSSHCopier(logger, rawSSH, stdioReader, stdioWriter) + copier = genericCopier + // Start copying in the background for genericSSHCopier + go genericCopier.copy(&wg) + } + if err = stack.push("rawSSHCopier", copier); err != nil { return err } @@ -404,22 +481,108 @@ func (r *RootCmd) ssh() *serpent.Command { } } + // Set up cleanup for immortal stream + if immortalStreamClient != nil && streamID != nil { + defer func() { + if err := immortalStreamClient.deleteStream(context.Background(), *streamID); err != nil { + logger.Error(context.Background(), "failed to cleanup immortal stream", slog.Error(err)) + } + }() + } + wg.Add(1) go func() { defer wg.Done() watchAndClose(ctx, func() error { + // Clean up immortal stream on termination + if immortalStreamClient != nil && streamID != nil { + if err := immortalStreamClient.deleteStream(context.Background(), *streamID); err != nil { + logger.Error(context.Background(), "failed to cleanup immortal stream on termination", slog.Error(err)) + } + } stack.close(xerrors.New("watchAndClose")) return nil }, logger, client, workspace, errCh) }() - copier.copy(&wg) + // The copying is already started in the background above + wg.Wait() return nil } - sshClient, err := conn.SSHClient(ctx) - if err != nil { - return xerrors.Errorf("ssh client: %w", err) + var sshClient *gossh.Client + var immortalStreamClient *immortalStreamClient + var streamID *uuid.UUID + + if immortal { + // Use immortal stream for SSH connection + immortalStreamClient = newImmortalStreamClient(client, workspaceAgent.ID, logger) + + // Create immortal stream to agent SSH port (1) + stream, err := immortalStreamClient.createStream(ctx, 1) + if err != nil { + logger.Error(ctx, "failed to create immortal stream for SSH (regular mode)", + slog.Error(err), + slog.F("agent_id", workspaceAgent.ID), + slog.F("target_port", 1), + slog.F("workspace", workspace.Name), + slog.F("agent_status", workspaceAgent.Status), + slog.F("immortal_fallback_enabled", immortalFallback)) + + shouldFallback := immortalFallback && (strings.Contains(err.Error(), "too many immortal streams") || + strings.Contains(err.Error(), "The connection was refused")) + + if shouldFallback { + if strings.Contains(err.Error(), "too many immortal streams") { + logger.Warn(ctx, "too many immortal streams, falling back to regular SSH connection", + slog.F("max_streams", "32")) + } else { + logger.Warn(ctx, "Agent SSH service not available on port 1, falling back to regular SSH connection", + slog.F("reason", "connection_refused"), + slog.F("suggestion", "agent SSH server may not be running")) + } + logger.Info(ctx, "attempting fallback to regular SSH client") + sshClient, err = conn.SSHClient(ctx) + if err != nil { + logger.Error(ctx, "fallback SSH client creation also failed", slog.Error(err)) + return xerrors.Errorf("ssh client (fallback): %w", err) + } + logger.Info(ctx, "successfully created SSH client via regular fallback") + } else { + return xerrors.Errorf("create immortal stream for SSH: %w", err) + } + } else { + streamID = &stream.ID + logger.Info(ctx, "created immortal stream for SSH", slog.F("stream_name", stream.Name), slog.F("stream_id", stream.ID)) + + // Connect to the immortal stream and create SSH client + rawConn, err := connectToImmortalStreamWebSocket(ctx, conn, stream.ID, logger) + if err != nil { + // Clean up the stream if connection fails + _ = immortalStreamClient.deleteStream(ctx, stream.ID) + return xerrors.Errorf("connect to immortal stream: %w", err) + } + + // Create SSH client over the immortal stream connection + sshConn, chans, reqs, err := gossh.NewClientConn(rawConn, "localhost:22", &gossh.ClientConfig{ + HostKeyCallback: gossh.InsecureIgnoreHostKey(), + Timeout: 30 * time.Second, + }) + if err != nil { + rawConn.Close() + _ = immortalStreamClient.deleteStream(ctx, stream.ID) + return xerrors.Errorf("ssh handshake over immortal stream: %w", err) + } + + sshClient = gossh.NewClient(sshConn, chans, reqs) + } + } else { + // Use regular SSH connection + sshClient, err = conn.SSHClient(ctx) + if err != nil { + return xerrors.Errorf("ssh client: %w", err) + } } + if err = stack.push("ssh client", sshClient); err != nil { return err } @@ -440,12 +603,27 @@ func (r *RootCmd) ssh() *serpent.Command { } } + // Set up cleanup for immortal stream in regular SSH mode + if immortalStreamClient != nil && streamID != nil { + defer func() { + if err := immortalStreamClient.deleteStream(context.Background(), *streamID); err != nil { + logger.Error(context.Background(), "failed to cleanup immortal stream", slog.Error(err)) + } + }() + } + wg.Add(1) go func() { defer wg.Done() watchAndClose( ctx, func() error { + // Clean up immortal stream on termination + if immortalStreamClient != nil && streamID != nil { + if err := immortalStreamClient.deleteStream(context.Background(), *streamID); err != nil { + logger.Error(context.Background(), "failed to cleanup immortal stream on termination", slog.Error(err)) + } + } stack.close(xerrors.New("watchAndClose")) return nil }, @@ -728,11 +906,83 @@ func (r *RootCmd) ssh() *serpent.Command { Value: serpent.BoolOf(&forceNewTunnel), Hidden: true, }, + { + Flag: "immortal", + Description: "Use immortal streams for SSH connection, providing automatic reconnection when interrupted.", + Value: serpent.BoolOf(&immortal), + Hidden: true, + }, + { + Flag: "immortal-fallback", + Description: "If immortal streams are unavailable due to connection limits, fall back to regular TCP connection.", + Value: serpent.BoolOf(&immortalFallback), + Hidden: true, + }, sshDisableAutostartOption(serpent.BoolOf(&disableAutostart)), } return cmd } +// connectToImmortalStreamWebSocket connects to an immortal stream via WebSocket and returns a net.Conn +func connectToImmortalStreamWebSocket(ctx context.Context, agentConn *workspacesdk.AgentConn, streamID uuid.UUID, logger slog.Logger) (net.Conn, error) { + // Build the target address for the agent's HTTP API server + // We'll let the WebSocket dialer handle the actual connection through the agent + apiServerAddr := fmt.Sprintf("127.0.0.1:%d", workspacesdk.AgentHTTPAPIServerPort) + wsURL := fmt.Sprintf("ws://%s/api/v0/immortal-stream/%s", apiServerAddr, streamID) + + // Create WebSocket connection using the agent's tailnet connection + // The key is to use a custom dialer that routes through the agent connection + dialOptions := &websocket.DialOptions{ + HTTPClient: &http.Client{ + Transport: &http.Transport{ + DialContext: func(dialCtx context.Context, network, addr string) (net.Conn, error) { + // Route all connections through the agent connection + // The agent connection will handle routing to the correct internal address + + conn, err := agentConn.DialContext(dialCtx, network, addr) + if err != nil { + return nil, err + } + + return conn, nil + }, + }, + }, + // Disable compression for raw TCP data + CompressionMode: websocket.CompressionDisabled, + } + + // Connect to the WebSocket endpoint + conn, res, err := websocket.Dial(ctx, wsURL, dialOptions) + if err != nil { + if res != nil { + logger.Error(ctx, "WebSocket dial failed", + slog.F("stream_id", streamID), + slog.F("websocket_url", wsURL), + slog.F("status", res.StatusCode), + slog.F("status_text", res.Status), + slog.Error(err)) + } else { + logger.Error(ctx, "WebSocket dial failed (no response)", + slog.F("stream_id", streamID), + slog.F("websocket_url", wsURL), + slog.Error(err)) + } + return nil, xerrors.Errorf("dial immortal stream WebSocket: %w", err) + } + + logger.Info(ctx, "successfully connected to immortal stream WebSocket", + slog.F("stream_id", streamID)) + + // Convert WebSocket to net.Conn for SSH usage + // Use MessageBinary for raw TCP data transport + netConn := websocket.NetConn(ctx, conn, websocket.MessageBinary) + + logger.Debug(ctx, "converted WebSocket to net.Conn for SSH usage") + + return netConn, nil +} + // findWorkspaceAndAgentByHostname parses the hostname from the commandline and finds the workspace and agent it // corresponds to, taking into account any name prefixes or suffixes configured (e.g. myworkspace.coder, or // vscode-coder--myusername--myworkspace). @@ -1276,6 +1526,44 @@ func newRawSSHCopier(logger slog.Logger, conn *gonet.TCPConn, r io.Reader, w io. return &rawSSHCopier{conn: conn, logger: logger, r: r, w: w, done: make(chan struct{})} } +// genericSSHCopier is similar to rawSSHCopier but works with any net.Conn (e.g., immortal streams) +type genericSSHCopier struct { + conn net.Conn + logger slog.Logger + r io.Reader + w io.Writer + done chan struct{} +} + +func newGenericSSHCopier(logger slog.Logger, conn net.Conn, r io.Reader, w io.Writer) *genericSSHCopier { + return &genericSSHCopier{conn: conn, logger: logger, r: r, w: w, done: make(chan struct{})} +} + +func (c *genericSSHCopier) copy(wg *sync.WaitGroup) { + defer close(c.done) + + // Copy stdin to connection + go func() { + defer c.conn.Close() + _, err := io.Copy(c.conn, c.r) + if err != nil { + c.logger.Debug(context.Background(), "error copying stdin to connection", slog.Error(err)) + } + }() + + // Copy connection to stdout + _, err := io.Copy(c.w, c.conn) + if err != nil { + c.logger.Debug(context.Background(), "error copying connection to stdout", slog.Error(err)) + } +} + +func (c *genericSSHCopier) Close() error { + c.conn.Close() + <-c.done + return nil +} + func (c *rawSSHCopier) copy(wg *sync.WaitGroup) { defer close(c.done) logCtx := context.Background() diff --git a/coderd/coderd.go b/coderd/coderd.go index 2aa30c9d7a45c..b6c7117681782 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -1391,6 +1391,11 @@ func New(options *Options) *API { r.Get("/containers/watch", api.watchWorkspaceAgentContainers) r.Post("/containers/devcontainers/{devcontainer}/recreate", api.workspaceAgentRecreateDevcontainer) r.Get("/coordinate", api.workspaceAgentClientCoordinate) + r.Route("/immortal-streams", func(r chi.Router) { + r.Get("/", api.workspaceAgentImmortalStreams) + r.Post("/", api.workspaceAgentCreateImmortalStream) + r.Delete("/{immortalstream}", api.workspaceAgentDeleteImmortalStream) + }) // PTY is part of workspaceAppServer. }) diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index d600eff6ecfec..8388472b1e5ce 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -805,6 +805,212 @@ func (api *API) workspaceAgentListeningPorts(rw http.ResponseWriter, r *http.Req httpapi.Write(ctx, rw, http.StatusOK, portsResponse) } +// @Summary Get workspace agent immortal streams +// @ID get-workspace-agent-immortal-streams +// @Security CoderSessionToken +// @Produce json +// @Tags Agents +// @Param workspaceagent path string true "Workspace agent ID" format(uuid) +// @Success 200 {array} codersdk.ImmortalStream +// @Router /workspaceagents/{workspaceagent}/immortal-streams [get] +func (api *API) workspaceAgentImmortalStreams(rw http.ResponseWriter, r *http.Request) { + ctx := r.Context() + workspaceAgent := httpmw.WorkspaceAgentParam(r) + + // Check agent connectivity with timeout + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + apiAgent, err := db2sdk.WorkspaceAgent( + api.DERPMap(), *api.TailnetCoordinator.Load(), workspaceAgent, nil, nil, nil, api.AgentInactiveDisconnectTimeout, + api.DeploymentValues.AgentFallbackTroubleshootingURL.String(), + ) + if err != nil { + httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Internal error reading workspace agent.", + Detail: err.Error(), + }) + return + } + if apiAgent.Status != codersdk.WorkspaceAgentConnected { + httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ + Message: fmt.Sprintf("Agent state is %q, it must be in the %q state.", apiAgent.Status, codersdk.WorkspaceAgentConnected), + }) + return + } + + agentConn, release, err := api.agentProvider.AgentConn(ctx, workspaceAgent.ID) + if err != nil { + httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Internal error dialing workspace agent.", + Detail: err.Error(), + }) + return + } + defer release() + + streams, err := agentConn.ImmortalStreams(ctx) + if err != nil { + httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Internal error fetching immortal streams.", + Detail: err.Error(), + }) + return + } + + httpapi.Write(ctx, rw, http.StatusOK, streams) +} + +// @Summary Create workspace agent immortal stream +// @ID create-workspace-agent-immortal-stream +// @Security CoderSessionToken +// @Accept json +// @Produce json +// @Tags Agents +// @Param workspaceagent path string true "Workspace agent ID" format(uuid) +// @Param request body codersdk.CreateImmortalStreamRequest true "Create immortal stream request" +// @Success 201 {object} codersdk.ImmortalStream +// @Router /workspaceagents/{workspaceagent}/immortal-streams [post] +func (api *API) workspaceAgentCreateImmortalStream(rw http.ResponseWriter, r *http.Request) { + ctx := r.Context() + workspaceAgent := httpmw.WorkspaceAgentParam(r) + + var req codersdk.CreateImmortalStreamRequest + if !httpapi.Read(ctx, rw, r, &req) { + return + } + + // Check agent connectivity with timeout + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + apiAgent, err := db2sdk.WorkspaceAgent( + api.DERPMap(), *api.TailnetCoordinator.Load(), workspaceAgent, nil, nil, nil, api.AgentInactiveDisconnectTimeout, + api.DeploymentValues.AgentFallbackTroubleshootingURL.String(), + ) + if err != nil { + httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Internal error reading workspace agent.", + Detail: err.Error(), + }) + return + } + if apiAgent.Status != codersdk.WorkspaceAgentConnected { + httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ + Message: fmt.Sprintf("Agent state is %q, it must be in the %q state.", apiAgent.Status, codersdk.WorkspaceAgentConnected), + }) + return + } + + agentConn, release, err := api.agentProvider.AgentConn(ctx, workspaceAgent.ID) + if err != nil { + httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Internal error dialing workspace agent.", + Detail: err.Error(), + }) + return + } + defer release() + + stream, err := agentConn.CreateImmortalStream(ctx, req) + if err != nil { + // Check for specific error types from the agent + if strings.Contains(err.Error(), "too many immortal streams") { + httpapi.Write(ctx, rw, http.StatusServiceUnavailable, codersdk.Response{ + Message: "Too many immortal streams.", + }) + return + } + if strings.Contains(err.Error(), "connection was refused") { + httpapi.Write(ctx, rw, http.StatusNotFound, codersdk.Response{ + Message: "The connection was refused.", + }) + return + } + httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Internal error creating immortal stream.", + Detail: err.Error(), + }) + return + } + + httpapi.Write(ctx, rw, http.StatusCreated, stream) +} + +// @Summary Delete workspace agent immortal stream +// @ID delete-workspace-agent-immortal-stream +// @Security CoderSessionToken +// @Tags Agents +// @Param workspaceagent path string true "Workspace agent ID" format(uuid) +// @Param immortalstream path string true "Immortal stream ID" format(uuid) +// @Success 200 {object} codersdk.Response +// @Router /workspaceagents/{workspaceagent}/immortal-streams/{immortalstream} [delete] +func (api *API) workspaceAgentDeleteImmortalStream(rw http.ResponseWriter, r *http.Request) { + ctx := r.Context() + workspaceAgent := httpmw.WorkspaceAgentParam(r) + + streamIDStr := chi.URLParam(r, "immortalstream") + streamID, err := uuid.Parse(streamIDStr) + if err != nil { + httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ + Message: "Invalid immortal stream ID format.", + Detail: err.Error(), + }) + return + } + + // Check agent connectivity with timeout + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + apiAgent, err := db2sdk.WorkspaceAgent( + api.DERPMap(), *api.TailnetCoordinator.Load(), workspaceAgent, nil, nil, nil, api.AgentInactiveDisconnectTimeout, + api.DeploymentValues.AgentFallbackTroubleshootingURL.String(), + ) + if err != nil { + httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Internal error reading workspace agent.", + Detail: err.Error(), + }) + return + } + if apiAgent.Status != codersdk.WorkspaceAgentConnected { + httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ + Message: fmt.Sprintf("Agent state is %q, it must be in the %q state.", apiAgent.Status, codersdk.WorkspaceAgentConnected), + }) + return + } + + agentConn, release, err := api.agentProvider.AgentConn(ctx, workspaceAgent.ID) + if err != nil { + httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Internal error dialing workspace agent.", + Detail: err.Error(), + }) + return + } + defer release() + + err = agentConn.DeleteImmortalStream(ctx, streamID) + if err != nil { + if strings.Contains(err.Error(), "stream not found") { + httpapi.Write(ctx, rw, http.StatusNotFound, codersdk.Response{ + Message: "Immortal stream not found.", + }) + return + } + httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Internal error deleting immortal stream.", + Detail: err.Error(), + }) + return + } + + httpapi.Write(ctx, rw, http.StatusOK, codersdk.Response{ + Message: "Immortal stream deleted successfully.", + }) +} + // @Summary Watch workspace agent for container updates. // @ID watch-workspace-agent-for-container-updates // @Security CoderSessionToken diff --git a/codersdk/workspaceagents.go b/codersdk/workspaceagents.go index 1eb37bb07c989..250ce5ef8084a 100644 --- a/codersdk/workspaceagents.go +++ b/codersdk/workspaceagents.go @@ -393,6 +393,47 @@ func (c *Client) WorkspaceAgentListeningPorts(ctx context.Context, agentID uuid. return listeningPorts, json.NewDecoder(res.Body).Decode(&listeningPorts) } +// WorkspaceAgentImmortalStreams returns a list of immortal streams for the given agent. +func (c *Client) WorkspaceAgentImmortalStreams(ctx context.Context, agentID uuid.UUID) ([]ImmortalStream, error) { + res, err := c.Request(ctx, http.MethodGet, fmt.Sprintf("/api/v2/workspaceagents/%s/immortal-streams", agentID), nil) + if err != nil { + return nil, err + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + return nil, ReadBodyAsError(res) + } + var streams []ImmortalStream + return streams, json.NewDecoder(res.Body).Decode(&streams) +} + +// WorkspaceAgentCreateImmortalStream creates a new immortal stream for the given agent. +func (c *Client) WorkspaceAgentCreateImmortalStream(ctx context.Context, agentID uuid.UUID, req CreateImmortalStreamRequest) (ImmortalStream, error) { + res, err := c.Request(ctx, http.MethodPost, fmt.Sprintf("/api/v2/workspaceagents/%s/immortal-streams", agentID), req) + if err != nil { + return ImmortalStream{}, err + } + defer res.Body.Close() + if res.StatusCode != http.StatusCreated { + return ImmortalStream{}, ReadBodyAsError(res) + } + var stream ImmortalStream + return stream, json.NewDecoder(res.Body).Decode(&stream) +} + +// WorkspaceAgentDeleteImmortalStream deletes an immortal stream for the given agent. +func (c *Client) WorkspaceAgentDeleteImmortalStream(ctx context.Context, agentID uuid.UUID, streamID uuid.UUID) error { + res, err := c.Request(ctx, http.MethodDelete, fmt.Sprintf("/api/v2/workspaceagents/%s/immortal-streams/%s", agentID, streamID), nil) + if err != nil { + return err + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + return ReadBodyAsError(res) + } + return nil +} + // WorkspaceAgentDevcontainerStatus is the status of a devcontainer. type WorkspaceAgentDevcontainerStatus string diff --git a/codersdk/workspacesdk/agentconn.go b/codersdk/workspacesdk/agentconn.go index ce66d5e1b8a70..9bbaa2ded4b91 100644 --- a/codersdk/workspacesdk/agentconn.go +++ b/codersdk/workspacesdk/agentconn.go @@ -1,6 +1,7 @@ package workspacesdk import ( + "bytes" "context" "encoding/binary" "encoding/json" @@ -281,6 +282,74 @@ func (c *AgentConn) ListeningPorts(ctx context.Context) (codersdk.WorkspaceAgent return resp, json.NewDecoder(res.Body).Decode(&resp) } +// ImmortalStreams lists the immortal streams that are currently active in the workspace. +func (c *AgentConn) ImmortalStreams(ctx context.Context) ([]codersdk.ImmortalStream, error) { + ctx, span := tracing.StartSpan(ctx) + defer span.End() + res, err := c.apiRequest(ctx, http.MethodGet, "/api/v0/immortal-stream", nil) + if err != nil { + return nil, xerrors.Errorf("do request: %w", err) + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + return nil, codersdk.ReadBodyAsError(res) + } + + var streams []codersdk.ImmortalStream + return streams, json.NewDecoder(res.Body).Decode(&streams) +} + +// CreateImmortalStream creates a new immortal stream to the specified port. +func (c *AgentConn) CreateImmortalStream(ctx context.Context, req codersdk.CreateImmortalStreamRequest) (codersdk.ImmortalStream, error) { + ctx, span := tracing.StartSpan(ctx) + defer span.End() + + // Note: We can't easily add logging here since AgentConn doesn't have a logger + // But we can add some debug info to the error messages + + reqBody, err := json.Marshal(req) + if err != nil { + return codersdk.ImmortalStream{}, xerrors.Errorf("marshal request: %w", err) + } + + res, err := c.apiRequest(ctx, http.MethodPost, "/api/v0/immortal-stream", bytes.NewReader(reqBody)) + if err != nil { + return codersdk.ImmortalStream{}, xerrors.Errorf("do request to agent /api/v0/immortal-stream: %w", err) + } + defer res.Body.Close() + + if res.StatusCode != http.StatusCreated { + bodyErr := codersdk.ReadBodyAsError(res) + return codersdk.ImmortalStream{}, xerrors.Errorf("agent responded with status %d: %w", res.StatusCode, bodyErr) + } + + var stream codersdk.ImmortalStream + err = json.NewDecoder(res.Body).Decode(&stream) + if err != nil { + return codersdk.ImmortalStream{}, xerrors.Errorf("decode response: %w", err) + } + return stream, nil +} + +// DeleteImmortalStream deletes an immortal stream by ID. +func (c *AgentConn) DeleteImmortalStream(ctx context.Context, streamID uuid.UUID) error { + ctx, span := tracing.StartSpan(ctx) + defer span.End() + + path := fmt.Sprintf("/api/v0/immortal-stream/%s", streamID) + res, err := c.apiRequest(ctx, http.MethodDelete, path, nil) + if err != nil { + return xerrors.Errorf("do request: %w", err) + } + defer res.Body.Close() + + if res.StatusCode != http.StatusNoContent { + return codersdk.ReadBodyAsError(res) + } + + return nil +} + // Netcheck returns a network check report from the workspace agent. func (c *AgentConn) Netcheck(ctx context.Context) (healthsdk.AgentNetcheckReport, error) { ctx, span := tracing.StartSpan(ctx)