Skip to content

Commit 55e6b2d

Browse files
committed
improve ws closing for leaked go routines
1 parent a6cf367 commit 55e6b2d

File tree

1 file changed

+25
-8
lines changed

1 file changed

+25
-8
lines changed

agent/immortalstreams/handler.go

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -174,18 +174,25 @@ func (h *Handler) handleUpgrade(w http.ResponseWriter, r *http.Request) {
174174
h.logger.Error(ctx, "failed to accept websocket", slog.Error(err))
175175
return
176176
}
177-
defer conn.Close(websocket.StatusInternalError, "internal error")
178177

179-
// BackedPipe handles sequence numbers internally
180-
// No need to expose them through the API
178+
// Create a context that we can cancel to clean up the connection
179+
connCtx, cancel := context.WithCancel(ctx)
180+
defer cancel()
181+
182+
// Ensure WebSocket is closed when this function returns
183+
defer func() {
184+
conn.Close(websocket.StatusNormalClosure, "connection closed")
185+
}()
181186

182187
// Create a WebSocket adapter
183188
wsConn := &wsConn{
184189
conn: conn,
185190
logger: h.logger,
191+
ctx: connCtx,
192+
cancel: cancel,
186193
}
187194

188-
// Handle the reconnection
195+
// Handle the reconnection - this establishes the connection
189196
// BackedPipe only needs the reader sequence number for replay
190197
err = h.manager.HandleConnection(streamID, wsConn, readSeqNum)
191198
if err != nil {
@@ -194,19 +201,26 @@ func (h *Handler) handleUpgrade(w http.ResponseWriter, r *http.Request) {
194201
return
195202
}
196203

197-
// Keep the connection open until it's closed
198-
<-ctx.Done()
204+
// Keep the connection open until the context is cancelled
205+
// The wsConn will handle connection closure through its Read/Write methods
206+
// When the connection is closed, the backing pipe will detect it and the context should be cancelled
207+
<-connCtx.Done()
208+
h.logger.Debug(ctx, "websocket connection handler exiting")
199209
}
200210

201211
// wsConn adapts a WebSocket connection to io.ReadWriteCloser
202212
type wsConn struct {
203213
conn *websocket.Conn
204214
logger slog.Logger
215+
ctx context.Context
216+
cancel context.CancelFunc
205217
}
206218

207219
func (c *wsConn) Read(p []byte) (n int, err error) {
208-
typ, data, err := c.conn.Read(context.Background())
220+
typ, data, err := c.conn.Read(c.ctx)
209221
if err != nil {
222+
// Cancel the context when read fails (connection closed)
223+
c.cancel()
210224
return 0, err
211225
}
212226
if typ != websocket.MessageBinary {
@@ -217,14 +231,17 @@ func (c *wsConn) Read(p []byte) (n int, err error) {
217231
}
218232

219233
func (c *wsConn) Write(p []byte) (n int, err error) {
220-
err = c.conn.Write(context.Background(), websocket.MessageBinary, p)
234+
err = c.conn.Write(c.ctx, websocket.MessageBinary, p)
221235
if err != nil {
236+
// Cancel the context when write fails (connection closed)
237+
c.cancel()
222238
return 0, err
223239
}
224240
return len(p), nil
225241
}
226242

227243
func (c *wsConn) Close() error {
244+
c.cancel() // Cancel the context when explicitly closed
228245
return c.conn.Close(websocket.StatusNormalClosure, "")
229246
}
230247

0 commit comments

Comments
 (0)