diff --git a/coderd/httpapi/websocket.go b/coderd/httpapi/websocket.go index 60904396099a1..ad3b4b277dff4 100644 --- a/coderd/httpapi/websocket.go +++ b/coderd/httpapi/websocket.go @@ -4,6 +4,7 @@ import ( "context" "time" + "cdr.dev/slog" "nhooyr.io/websocket" ) @@ -26,10 +27,10 @@ func Heartbeat(ctx context.Context, conn *websocket.Conn) { } } -// Heartbeat loops to ping a WebSocket to keep it alive. It kills the connection -// on ping failure. -func HeartbeatClose(ctx context.Context, exit func(), conn *websocket.Conn) { - ticker := time.NewTicker(30 * time.Second) +// Heartbeat loops to ping a WebSocket to keep it alive. It calls `exit` on ping +// failure. +func HeartbeatClose(ctx context.Context, logger slog.Logger, exit func(), conn *websocket.Conn) { + ticker := time.NewTicker(15 * time.Second) defer ticker.Stop() for { @@ -41,6 +42,7 @@ func HeartbeatClose(ctx context.Context, exit func(), conn *websocket.Conn) { err := conn.Ping(ctx) if err != nil { _ = conn.Close(websocket.StatusGoingAway, "Ping failed") + logger.Info(ctx, "failed to heartbeat ping", slog.Error(err)) exit() return } diff --git a/coderd/tailnet.go b/coderd/tailnet.go index b04f3dc519fec..6521d79149b48 100644 --- a/coderd/tailnet.go +++ b/coderd/tailnet.go @@ -224,6 +224,7 @@ func (s *ServerTailnet) watchAgentUpdates() { nodes, ok := conn.NextUpdate(s.ctx) if !ok { if conn.IsClosed() && s.ctx.Err() == nil { + s.logger.Warn(s.ctx, "multiagent closed, reinitializing") s.reinitCoordinator() continue } @@ -247,6 +248,7 @@ func (s *ServerTailnet) getAgentConn() tailnet.MultiAgentConn { } func (s *ServerTailnet) reinitCoordinator() { + start := time.Now() for retrier := retry.New(25*time.Millisecond, 5*time.Second); retrier.Wait(s.ctx); { s.nodesMu.Lock() agentConn, err := s.getMultiAgent(s.ctx) @@ -264,6 +266,11 @@ func (s *ServerTailnet) reinitCoordinator() { s.logger.Warn(s.ctx, "resubscribe to agent", slog.Error(err), slog.F("agent_id", agentID)) } } + + s.logger.Info(s.ctx, "successfully reinitialized multiagent", + slog.F("agents", len(s.agentConnectionTimes)), + slog.F("took", time.Since(start)), + ) s.nodesMu.Unlock() return } diff --git a/enterprise/wsproxy/wsproxysdk/wsproxysdk.go b/enterprise/wsproxy/wsproxysdk/wsproxysdk.go index c00ab834b7c25..142d0b5c1ee57 100644 --- a/enterprise/wsproxy/wsproxysdk/wsproxysdk.go +++ b/enterprise/wsproxy/wsproxysdk/wsproxysdk.go @@ -431,6 +431,7 @@ type CoordinateNodes struct { func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, error) { ctx, cancel := context.WithCancel(ctx) + logger := c.SDKClient.Logger().Named("multiagent") coordinateURL, err := c.SDKClient.URL.Parse("/api/v2/workspaceproxies/me/coordinate") if err != nil { @@ -454,12 +455,13 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro return nil, xerrors.Errorf("dial coordinate websocket: %w", err) } - go httpapi.HeartbeatClose(ctx, cancel, conn) + go httpapi.HeartbeatClose(ctx, logger, cancel, conn) nc := websocket.NetConn(ctx, conn, websocket.MessageText) rma := remoteMultiAgentHandler{ sdk: c, nc: nc, + cancel: cancel, legacyAgentCache: map[uuid.UUID]bool{}, } @@ -472,6 +474,11 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro OnRemove: func(agpl.Queue) { conn.Close(websocket.StatusGoingAway, "closed") }, }).Init() + go func() { + <-ctx.Done() + ma.Close() + }() + go func() { defer cancel() dec := json.NewDecoder(nc) @@ -480,16 +487,17 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro err := dec.Decode(&msg) if err != nil { if xerrors.Is(err, io.EOF) { + logger.Info(ctx, "websocket connection severed", slog.Error(err)) return } - c.SDKClient.Logger().Error(ctx, "failed to decode coordinator nodes", slog.Error(err)) + logger.Error(ctx, "decode coordinator nodes", slog.Error(err)) return } err = ma.Enqueue(msg.Nodes) if err != nil { - c.SDKClient.Logger().Error(ctx, "enqueue nodes from coordinator", slog.Error(err)) + logger.Error(ctx, "enqueue nodes from coordinator", slog.Error(err)) continue } } @@ -499,8 +507,9 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro } type remoteMultiAgentHandler struct { - sdk *Client - nc net.Conn + sdk *Client + nc net.Conn + cancel func() legacyMu sync.RWMutex legacyAgentCache map[uuid.UUID]bool @@ -517,10 +526,12 @@ func (a *remoteMultiAgentHandler) writeJSON(v interface{}) error { // Node updates are tiny, so even the dinkiest connection can handle them if it's not hung. err = a.nc.SetWriteDeadline(time.Now().Add(agpl.WriteTimeout)) if err != nil { + a.cancel() return xerrors.Errorf("set write deadline: %w", err) } _, err = a.nc.Write(data) if err != nil { + a.cancel() return xerrors.Errorf("write message: %w", err) } @@ -531,6 +542,7 @@ func (a *remoteMultiAgentHandler) writeJSON(v interface{}) error { // our successful write, it is important that we reset the deadline before it fires. err = a.nc.SetWriteDeadline(time.Time{}) if err != nil { + a.cancel() return xerrors.Errorf("clear write deadline: %w", err) } @@ -573,7 +585,7 @@ func (a *remoteMultiAgentHandler) AgentIsLegacy(agentID uuid.UUID) bool { return a.sdk.AgentIsLegacy(ctx, agentID) }) if err != nil { - a.sdk.SDKClient.Logger().Error(ctx, "failed to check agent legacy status", slog.Error(err)) + a.sdk.SDKClient.Logger().Error(ctx, "failed to check agent legacy status", slog.F("agent_id", agentID), slog.Error(err)) // Assume that the agent is legacy since this failed, while less // efficient it will always work.