Skip to content

Commit 4a08082

Browse files
authored
fix: ensure wsproxy MultiAgent is closed when websocket dies (#11414)
The `SingleTailnet` behavior only checked to see if the `MultiAgent` was closed, but the websocket error was not being propogated into the `MultiAgent`, causing it to never be swapped for a new working one. Fixes #11401 Before: ``` Coder Workspace Proxy v0.0.0-devel+85ff030 - Your Self-Hosted Remote Development Platform Started HTTP listener at http://0.0.0.0:3001 View the Web UI: http://127.0.0.1:3001 ==> Logs will stream in below (press ctrl+c to gracefully exit): 2024-01-04 20:11:56.376 [warn] net.workspace-proxy.servertailnet: broadcast server node to agents ... error= write message: github.com/coder/coder/v2/enterprise/wsproxy/wsproxysdk.(*remoteMultiAgentHandler).writeJSON /home/coder/coder/enterprise/wsproxy/wsproxysdk/wsproxysdk.go:524 - failed to write msg: WebSocket closed: failed to read frame header: EOF ``` After: ``` Coder Workspace Proxy v0.0.0-devel+12f1878 - Your Self-Hosted Remote Development Platform Started HTTP listener at http://0.0.0.0:3001 View the Web UI: http://127.0.0.1:3001 ==> Logs will stream in below (press ctrl+c to gracefully exit): 2024-01-04 20:26:38.545 [warn] net.workspace-proxy.servertailnet: multiagent closed, reinitializing 2024-01-04 20:26:38.546 [erro] net.workspace-proxy.servertailnet: reinit multi agent ... error= dial coordinate websocket: github.com/coder/coder/v2/enterprise/wsproxy/wsproxysdk.(*Client).DialCoordinator /home/coder/coder/enterprise/wsproxy/wsproxysdk/wsproxysdk.go:454 - failed to WebSocket dial: failed to send handshake request: Get "http://127.0.0.1:3000/api/v2/workspaceproxies/me/coordinate": dial tcp 127.0.0.1:3000: connect: connection refused 2024-01-04 20:26:38.587 [erro] net.workspace-proxy.servertailnet: reinit multi agent ... error= dial coordinate websocket: github.com/coder/coder/v2/enterprise/wsproxy/wsproxysdk.(*Client).DialCoordinator /home/coder/coder/enterprise/wsproxy/wsproxysdk/wsproxysdk.go:454 - failed to WebSocket dial: failed to send handshake request: Get "http://127.0.0.1:3000/api/v2/workspaceproxies/me/coordinate": dial tcp 127.0.0.1:3000: connect: connection refusedhandshake request: Get "http://127.0.0.1:3000/api/v2/workspaceproxies/me/coordinate": dial tcp 127.0.0.1:3000: connect: connection refused 2024-01-04 20:26:40.446 [info] net.workspace-proxy.servertailnet: successfully reinitialized multiagent agents=0 took=1.900892615s ```
1 parent d708ac7 commit 4a08082

File tree

3 files changed

+31
-10
lines changed

3 files changed

+31
-10
lines changed

coderd/httpapi/websocket.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"time"
66

7+
"cdr.dev/slog"
78
"nhooyr.io/websocket"
89
)
910

@@ -26,10 +27,10 @@ func Heartbeat(ctx context.Context, conn *websocket.Conn) {
2627
}
2728
}
2829

29-
// Heartbeat loops to ping a WebSocket to keep it alive. It kills the connection
30-
// on ping failure.
31-
func HeartbeatClose(ctx context.Context, exit func(), conn *websocket.Conn) {
32-
ticker := time.NewTicker(30 * time.Second)
30+
// Heartbeat loops to ping a WebSocket to keep it alive. It calls `exit` on ping
31+
// failure.
32+
func HeartbeatClose(ctx context.Context, logger slog.Logger, exit func(), conn *websocket.Conn) {
33+
ticker := time.NewTicker(15 * time.Second)
3334
defer ticker.Stop()
3435

3536
for {
@@ -41,6 +42,7 @@ func HeartbeatClose(ctx context.Context, exit func(), conn *websocket.Conn) {
4142
err := conn.Ping(ctx)
4243
if err != nil {
4344
_ = conn.Close(websocket.StatusGoingAway, "Ping failed")
45+
logger.Info(ctx, "failed to heartbeat ping", slog.Error(err))
4446
exit()
4547
return
4648
}

coderd/tailnet.go

+7
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ func (s *ServerTailnet) watchAgentUpdates() {
224224
nodes, ok := conn.NextUpdate(s.ctx)
225225
if !ok {
226226
if conn.IsClosed() && s.ctx.Err() == nil {
227+
s.logger.Warn(s.ctx, "multiagent closed, reinitializing")
227228
s.reinitCoordinator()
228229
continue
229230
}
@@ -247,6 +248,7 @@ func (s *ServerTailnet) getAgentConn() tailnet.MultiAgentConn {
247248
}
248249

249250
func (s *ServerTailnet) reinitCoordinator() {
251+
start := time.Now()
250252
for retrier := retry.New(25*time.Millisecond, 5*time.Second); retrier.Wait(s.ctx); {
251253
s.nodesMu.Lock()
252254
agentConn, err := s.getMultiAgent(s.ctx)
@@ -264,6 +266,11 @@ func (s *ServerTailnet) reinitCoordinator() {
264266
s.logger.Warn(s.ctx, "resubscribe to agent", slog.Error(err), slog.F("agent_id", agentID))
265267
}
266268
}
269+
270+
s.logger.Info(s.ctx, "successfully reinitialized multiagent",
271+
slog.F("agents", len(s.agentConnectionTimes)),
272+
slog.F("took", time.Since(start)),
273+
)
267274
s.nodesMu.Unlock()
268275
return
269276
}

enterprise/wsproxy/wsproxysdk/wsproxysdk.go

+18-6
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,7 @@ type CoordinateNodes struct {
431431

432432
func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, error) {
433433
ctx, cancel := context.WithCancel(ctx)
434+
logger := c.SDKClient.Logger().Named("multiagent")
434435

435436
coordinateURL, err := c.SDKClient.URL.Parse("/api/v2/workspaceproxies/me/coordinate")
436437
if err != nil {
@@ -454,12 +455,13 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro
454455
return nil, xerrors.Errorf("dial coordinate websocket: %w", err)
455456
}
456457

457-
go httpapi.HeartbeatClose(ctx, cancel, conn)
458+
go httpapi.HeartbeatClose(ctx, logger, cancel, conn)
458459

459460
nc := websocket.NetConn(ctx, conn, websocket.MessageText)
460461
rma := remoteMultiAgentHandler{
461462
sdk: c,
462463
nc: nc,
464+
cancel: cancel,
463465
legacyAgentCache: map[uuid.UUID]bool{},
464466
}
465467

@@ -472,6 +474,11 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro
472474
OnRemove: func(agpl.Queue) { conn.Close(websocket.StatusGoingAway, "closed") },
473475
}).Init()
474476

477+
go func() {
478+
<-ctx.Done()
479+
ma.Close()
480+
}()
481+
475482
go func() {
476483
defer cancel()
477484
dec := json.NewDecoder(nc)
@@ -480,16 +487,17 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro
480487
err := dec.Decode(&msg)
481488
if err != nil {
482489
if xerrors.Is(err, io.EOF) {
490+
logger.Info(ctx, "websocket connection severed", slog.Error(err))
483491
return
484492
}
485493

486-
c.SDKClient.Logger().Error(ctx, "failed to decode coordinator nodes", slog.Error(err))
494+
logger.Error(ctx, "decode coordinator nodes", slog.Error(err))
487495
return
488496
}
489497

490498
err = ma.Enqueue(msg.Nodes)
491499
if err != nil {
492-
c.SDKClient.Logger().Error(ctx, "enqueue nodes from coordinator", slog.Error(err))
500+
logger.Error(ctx, "enqueue nodes from coordinator", slog.Error(err))
493501
continue
494502
}
495503
}
@@ -499,8 +507,9 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro
499507
}
500508

501509
type remoteMultiAgentHandler struct {
502-
sdk *Client
503-
nc net.Conn
510+
sdk *Client
511+
nc net.Conn
512+
cancel func()
504513

505514
legacyMu sync.RWMutex
506515
legacyAgentCache map[uuid.UUID]bool
@@ -517,10 +526,12 @@ func (a *remoteMultiAgentHandler) writeJSON(v interface{}) error {
517526
// Node updates are tiny, so even the dinkiest connection can handle them if it's not hung.
518527
err = a.nc.SetWriteDeadline(time.Now().Add(agpl.WriteTimeout))
519528
if err != nil {
529+
a.cancel()
520530
return xerrors.Errorf("set write deadline: %w", err)
521531
}
522532
_, err = a.nc.Write(data)
523533
if err != nil {
534+
a.cancel()
524535
return xerrors.Errorf("write message: %w", err)
525536
}
526537

@@ -531,6 +542,7 @@ func (a *remoteMultiAgentHandler) writeJSON(v interface{}) error {
531542
// our successful write, it is important that we reset the deadline before it fires.
532543
err = a.nc.SetWriteDeadline(time.Time{})
533544
if err != nil {
545+
a.cancel()
534546
return xerrors.Errorf("clear write deadline: %w", err)
535547
}
536548

@@ -573,7 +585,7 @@ func (a *remoteMultiAgentHandler) AgentIsLegacy(agentID uuid.UUID) bool {
573585
return a.sdk.AgentIsLegacy(ctx, agentID)
574586
})
575587
if err != nil {
576-
a.sdk.SDKClient.Logger().Error(ctx, "failed to check agent legacy status", slog.Error(err))
588+
a.sdk.SDKClient.Logger().Error(ctx, "failed to check agent legacy status", slog.F("agent_id", agentID), slog.Error(err))
577589

578590
// Assume that the agent is legacy since this failed, while less
579591
// efficient it will always work.

0 commit comments

Comments
 (0)