Skip to content

Commit 3160e6e

Browse files
committed
chore: added support for immortal streams to cli and agent
1 parent 3b16a89 commit 3160e6e

File tree

9 files changed

+871
-10
lines changed

9 files changed

+871
-10
lines changed

agent/agent.go

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,47 @@ const (
7070
EnvProcOOMScore = "CODER_PROC_OOM_SCORE"
7171
)
7272

73+
// agentImmortalDialer is a custom dialer for immortal streams that can
74+
// connect to the agent's own services via tailnet addresses.
75+
type agentImmortalDialer struct {
76+
agent *agent
77+
standardDialer *net.Dialer
78+
}
79+
80+
func (d *agentImmortalDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
81+
host, portStr, err := net.SplitHostPort(address)
82+
if err != nil {
83+
return nil, xerrors.Errorf("split host port %q: %w", address, err)
84+
}
85+
86+
port, err := strconv.Atoi(portStr)
87+
if err != nil {
88+
return nil, xerrors.Errorf("parse port %q: %w", portStr, err)
89+
}
90+
91+
// Check if this is a connection to one of the agent's own services
92+
isLocalhost := host == "localhost" || host == "127.0.0.1" || host == "::1"
93+
isAgentPort := port == int(workspacesdk.AgentSSHPort) || port == int(workspacesdk.AgentHTTPAPIServerPort) ||
94+
port == int(workspacesdk.AgentReconnectingPTYPort) || port == int(workspacesdk.AgentSpeedtestPort)
95+
96+
if isLocalhost && isAgentPort {
97+
// Get the agent ID from the current manifest
98+
manifest := d.agent.manifest.Load()
99+
if manifest == nil || manifest.AgentID == uuid.Nil {
100+
// Fallback to standard dialing if no manifest available yet
101+
return d.standardDialer.DialContext(ctx, network, address)
102+
}
103+
104+
// Connect to the agent's own tailnet address instead of localhost
105+
agentAddr := tailnet.TailscaleServicePrefix.AddrFromUUID(manifest.AgentID)
106+
agentAddress := net.JoinHostPort(agentAddr.String(), portStr)
107+
return d.standardDialer.DialContext(ctx, network, agentAddress)
108+
}
109+
110+
// For other addresses, use standard dialing
111+
return d.standardDialer.DialContext(ctx, network, address)
112+
}
113+
73114
type Options struct {
74115
Filesystem afero.Fs
75116
LogDir string
@@ -351,8 +392,13 @@ func (a *agent) init() {
351392

352393
a.containerAPI = agentcontainers.NewAPI(a.logger.Named("containers"), containerAPIOpts...)
353394

354-
// Initialize immortal streams manager
355-
a.immortalStreamsManager = immortalstreams.New(a.logger.Named("immortal-streams"), &net.Dialer{})
395+
// Initialize immortal streams manager with a custom dialer
396+
// that can connect to the agent's own services
397+
immortalDialer := &agentImmortalDialer{
398+
agent: a,
399+
standardDialer: &net.Dialer{},
400+
}
401+
a.immortalStreamsManager = immortalstreams.New(a.logger.Named("immortal-streams"), immortalDialer)
356402

357403
a.reconnectingPTYServer = reconnectingpty.NewServer(
358404
a.logger.Named("reconnecting-pty"),

cli/exp.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ func (r *RootCmd) expCmd() *serpent.Command {
1616
r.mcpCommand(),
1717
r.promptExample(),
1818
r.rptyCommand(),
19+
r.immortalStreamCmd(),
1920
},
2021
}
2122
return cmd

cli/immortalstreams.go

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
package cli
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/google/uuid"
8+
"golang.org/x/xerrors"
9+
10+
"cdr.dev/slog"
11+
"github.com/coder/coder/v2/cli/cliui"
12+
"github.com/coder/coder/v2/codersdk"
13+
"github.com/coder/serpent"
14+
)
15+
16+
// immortalStreamClient provides methods to interact with immortal streams API
17+
// This uses the main codersdk.Client to make server-proxied requests to agents
18+
type immortalStreamClient struct {
19+
client *codersdk.Client
20+
agentID uuid.UUID
21+
logger slog.Logger
22+
}
23+
24+
// newImmortalStreamClient creates a new client for immortal streams
25+
func newImmortalStreamClient(client *codersdk.Client, agentID uuid.UUID, logger slog.Logger) *immortalStreamClient {
26+
return &immortalStreamClient{
27+
client: client,
28+
agentID: agentID,
29+
logger: logger,
30+
}
31+
}
32+
33+
// createStream creates a new immortal stream
34+
func (c *immortalStreamClient) createStream(ctx context.Context, port int) (*codersdk.ImmortalStream, error) {
35+
stream, err := c.client.WorkspaceAgentCreateImmortalStream(ctx, c.agentID, codersdk.CreateImmortalStreamRequest{
36+
TCPPort: port,
37+
})
38+
if err != nil {
39+
return nil, err
40+
}
41+
return &stream, nil
42+
}
43+
44+
// listStreams lists all immortal streams
45+
func (c *immortalStreamClient) listStreams(ctx context.Context) ([]codersdk.ImmortalStream, error) {
46+
return c.client.WorkspaceAgentImmortalStreams(ctx, c.agentID)
47+
}
48+
49+
// deleteStream deletes an immortal stream
50+
func (c *immortalStreamClient) deleteStream(ctx context.Context, streamID uuid.UUID) error {
51+
return c.client.WorkspaceAgentDeleteImmortalStream(ctx, c.agentID, streamID)
52+
}
53+
54+
// CLI Commands
55+
56+
func (r *RootCmd) immortalStreamCmd() *serpent.Command {
57+
client := new(codersdk.Client)
58+
cmd := &serpent.Command{
59+
Use: "immortal-stream",
60+
Short: "Manage immortal streams in workspaces",
61+
Long: "Immortal streams provide persistent TCP connections to workspace services that automatically reconnect when interrupted.",
62+
Middleware: serpent.Chain(
63+
r.InitClient(client),
64+
),
65+
Handler: func(inv *serpent.Invocation) error {
66+
return inv.Command.HelpHandler(inv)
67+
},
68+
Children: []*serpent.Command{
69+
r.immortalStreamListCmd(),
70+
r.immortalStreamDeleteCmd(),
71+
},
72+
}
73+
return cmd
74+
}
75+
76+
func (r *RootCmd) immortalStreamListCmd() *serpent.Command {
77+
client := new(codersdk.Client)
78+
cmd := &serpent.Command{
79+
Use: "list <workspace-name>",
80+
Short: "List active immortal streams in a workspace",
81+
Middleware: serpent.Chain(
82+
serpent.RequireNArgs(1),
83+
r.InitClient(client),
84+
),
85+
Handler: func(inv *serpent.Invocation) error {
86+
ctx := inv.Context()
87+
workspaceName := inv.Args[0]
88+
89+
workspace, workspaceAgent, _, err := getWorkspaceAndAgent(ctx, inv, client, false, workspaceName)
90+
if err != nil {
91+
return err
92+
}
93+
94+
if workspace.LatestBuild.Transition != codersdk.WorkspaceTransitionStart {
95+
return xerrors.New("workspace must be running to list immortal streams")
96+
}
97+
98+
// Create immortal stream client
99+
// Note: We don't need to dial the agent for management operations
100+
// as these go through the server's proxy endpoints
101+
streamClient := newImmortalStreamClient(client, workspaceAgent.ID, inv.Logger)
102+
streams, err := streamClient.listStreams(ctx)
103+
if err != nil {
104+
return xerrors.Errorf("list immortal streams: %w", err)
105+
}
106+
107+
if len(streams) == 0 {
108+
cliui.Info(inv.Stderr, "No active immortal streams found.")
109+
return nil
110+
}
111+
112+
// Display the streams in a table
113+
displayImmortalStreams(inv, streams)
114+
return nil
115+
},
116+
}
117+
return cmd
118+
}
119+
120+
func (r *RootCmd) immortalStreamDeleteCmd() *serpent.Command {
121+
client := new(codersdk.Client)
122+
cmd := &serpent.Command{
123+
Use: "delete <workspace-name> <immortal-stream-name>",
124+
Short: "Delete an active immortal stream",
125+
Middleware: serpent.Chain(
126+
serpent.RequireNArgs(2),
127+
r.InitClient(client),
128+
),
129+
Handler: func(inv *serpent.Invocation) error {
130+
ctx := inv.Context()
131+
workspaceName := inv.Args[0]
132+
streamName := inv.Args[1]
133+
134+
workspace, workspaceAgent, _, err := getWorkspaceAndAgent(ctx, inv, client, false, workspaceName)
135+
if err != nil {
136+
return err
137+
}
138+
139+
if workspace.LatestBuild.Transition != codersdk.WorkspaceTransitionStart {
140+
return xerrors.New("workspace must be running to delete immortal streams")
141+
}
142+
143+
// Create immortal stream client
144+
streamClient := newImmortalStreamClient(client, workspaceAgent.ID, inv.Logger)
145+
streams, err := streamClient.listStreams(ctx)
146+
if err != nil {
147+
return xerrors.Errorf("list immortal streams: %w", err)
148+
}
149+
150+
var targetStream *codersdk.ImmortalStream
151+
for _, stream := range streams {
152+
if stream.Name == streamName {
153+
targetStream = &stream
154+
break
155+
}
156+
}
157+
158+
if targetStream == nil {
159+
return xerrors.Errorf("immortal stream %q not found", streamName)
160+
}
161+
162+
// Delete the stream
163+
err = streamClient.deleteStream(ctx, targetStream.ID)
164+
if err != nil {
165+
return xerrors.Errorf("delete immortal stream: %w", err)
166+
}
167+
168+
cliui.Info(inv.Stderr, fmt.Sprintf("Deleted immortal stream %q (ID: %s)", streamName, targetStream.ID))
169+
return nil
170+
},
171+
}
172+
return cmd
173+
}
174+
175+
func displayImmortalStreams(inv *serpent.Invocation, streams []codersdk.ImmortalStream) {
176+
_, _ = fmt.Fprintf(inv.Stderr, "Active Immortal Streams:\n\n")
177+
_, _ = fmt.Fprintf(inv.Stderr, "%-20s %-6s %-20s %-20s\n", "NAME", "PORT", "CREATED", "LAST CONNECTED")
178+
_, _ = fmt.Fprintf(inv.Stderr, "%-20s %-6s %-20s %-20s\n", "----", "----", "-------", "--------------")
179+
180+
for _, stream := range streams {
181+
createdTime := stream.CreatedAt.Format("2006-01-02 15:04:05")
182+
lastConnTime := stream.LastConnectionAt.Format("2006-01-02 15:04:05")
183+
184+
_, _ = fmt.Fprintf(inv.Stderr, "%-20s %-6d %-20s %-20s\n",
185+
stream.Name, stream.TCPPort, createdTime, lastConnTime)
186+
}
187+
_, _ = fmt.Fprintf(inv.Stderr, "\n")
188+
}

cli/portforward.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ func (r *RootCmd) portForward() *serpent.Command {
3939
udpForwards []string // <port>:<port>
4040
disableAutostart bool
4141
appearanceConfig codersdk.AppearanceConfig
42+
43+
// Immortal streams flags
44+
immortal bool
45+
immortalFallback bool = true // Default to true for port-forward
4246
)
4347
client := new(codersdk.Client)
4448
cmd := &serpent.Command{
@@ -212,6 +216,19 @@ func (r *RootCmd) portForward() *serpent.Command {
212216
Description: "Forward UDP port(s) from the workspace to the local machine. The UDP connection has TCP-like semantics to support stateful UDP protocols.",
213217
Value: serpent.StringArrayOf(&udpForwards),
214218
},
219+
{
220+
Flag: "immortal",
221+
Description: "Use immortal streams for port forwarding connections, providing automatic reconnection when interrupted.",
222+
Value: serpent.BoolOf(&immortal),
223+
Hidden: true,
224+
},
225+
{
226+
Flag: "immortal-fallback",
227+
Description: "If immortal streams are unavailable due to connection limits, fall back to regular TCP connection.",
228+
Default: "true",
229+
Value: serpent.BoolOf(&immortalFallback),
230+
Hidden: true,
231+
},
215232
sshDisableAutostartOption(serpent.BoolOf(&disableAutostart)),
216233
}
217234

0 commit comments

Comments
 (0)