Skip to content

Commit fa4eff3

Browse files
committed
chore: immortal streams manager and agent api integration
1 parent e95c9a4 commit fa4eff3

File tree

9 files changed

+2673
-0
lines changed

9 files changed

+2673
-0
lines changed

agent/agent.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"github.com/coder/coder/v2/agent/agentexec"
4242
"github.com/coder/coder/v2/agent/agentscripts"
4343
"github.com/coder/coder/v2/agent/agentssh"
44+
"github.com/coder/coder/v2/agent/immortalstreams"
4445
"github.com/coder/coder/v2/agent/proto"
4546
"github.com/coder/coder/v2/agent/proto/resourcesmonitor"
4647
"github.com/coder/coder/v2/agent/reconnectingpty"
@@ -280,6 +281,9 @@ type agent struct {
280281
devcontainers bool
281282
containerAPIOptions []agentcontainers.Option
282283
containerAPI *agentcontainers.API
284+
285+
// Immortal streams
286+
immortalStreamsManager *immortalstreams.Manager
283287
}
284288

285289
func (a *agent) TailnetConn() *tailnet.Conn {
@@ -347,6 +351,9 @@ func (a *agent) init() {
347351

348352
a.containerAPI = agentcontainers.NewAPI(a.logger.Named("containers"), containerAPIOpts...)
349353

354+
// Initialize immortal streams manager
355+
a.immortalStreamsManager = immortalstreams.New(a.logger.Named("immortal-streams"), &net.Dialer{})
356+
350357
a.reconnectingPTYServer = reconnectingpty.NewServer(
351358
a.logger.Named("reconnecting-pty"),
352359
a.sshServer,
@@ -1930,6 +1937,12 @@ func (a *agent) Close() error {
19301937
a.logger.Error(a.hardCtx, "container API close", slog.Error(err))
19311938
}
19321939

1940+
if a.immortalStreamsManager != nil {
1941+
if err := a.immortalStreamsManager.Close(); err != nil {
1942+
a.logger.Error(a.hardCtx, "immortal streams manager close", slog.Error(err))
1943+
}
1944+
}
1945+
19331946
// Wait for the graceful shutdown to complete, but don't wait forever so
19341947
// that we don't break user expectations.
19351948
go func() {

agent/api.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/go-chi/chi/v5"
99
"github.com/google/uuid"
1010

11+
"github.com/coder/coder/v2/coderd/agentapi"
1112
"github.com/coder/coder/v2/coderd/httpapi"
1213
"github.com/coder/coder/v2/codersdk"
1314
)
@@ -66,6 +67,12 @@ func (a *agent) apiHandler() http.Handler {
6667
r.Get("/debug/manifest", a.HandleHTTPDebugManifest)
6768
r.Get("/debug/prometheus", promHandler.ServeHTTP)
6869

70+
// Mount immortal streams API
71+
if a.immortalStreamsManager != nil {
72+
immortalStreamsHandler := agentapi.NewImmortalStreamsHandler(a.logger, a.immortalStreamsManager)
73+
r.Mount("/api/v0/immortal-stream", immortalStreamsHandler.Routes())
74+
}
75+
6976
return r
7077
}
7178

agent/immortalstreams/manager.go

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
package immortalstreams
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"net"
8+
"sync"
9+
"time"
10+
11+
"github.com/google/uuid"
12+
"github.com/moby/moby/pkg/namesgenerator"
13+
"golang.org/x/xerrors"
14+
15+
"cdr.dev/slog"
16+
"github.com/coder/coder/v2/codersdk"
17+
)
18+
19+
const (
20+
// MaxStreams is the maximum number of immortal streams allowed per agent
21+
MaxStreams = 32
22+
// BufferSize is the size of the ring buffer for each stream (64 MiB)
23+
BufferSize = 64 * 1024 * 1024
24+
)
25+
26+
// Manager manages immortal streams for an agent
27+
type Manager struct {
28+
logger slog.Logger
29+
30+
mu sync.RWMutex
31+
streams map[uuid.UUID]*Stream
32+
33+
// dialer is used to dial local services
34+
dialer Dialer
35+
}
36+
37+
// Dialer dials a local service
38+
type Dialer interface {
39+
DialContext(ctx context.Context, network, address string) (net.Conn, error)
40+
}
41+
42+
// New creates a new immortal streams manager
43+
func New(logger slog.Logger, dialer Dialer) *Manager {
44+
return &Manager{
45+
logger: logger,
46+
streams: make(map[uuid.UUID]*Stream),
47+
dialer: dialer,
48+
}
49+
}
50+
51+
// CreateStream creates a new immortal stream
52+
func (m *Manager) CreateStream(ctx context.Context, port int) (*codersdk.ImmortalStream, error) {
53+
m.mu.Lock()
54+
defer m.mu.Unlock()
55+
56+
// Check if we're at the limit
57+
if len(m.streams) >= MaxStreams {
58+
// Try to evict a disconnected stream
59+
evicted := m.evictOldestDisconnectedLocked()
60+
if !evicted {
61+
return nil, xerrors.New("too many immortal streams")
62+
}
63+
}
64+
65+
// Dial the local service
66+
addr := fmt.Sprintf("localhost:%d", port)
67+
conn, err := m.dialer.DialContext(ctx, "tcp", addr)
68+
if err != nil {
69+
if isConnectionRefused(err) {
70+
return nil, xerrors.Errorf("the connection was refused")
71+
}
72+
return nil, xerrors.Errorf("dial local service: %w", err)
73+
}
74+
75+
// Create the stream
76+
id := uuid.New()
77+
name := namesgenerator.GetRandomName(0)
78+
stream := NewStream(
79+
id,
80+
name,
81+
port,
82+
m.logger.With(slog.F("stream_id", id), slog.F("stream_name", name)),
83+
BufferSize,
84+
)
85+
86+
// Start the stream
87+
if err := stream.Start(conn); err != nil {
88+
_ = conn.Close()
89+
return nil, xerrors.Errorf("start stream: %w", err)
90+
}
91+
92+
m.streams[id] = stream
93+
94+
return &codersdk.ImmortalStream{
95+
ID: id,
96+
Name: name,
97+
TCPPort: port,
98+
CreatedAt: stream.createdAt,
99+
LastConnectionAt: stream.createdAt,
100+
}, nil
101+
}
102+
103+
// GetStream returns a stream by ID
104+
func (m *Manager) GetStream(id uuid.UUID) (*Stream, bool) {
105+
m.mu.RLock()
106+
defer m.mu.RUnlock()
107+
stream, ok := m.streams[id]
108+
return stream, ok
109+
}
110+
111+
// ListStreams returns all streams
112+
func (m *Manager) ListStreams() []codersdk.ImmortalStream {
113+
m.mu.RLock()
114+
defer m.mu.RUnlock()
115+
116+
streams := make([]codersdk.ImmortalStream, 0, len(m.streams))
117+
for _, stream := range m.streams {
118+
streams = append(streams, stream.ToAPI())
119+
}
120+
return streams
121+
}
122+
123+
// DeleteStream deletes a stream by ID
124+
func (m *Manager) DeleteStream(id uuid.UUID) error {
125+
m.mu.Lock()
126+
defer m.mu.Unlock()
127+
128+
stream, ok := m.streams[id]
129+
if !ok {
130+
return xerrors.New("stream not found")
131+
}
132+
133+
if err := stream.Close(); err != nil {
134+
m.logger.Warn(context.Background(), "failed to close stream", slog.Error(err))
135+
}
136+
137+
delete(m.streams, id)
138+
return nil
139+
}
140+
141+
// Close closes all streams
142+
func (m *Manager) Close() error {
143+
m.mu.Lock()
144+
defer m.mu.Unlock()
145+
146+
var firstErr error
147+
for id, stream := range m.streams {
148+
if err := stream.Close(); err != nil && firstErr == nil {
149+
firstErr = err
150+
}
151+
delete(m.streams, id)
152+
}
153+
return firstErr
154+
}
155+
156+
// evictOldestDisconnectedLocked evicts the oldest disconnected stream
157+
// Must be called with mu held
158+
func (m *Manager) evictOldestDisconnectedLocked() bool {
159+
var (
160+
oldestID uuid.UUID
161+
oldestDisconnected time.Time
162+
found bool
163+
)
164+
165+
for id, stream := range m.streams {
166+
if stream.IsConnected() {
167+
continue
168+
}
169+
170+
disconnectedAt := stream.LastDisconnectionAt()
171+
172+
// Prioritize streams that have actually been disconnected over never-connected streams
173+
switch {
174+
case !found:
175+
oldestID = id
176+
oldestDisconnected = disconnectedAt
177+
found = true
178+
case disconnectedAt.IsZero() && !oldestDisconnected.IsZero():
179+
// Keep the current choice (it was actually disconnected)
180+
continue
181+
case !disconnectedAt.IsZero() && oldestDisconnected.IsZero():
182+
// Prefer this stream (it was actually disconnected) over never-connected
183+
oldestID = id
184+
oldestDisconnected = disconnectedAt
185+
case !disconnectedAt.IsZero() && !oldestDisconnected.IsZero():
186+
// Both were actually disconnected, pick the oldest
187+
if disconnectedAt.Before(oldestDisconnected) {
188+
oldestID = id
189+
oldestDisconnected = disconnectedAt
190+
}
191+
}
192+
// If both are zero time, keep the first one found
193+
}
194+
195+
if !found {
196+
return false
197+
}
198+
199+
// Close and remove the oldest disconnected stream
200+
if stream, ok := m.streams[oldestID]; ok {
201+
m.logger.Info(context.Background(), "evicting oldest disconnected stream",
202+
slog.F("stream_id", oldestID),
203+
slog.F("stream_name", stream.name),
204+
slog.F("disconnected_at", oldestDisconnected))
205+
206+
if err := stream.Close(); err != nil {
207+
m.logger.Warn(context.Background(), "failed to close evicted stream", slog.Error(err))
208+
}
209+
delete(m.streams, oldestID)
210+
}
211+
212+
return true
213+
}
214+
215+
// HandleConnection handles a new connection for an existing stream
216+
func (m *Manager) HandleConnection(id uuid.UUID, conn io.ReadWriteCloser, readSeqNum uint64) error {
217+
m.mu.RLock()
218+
stream, ok := m.streams[id]
219+
m.mu.RUnlock()
220+
221+
if !ok {
222+
return xerrors.New("stream not found")
223+
}
224+
225+
return stream.HandleReconnect(conn, readSeqNum)
226+
}
227+
228+
// isConnectionRefused checks if an error is a connection refused error
229+
func isConnectionRefused(err error) bool {
230+
var opErr *net.OpError
231+
if xerrors.As(err, &opErr) {
232+
return opErr.Op == "dial"
233+
}
234+
return false
235+
}

0 commit comments

Comments
 (0)