From 129b16df19cc8fad152cc32ef569825a8bc93a4c Mon Sep 17 00:00:00 2001 From: Colin Adler Date: Thu, 4 Jan 2024 18:50:13 +0000 Subject: [PATCH 1/4] fix: ensure wsproxy `MultiAgent` is closed when websocket dies The `SingleTailnet` behavior only checked to see if the `MultiAgent` was closed, but the websocket error was not being propogated into the `MultiAgent`, causing it to never be swapped for a new working one. --- coderd/httpapi/websocket.go | 6 +++--- coderd/tailnet.go | 1 + enterprise/wsproxy/wsproxysdk/wsproxysdk.go | 5 +++++ 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/coderd/httpapi/websocket.go b/coderd/httpapi/websocket.go index 60904396099a1..c99c56b0a4b34 100644 --- a/coderd/httpapi/websocket.go +++ b/coderd/httpapi/websocket.go @@ -26,10 +26,10 @@ func Heartbeat(ctx context.Context, conn *websocket.Conn) { } } -// Heartbeat loops to ping a WebSocket to keep it alive. It kills the connection -// on ping failure. +// Heartbeat loops to ping a WebSocket to keep it alive. It calls `exit` on ping +// failure. func HeartbeatClose(ctx context.Context, exit func(), conn *websocket.Conn) { - ticker := time.NewTicker(30 * time.Second) + ticker := time.NewTicker(15 * time.Second) defer ticker.Stop() for { diff --git a/coderd/tailnet.go b/coderd/tailnet.go index b04f3dc519fec..7030c1ea703fa 100644 --- a/coderd/tailnet.go +++ b/coderd/tailnet.go @@ -224,6 +224,7 @@ func (s *ServerTailnet) watchAgentUpdates() { nodes, ok := conn.NextUpdate(s.ctx) if !ok { if conn.IsClosed() && s.ctx.Err() == nil { + s.logger.Warn(s.ctx, "multiagent closed, reinitializing") s.reinitCoordinator() continue } diff --git a/enterprise/wsproxy/wsproxysdk/wsproxysdk.go b/enterprise/wsproxy/wsproxysdk/wsproxysdk.go index c00ab834b7c25..09c0680f0e6a0 100644 --- a/enterprise/wsproxy/wsproxysdk/wsproxysdk.go +++ b/enterprise/wsproxy/wsproxysdk/wsproxysdk.go @@ -472,6 +472,11 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro OnRemove: func(agpl.Queue) { conn.Close(websocket.StatusGoingAway, "closed") }, }).Init() + go func() { + <-ctx.Done() + ma.Close() + }() + go func() { defer cancel() dec := json.NewDecoder(nc) From 5c24d072bf23c3e799cabf572099e539b6159e9b Mon Sep 17 00:00:00 2001 From: Colin Adler Date: Thu, 4 Jan 2024 20:23:56 +0000 Subject: [PATCH 2/4] add comment when successfully reinit'd --- coderd/tailnet.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/coderd/tailnet.go b/coderd/tailnet.go index 7030c1ea703fa..6521d79149b48 100644 --- a/coderd/tailnet.go +++ b/coderd/tailnet.go @@ -248,6 +248,7 @@ func (s *ServerTailnet) getAgentConn() tailnet.MultiAgentConn { } func (s *ServerTailnet) reinitCoordinator() { + start := time.Now() for retrier := retry.New(25*time.Millisecond, 5*time.Second); retrier.Wait(s.ctx); { s.nodesMu.Lock() agentConn, err := s.getMultiAgent(s.ctx) @@ -265,6 +266,11 @@ func (s *ServerTailnet) reinitCoordinator() { s.logger.Warn(s.ctx, "resubscribe to agent", slog.Error(err), slog.F("agent_id", agentID)) } } + + s.logger.Info(s.ctx, "successfully reinitialized multiagent", + slog.F("agents", len(s.agentConnectionTimes)), + slog.F("took", time.Since(start)), + ) s.nodesMu.Unlock() return } From 72ad811afd7cc14f93c88cf5789b079b83e6cd2f Mon Sep 17 00:00:00 2001 From: Colin Adler Date: Thu, 11 Jan 2024 16:35:23 +0000 Subject: [PATCH 3/4] address spike's comments --- enterprise/wsproxy/wsproxysdk/wsproxysdk.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/enterprise/wsproxy/wsproxysdk/wsproxysdk.go b/enterprise/wsproxy/wsproxysdk/wsproxysdk.go index 09c0680f0e6a0..e5ed8f459214f 100644 --- a/enterprise/wsproxy/wsproxysdk/wsproxysdk.go +++ b/enterprise/wsproxy/wsproxysdk/wsproxysdk.go @@ -460,6 +460,7 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro rma := remoteMultiAgentHandler{ sdk: c, nc: nc, + cancel: cancel, legacyAgentCache: map[uuid.UUID]bool{}, } @@ -485,6 +486,7 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro err := dec.Decode(&msg) if err != nil { if xerrors.Is(err, io.EOF) { + c.SDKClient.Logger().Info(ctx, "multiagent connection severed", slog.Error(err)) return } @@ -504,8 +506,9 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro } type remoteMultiAgentHandler struct { - sdk *Client - nc net.Conn + sdk *Client + nc net.Conn + cancel func() legacyMu sync.RWMutex legacyAgentCache map[uuid.UUID]bool @@ -522,10 +525,12 @@ func (a *remoteMultiAgentHandler) writeJSON(v interface{}) error { // Node updates are tiny, so even the dinkiest connection can handle them if it's not hung. err = a.nc.SetWriteDeadline(time.Now().Add(agpl.WriteTimeout)) if err != nil { + a.cancel() return xerrors.Errorf("set write deadline: %w", err) } _, err = a.nc.Write(data) if err != nil { + a.cancel() return xerrors.Errorf("write message: %w", err) } @@ -536,6 +541,7 @@ func (a *remoteMultiAgentHandler) writeJSON(v interface{}) error { // our successful write, it is important that we reset the deadline before it fires. err = a.nc.SetWriteDeadline(time.Time{}) if err != nil { + a.cancel() return xerrors.Errorf("clear write deadline: %w", err) } From aa3f58100f066fd82aab698775146ed4414f66be Mon Sep 17 00:00:00 2001 From: Colin Adler Date: Thu, 11 Jan 2024 16:49:26 +0000 Subject: [PATCH 4/4] one more log --- coderd/httpapi/websocket.go | 4 +++- enterprise/wsproxy/wsproxysdk/wsproxysdk.go | 11 ++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/coderd/httpapi/websocket.go b/coderd/httpapi/websocket.go index c99c56b0a4b34..ad3b4b277dff4 100644 --- a/coderd/httpapi/websocket.go +++ b/coderd/httpapi/websocket.go @@ -4,6 +4,7 @@ import ( "context" "time" + "cdr.dev/slog" "nhooyr.io/websocket" ) @@ -28,7 +29,7 @@ func Heartbeat(ctx context.Context, conn *websocket.Conn) { // Heartbeat loops to ping a WebSocket to keep it alive. It calls `exit` on ping // failure. -func HeartbeatClose(ctx context.Context, exit func(), conn *websocket.Conn) { +func HeartbeatClose(ctx context.Context, logger slog.Logger, exit func(), conn *websocket.Conn) { ticker := time.NewTicker(15 * time.Second) defer ticker.Stop() @@ -41,6 +42,7 @@ func HeartbeatClose(ctx context.Context, exit func(), conn *websocket.Conn) { err := conn.Ping(ctx) if err != nil { _ = conn.Close(websocket.StatusGoingAway, "Ping failed") + logger.Info(ctx, "failed to heartbeat ping", slog.Error(err)) exit() return } diff --git a/enterprise/wsproxy/wsproxysdk/wsproxysdk.go b/enterprise/wsproxy/wsproxysdk/wsproxysdk.go index e5ed8f459214f..142d0b5c1ee57 100644 --- a/enterprise/wsproxy/wsproxysdk/wsproxysdk.go +++ b/enterprise/wsproxy/wsproxysdk/wsproxysdk.go @@ -431,6 +431,7 @@ type CoordinateNodes struct { func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, error) { ctx, cancel := context.WithCancel(ctx) + logger := c.SDKClient.Logger().Named("multiagent") coordinateURL, err := c.SDKClient.URL.Parse("/api/v2/workspaceproxies/me/coordinate") if err != nil { @@ -454,7 +455,7 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro return nil, xerrors.Errorf("dial coordinate websocket: %w", err) } - go httpapi.HeartbeatClose(ctx, cancel, conn) + go httpapi.HeartbeatClose(ctx, logger, cancel, conn) nc := websocket.NetConn(ctx, conn, websocket.MessageText) rma := remoteMultiAgentHandler{ @@ -486,17 +487,17 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro err := dec.Decode(&msg) if err != nil { if xerrors.Is(err, io.EOF) { - c.SDKClient.Logger().Info(ctx, "multiagent connection severed", slog.Error(err)) + logger.Info(ctx, "websocket connection severed", slog.Error(err)) return } - c.SDKClient.Logger().Error(ctx, "failed to decode coordinator nodes", slog.Error(err)) + logger.Error(ctx, "decode coordinator nodes", slog.Error(err)) return } err = ma.Enqueue(msg.Nodes) if err != nil { - c.SDKClient.Logger().Error(ctx, "enqueue nodes from coordinator", slog.Error(err)) + logger.Error(ctx, "enqueue nodes from coordinator", slog.Error(err)) continue } } @@ -584,7 +585,7 @@ func (a *remoteMultiAgentHandler) AgentIsLegacy(agentID uuid.UUID) bool { return a.sdk.AgentIsLegacy(ctx, agentID) }) if err != nil { - a.sdk.SDKClient.Logger().Error(ctx, "failed to check agent legacy status", slog.Error(err)) + a.sdk.SDKClient.Logger().Error(ctx, "failed to check agent legacy status", slog.F("agent_id", agentID), slog.Error(err)) // Assume that the agent is legacy since this failed, while less // efficient it will always work.