Skip to content

Commit 2193ef9

Browse files
committed
chore: added immortal streams, manager and agent API integration
1 parent 2a84799 commit 2193ef9

File tree

9 files changed

+2860
-0
lines changed

9 files changed

+2860
-0
lines changed

agent/agent.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"github.com/coder/coder/v2/agent/agentexec"
4242
"github.com/coder/coder/v2/agent/agentscripts"
4343
"github.com/coder/coder/v2/agent/agentssh"
44+
"github.com/coder/coder/v2/agent/immortalstreams"
4445
"github.com/coder/coder/v2/agent/proto"
4546
"github.com/coder/coder/v2/agent/proto/resourcesmonitor"
4647
"github.com/coder/coder/v2/agent/reconnectingpty"
@@ -280,6 +281,9 @@ type agent struct {
280281
devcontainers bool
281282
containerAPIOptions []agentcontainers.Option
282283
containerAPI *agentcontainers.API
284+
285+
// Immortal streams
286+
immortalStreamsManager *immortalstreams.Manager
283287
}
284288

285289
func (a *agent) TailnetConn() *tailnet.Conn {
@@ -347,6 +351,9 @@ func (a *agent) init() {
347351

348352
a.containerAPI = agentcontainers.NewAPI(a.logger.Named("containers"), containerAPIOpts...)
349353

354+
// Initialize immortal streams manager
355+
a.immortalStreamsManager = immortalstreams.New(a.logger.Named("immortal-streams"), &net.Dialer{})
356+
350357
a.reconnectingPTYServer = reconnectingpty.NewServer(
351358
a.logger.Named("reconnecting-pty"),
352359
a.sshServer,
@@ -1930,6 +1937,12 @@ func (a *agent) Close() error {
19301937
a.logger.Error(a.hardCtx, "container API close", slog.Error(err))
19311938
}
19321939

1940+
if a.immortalStreamsManager != nil {
1941+
if err := a.immortalStreamsManager.Close(); err != nil {
1942+
a.logger.Error(a.hardCtx, "immortal streams manager close", slog.Error(err))
1943+
}
1944+
}
1945+
19331946
// Wait for the graceful shutdown to complete, but don't wait forever so
19341947
// that we don't break user expectations.
19351948
go func() {

agent/api.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/go-chi/chi/v5"
99
"github.com/google/uuid"
1010

11+
"github.com/coder/coder/v2/coderd/agentapi"
1112
"github.com/coder/coder/v2/coderd/httpapi"
1213
"github.com/coder/coder/v2/codersdk"
1314
)
@@ -66,6 +67,12 @@ func (a *agent) apiHandler() http.Handler {
6667
r.Get("/debug/manifest", a.HandleHTTPDebugManifest)
6768
r.Get("/debug/prometheus", promHandler.ServeHTTP)
6869

70+
// Mount immortal streams API
71+
if a.immortalStreamsManager != nil {
72+
immortalStreamsHandler := agentapi.NewImmortalStreamsHandler(a.logger, a.immortalStreamsManager)
73+
r.Mount("/api/v0/immortal-stream", immortalStreamsHandler.Routes())
74+
}
75+
6976
return r
7077
}
7178

agent/immortalstreams/manager.go

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
package immortalstreams
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"net"
8+
"sync"
9+
"time"
10+
11+
"github.com/google/uuid"
12+
"github.com/moby/moby/pkg/namesgenerator"
13+
"golang.org/x/xerrors"
14+
15+
"cdr.dev/slog"
16+
"github.com/coder/coder/v2/codersdk"
17+
)
18+
19+
const (
20+
// MaxStreams is the maximum number of immortal streams allowed per agent
21+
MaxStreams = 32
22+
// BufferSize is the size of the ring buffer for each stream (64 MiB)
23+
BufferSize = 64 * 1024 * 1024
24+
)
25+
26+
// Manager manages immortal streams for an agent
27+
type Manager struct {
28+
logger slog.Logger
29+
30+
mu sync.RWMutex
31+
streams map[uuid.UUID]*Stream
32+
33+
// dialer is used to dial local services
34+
dialer Dialer
35+
}
36+
37+
// Dialer dials a local service
38+
type Dialer interface {
39+
DialContext(ctx context.Context, network, address string) (net.Conn, error)
40+
}
41+
42+
// New creates a new immortal streams manager
43+
func New(logger slog.Logger, dialer Dialer) *Manager {
44+
return &Manager{
45+
logger: logger,
46+
streams: make(map[uuid.UUID]*Stream),
47+
dialer: dialer,
48+
}
49+
}
50+
51+
// CreateStream creates a new immortal stream
52+
func (m *Manager) CreateStream(ctx context.Context, port int) (*codersdk.ImmortalStream, error) {
53+
m.mu.Lock()
54+
defer m.mu.Unlock()
55+
56+
// Check if we're at the limit
57+
if len(m.streams) >= MaxStreams {
58+
// Try to evict a disconnected stream
59+
evicted := m.evictOldestDisconnectedLocked()
60+
if !evicted {
61+
return nil, xerrors.New("too many immortal streams")
62+
}
63+
}
64+
65+
// Dial the local service
66+
addr := fmt.Sprintf("localhost:%d", port)
67+
conn, err := m.dialer.DialContext(ctx, "tcp", addr)
68+
if err != nil {
69+
if isConnectionRefused(err) {
70+
return nil, xerrors.Errorf("the connection was refused")
71+
}
72+
return nil, xerrors.Errorf("dial local service: %w", err)
73+
}
74+
75+
// Create the stream
76+
id := uuid.New()
77+
name := namesgenerator.GetRandomName(0)
78+
stream := NewStream(
79+
id,
80+
name,
81+
port,
82+
m.logger.With(slog.F("stream_id", id), slog.F("stream_name", name)),
83+
)
84+
85+
// Start the stream
86+
if err := stream.Start(conn); err != nil {
87+
_ = conn.Close()
88+
return nil, xerrors.Errorf("start stream: %w", err)
89+
}
90+
91+
m.streams[id] = stream
92+
93+
return &codersdk.ImmortalStream{
94+
ID: id,
95+
Name: name,
96+
TCPPort: port,
97+
CreatedAt: stream.createdAt,
98+
LastConnectionAt: stream.createdAt,
99+
}, nil
100+
}
101+
102+
// GetStream returns a stream by ID
103+
func (m *Manager) GetStream(id uuid.UUID) (*Stream, bool) {
104+
m.mu.RLock()
105+
defer m.mu.RUnlock()
106+
stream, ok := m.streams[id]
107+
return stream, ok
108+
}
109+
110+
// ListStreams returns all streams
111+
func (m *Manager) ListStreams() []codersdk.ImmortalStream {
112+
m.mu.RLock()
113+
defer m.mu.RUnlock()
114+
115+
streams := make([]codersdk.ImmortalStream, 0, len(m.streams))
116+
for _, stream := range m.streams {
117+
streams = append(streams, stream.ToAPI())
118+
}
119+
return streams
120+
}
121+
122+
// DeleteStream deletes a stream by ID
123+
func (m *Manager) DeleteStream(id uuid.UUID) error {
124+
m.mu.Lock()
125+
defer m.mu.Unlock()
126+
127+
stream, ok := m.streams[id]
128+
if !ok {
129+
return xerrors.New("stream not found")
130+
}
131+
132+
if err := stream.Close(); err != nil {
133+
m.logger.Warn(context.Background(), "failed to close stream", slog.Error(err))
134+
}
135+
136+
delete(m.streams, id)
137+
return nil
138+
}
139+
140+
// Close closes all streams
141+
func (m *Manager) Close() error {
142+
m.mu.Lock()
143+
defer m.mu.Unlock()
144+
145+
var firstErr error
146+
for id, stream := range m.streams {
147+
if err := stream.Close(); err != nil && firstErr == nil {
148+
firstErr = err
149+
}
150+
delete(m.streams, id)
151+
}
152+
return firstErr
153+
}
154+
155+
// evictOldestDisconnectedLocked evicts the oldest disconnected stream
156+
// Must be called with mu held
157+
func (m *Manager) evictOldestDisconnectedLocked() bool {
158+
var (
159+
oldestID uuid.UUID
160+
oldestDisconnected time.Time
161+
found bool
162+
)
163+
164+
for id, stream := range m.streams {
165+
if stream.IsConnected() {
166+
continue
167+
}
168+
169+
disconnectedAt := stream.LastDisconnectionAt()
170+
171+
// Prioritize streams that have actually been disconnected over never-connected streams
172+
switch {
173+
case !found:
174+
oldestID = id
175+
oldestDisconnected = disconnectedAt
176+
found = true
177+
case disconnectedAt.IsZero() && !oldestDisconnected.IsZero():
178+
// Keep the current choice (it was actually disconnected)
179+
continue
180+
case !disconnectedAt.IsZero() && oldestDisconnected.IsZero():
181+
// Prefer this stream (it was actually disconnected) over never-connected
182+
oldestID = id
183+
oldestDisconnected = disconnectedAt
184+
case !disconnectedAt.IsZero() && !oldestDisconnected.IsZero():
185+
// Both were actually disconnected, pick the oldest
186+
if disconnectedAt.Before(oldestDisconnected) {
187+
oldestID = id
188+
oldestDisconnected = disconnectedAt
189+
}
190+
}
191+
// If both are zero time, keep the first one found
192+
}
193+
194+
if !found {
195+
return false
196+
}
197+
198+
// Close and remove the oldest disconnected stream
199+
if stream, ok := m.streams[oldestID]; ok {
200+
m.logger.Info(context.Background(), "evicting oldest disconnected stream",
201+
slog.F("stream_id", oldestID),
202+
slog.F("stream_name", stream.name),
203+
slog.F("disconnected_at", oldestDisconnected))
204+
205+
if err := stream.Close(); err != nil {
206+
m.logger.Warn(context.Background(), "failed to close evicted stream", slog.Error(err))
207+
}
208+
delete(m.streams, oldestID)
209+
}
210+
211+
return true
212+
}
213+
214+
// HandleConnection handles a new connection for an existing stream
215+
func (m *Manager) HandleConnection(id uuid.UUID, conn io.ReadWriteCloser, readSeqNum uint64) error {
216+
m.mu.RLock()
217+
stream, ok := m.streams[id]
218+
m.mu.RUnlock()
219+
220+
if !ok {
221+
return xerrors.New("stream not found")
222+
}
223+
224+
return stream.HandleReconnect(conn, readSeqNum)
225+
}
226+
227+
// isConnectionRefused checks if an error is a connection refused error
228+
func isConnectionRefused(err error) bool {
229+
var opErr *net.OpError
230+
if xerrors.As(err, &opErr) {
231+
return opErr.Op == "dial"
232+
}
233+
return false
234+
}

0 commit comments

Comments
 (0)