-
Notifications
You must be signed in to change notification settings - Fork 887
fix: ensure wsproxy MultiAgent
is closed when websocket dies
#11414
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -431,6 +431,7 @@ type CoordinateNodes struct { | |
|
||
func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, error) { | ||
ctx, cancel := context.WithCancel(ctx) | ||
logger := c.SDKClient.Logger().Named("multiagent") | ||
|
||
coordinateURL, err := c.SDKClient.URL.Parse("/api/v2/workspaceproxies/me/coordinate") | ||
if err != nil { | ||
|
@@ -454,12 +455,13 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro | |
return nil, xerrors.Errorf("dial coordinate websocket: %w", err) | ||
} | ||
|
||
go httpapi.HeartbeatClose(ctx, cancel, conn) | ||
go httpapi.HeartbeatClose(ctx, logger, cancel, conn) | ||
|
||
nc := websocket.NetConn(ctx, conn, websocket.MessageText) | ||
rma := remoteMultiAgentHandler{ | ||
sdk: c, | ||
nc: nc, | ||
cancel: cancel, | ||
legacyAgentCache: map[uuid.UUID]bool{}, | ||
} | ||
|
||
|
@@ -472,6 +474,11 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro | |
OnRemove: func(agpl.Queue) { conn.Close(websocket.StatusGoingAway, "closed") }, | ||
}).Init() | ||
|
||
go func() { | ||
<-ctx.Done() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I'm understanding this correctly, we're depending on the fact that the reader goroutine below cancels the context on a failed read. I think we should also tear down the multi-agent on a failed write of subscription messages. It's unlikely that we'd have a failure that leaves the connection half-open (e.g. for reads but not writes), but such things are possible and you don't want the proxy limping on unable to subscribe to new agents. |
||
ma.Close() | ||
}() | ||
|
||
go func() { | ||
defer cancel() | ||
dec := json.NewDecoder(nc) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
[Re: lines 488 to 488]
I think it's worth dropping an INFO log here. See this comment inline on Graphite. |
||
|
@@ -480,16 +487,17 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro | |
err := dec.Decode(&msg) | ||
if err != nil { | ||
if xerrors.Is(err, io.EOF) { | ||
logger.Info(ctx, "websocket connection severed", slog.Error(err)) | ||
return | ||
} | ||
|
||
c.SDKClient.Logger().Error(ctx, "failed to decode coordinator nodes", slog.Error(err)) | ||
logger.Error(ctx, "decode coordinator nodes", slog.Error(err)) | ||
return | ||
} | ||
|
||
err = ma.Enqueue(msg.Nodes) | ||
if err != nil { | ||
c.SDKClient.Logger().Error(ctx, "enqueue nodes from coordinator", slog.Error(err)) | ||
logger.Error(ctx, "enqueue nodes from coordinator", slog.Error(err)) | ||
continue | ||
} | ||
} | ||
|
@@ -499,8 +507,9 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro | |
} | ||
|
||
type remoteMultiAgentHandler struct { | ||
sdk *Client | ||
nc net.Conn | ||
sdk *Client | ||
nc net.Conn | ||
cancel func() | ||
|
||
legacyMu sync.RWMutex | ||
legacyAgentCache map[uuid.UUID]bool | ||
|
@@ -517,10 +526,12 @@ func (a *remoteMultiAgentHandler) writeJSON(v interface{}) error { | |
// Node updates are tiny, so even the dinkiest connection can handle them if it's not hung. | ||
err = a.nc.SetWriteDeadline(time.Now().Add(agpl.WriteTimeout)) | ||
if err != nil { | ||
a.cancel() | ||
return xerrors.Errorf("set write deadline: %w", err) | ||
} | ||
_, err = a.nc.Write(data) | ||
if err != nil { | ||
a.cancel() | ||
return xerrors.Errorf("write message: %w", err) | ||
} | ||
|
||
|
@@ -531,6 +542,7 @@ func (a *remoteMultiAgentHandler) writeJSON(v interface{}) error { | |
// our successful write, it is important that we reset the deadline before it fires. | ||
err = a.nc.SetWriteDeadline(time.Time{}) | ||
if err != nil { | ||
a.cancel() | ||
return xerrors.Errorf("clear write deadline: %w", err) | ||
} | ||
|
||
|
@@ -573,7 +585,7 @@ func (a *remoteMultiAgentHandler) AgentIsLegacy(agentID uuid.UUID) bool { | |
return a.sdk.AgentIsLegacy(ctx, agentID) | ||
}) | ||
if err != nil { | ||
a.sdk.SDKClient.Logger().Error(ctx, "failed to check agent legacy status", slog.Error(err)) | ||
a.sdk.SDKClient.Logger().Error(ctx, "failed to check agent legacy status", slog.F("agent_id", agentID), slog.Error(err)) | ||
|
||
// Assume that the agent is legacy since this failed, while less | ||
// efficient it will always work. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Drop an INFO log here
See this comment inline on Graphite.