Skip to content

Commit 27d2996

Browse files
authored
Merge pull request modelcontextprotocol#292 from modelcontextprotocol/ihrpr/fix-streamable-connection-close
Close SSE connection for request in Streamable Http Server implementation
2 parents 0445095 + ac3f2cf commit 27d2996

File tree

2 files changed

+157
-34
lines changed

2 files changed

+157
-34
lines changed

src/server/streamableHttp.test.ts

Lines changed: 132 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -842,8 +842,8 @@ describe("StreamableHTTPServerTransport", () => {
842842
},
843843
body: JSON.stringify(initMessage),
844844
});
845-
846-
await transport.handleRequest(initReq, mockResponse);
845+
const initResponse = createMockResponse();
846+
await transport.handleRequest(initReq, initResponse);
847847
mockResponse.writeHead.mockClear();
848848
});
849849

@@ -934,6 +934,136 @@ describe("StreamableHTTPServerTransport", () => {
934934
// Now stream should be closed
935935
expect(mockResponse.end).toHaveBeenCalled();
936936
});
937+
938+
it("should keep stream open when multiple requests share the same connection", async () => {
939+
// Create a fresh response for this test
940+
const sharedResponse = createMockResponse();
941+
942+
// Send two requests in a batch that will share the same connection
943+
const batchRequests: JSONRPCMessage[] = [
944+
{ jsonrpc: "2.0", method: "method1", params: {}, id: "req1" },
945+
{ jsonrpc: "2.0", method: "method2", params: {}, id: "req2" }
946+
];
947+
948+
const req = createMockRequest({
949+
method: "POST",
950+
headers: {
951+
"content-type": "application/json",
952+
"accept": "application/json, text/event-stream",
953+
"mcp-session-id": transport.sessionId
954+
},
955+
body: JSON.stringify(batchRequests)
956+
});
957+
958+
await transport.handleRequest(req, sharedResponse);
959+
960+
// Respond to first request
961+
const response1: JSONRPCMessage = {
962+
jsonrpc: "2.0",
963+
result: { value: "result1" },
964+
id: "req1"
965+
};
966+
967+
await transport.send(response1);
968+
969+
// Connection should remain open because req2 is still pending
970+
expect(sharedResponse.write).toHaveBeenCalledWith(
971+
expect.stringContaining(`event: message\ndata: ${JSON.stringify(response1)}\n\n`)
972+
);
973+
expect(sharedResponse.end).not.toHaveBeenCalled();
974+
975+
// Respond to second request
976+
const response2: JSONRPCMessage = {
977+
jsonrpc: "2.0",
978+
result: { value: "result2" },
979+
id: "req2"
980+
};
981+
982+
await transport.send(response2);
983+
984+
// Now connection should close as all requests are complete
985+
expect(sharedResponse.write).toHaveBeenCalledWith(
986+
expect.stringContaining(`event: message\ndata: ${JSON.stringify(response2)}\n\n`)
987+
);
988+
expect(sharedResponse.end).toHaveBeenCalled();
989+
});
990+
991+
it("should clean up connection tracking when a response is sent", async () => {
992+
const req = createMockRequest({
993+
method: "POST",
994+
headers: {
995+
"content-type": "application/json",
996+
"accept": "application/json, text/event-stream",
997+
"mcp-session-id": transport.sessionId
998+
},
999+
body: JSON.stringify({
1000+
jsonrpc: "2.0",
1001+
method: "test",
1002+
params: {},
1003+
id: "cleanup-test"
1004+
})
1005+
});
1006+
1007+
const response = createMockResponse();
1008+
await transport.handleRequest(req, response);
1009+
1010+
// Verify that the request is tracked in the SSE map
1011+
expect(transport["_sseResponseMapping"].size).toBe(2);
1012+
expect(transport["_sseResponseMapping"].has("cleanup-test")).toBe(true);
1013+
1014+
// Send a response
1015+
await transport.send({
1016+
jsonrpc: "2.0",
1017+
result: {},
1018+
id: "cleanup-test"
1019+
});
1020+
1021+
// Verify that the mapping was cleaned up
1022+
expect(transport["_sseResponseMapping"].size).toBe(1);
1023+
expect(transport["_sseResponseMapping"].has("cleanup-test")).toBe(false);
1024+
});
1025+
1026+
it("should clean up connection tracking when client disconnects", async () => {
1027+
// Setup two requests that share a connection
1028+
const req = createMockRequest({
1029+
method: "POST",
1030+
headers: {
1031+
"content-type": "application/json",
1032+
"accept": "application/json, text/event-stream",
1033+
"mcp-session-id": transport.sessionId
1034+
},
1035+
body: JSON.stringify([
1036+
{ jsonrpc: "2.0", method: "longRunning1", params: {}, id: "req1" },
1037+
{ jsonrpc: "2.0", method: "longRunning2", params: {}, id: "req2" }
1038+
])
1039+
});
1040+
1041+
const response = createMockResponse();
1042+
1043+
// We need to manually store the callback to trigger it later
1044+
let closeCallback: (() => void) | undefined;
1045+
response.on.mockImplementation((event, callback: () => void) => {
1046+
if (typeof event === "string" && event === "close") {
1047+
closeCallback = callback;
1048+
}
1049+
return response;
1050+
});
1051+
1052+
await transport.handleRequest(req, response);
1053+
1054+
// Both requests should be mapped to the same response
1055+
expect(transport["_sseResponseMapping"].size).toBe(3);
1056+
expect(transport["_sseResponseMapping"].get("req1")).toBe(response);
1057+
expect(transport["_sseResponseMapping"].get("req2")).toBe(response);
1058+
1059+
// Simulate client disconnect by triggering the stored callback
1060+
if (closeCallback) closeCallback();
1061+
1062+
// All entries using this response should be removed
1063+
expect(transport["_sseResponseMapping"].size).toBe(1);
1064+
expect(transport["_sseResponseMapping"].has("req1")).toBe(false);
1065+
expect(transport["_sseResponseMapping"].has("req2")).toBe(false);
1066+
});
9371067
});
9381068

9391069
describe("Message Targeting", () => {

src/server/streamableHttp.ts

Lines changed: 25 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -197,19 +197,7 @@ export class StreamableHTTPServerTransport implements Transport {
197197
}
198198
this.sessionId = this.sessionIdGenerator();
199199
this._initialized = true;
200-
const headers: Record<string, string> = {};
201200

202-
if (this.sessionId !== undefined) {
203-
headers["mcp-session-id"] = this.sessionId;
204-
}
205-
206-
// Process initialization messages before responding
207-
for (const message of messages) {
208-
this.onmessage?.(message);
209-
}
210-
211-
res.writeHead(200, headers).end();
212-
return;
213201
}
214202
// If an Mcp-Session-Id is returned by the server during initialization,
215203
// clients using the Streamable HTTP transport MUST include it
@@ -254,6 +242,16 @@ export class StreamableHTTPServerTransport implements Transport {
254242
}
255243
}
256244

245+
// Set up close handler for client disconnects
246+
res.on("close", () => {
247+
// Remove all entries that reference this response
248+
for (const [id, storedRes] of this._sseResponseMapping.entries()) {
249+
if (storedRes === res) {
250+
this._sseResponseMapping.delete(id);
251+
}
252+
}
253+
});
254+
257255
// handle each message
258256
for (const message of messages) {
259257
this.onmessage?.(message);
@@ -360,36 +358,31 @@ export class StreamableHTTPServerTransport implements Transport {
360358
}
361359

362360
async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise<void> {
363-
const relatedRequestId = options?.relatedRequestId;
364-
// SSE connections are established per POST request, for now we don't support it through the GET
365-
// this will be changed when we implement the GET SSE connection
366-
if (relatedRequestId === undefined) {
367-
throw new Error("relatedRequestId is required for Streamable HTTP transport");
361+
let requestId = options?.relatedRequestId;
362+
if ('result' in message || 'error' in message) {
363+
// If the message is a response, use the request ID from the message
364+
requestId = message.id;
365+
}
366+
if (requestId === undefined) {
367+
throw new Error("No request ID provided for the message");
368368
}
369369

370-
const sseResponse = this._sseResponseMapping.get(relatedRequestId);
370+
const sseResponse = this._sseResponseMapping.get(requestId);
371371
if (!sseResponse) {
372-
throw new Error(`No SSE connection established for request ID: ${String(relatedRequestId)}`);
372+
throw new Error(`No SSE connection established for request ID: ${String(requestId)}`);
373373
}
374374

375375
// Send the message as an SSE event
376376
sseResponse.write(
377377
`event: message\ndata: ${JSON.stringify(message)}\n\n`,
378378
);
379-
380-
// If this is a response message with the same ID as the request, we can check
381-
// if we need to close the stream after sending the response
379+
// After all JSON-RPC responses have been sent, the server SHOULD close the SSE stream.
382380
if ('result' in message || 'error' in message) {
383-
if (message.id === relatedRequestId) {
384-
// This is a response to the original request, we can close the stream
385-
// after sending all related responses
386-
this._sseResponseMapping.delete(relatedRequestId);
387-
388-
// Only close the connection if it's not needed by other requests
389-
const canCloseConnection = ![...this._sseResponseMapping.entries()].some(([id, res]) => res === sseResponse && id !== relatedRequestId);
390-
if (canCloseConnection) {
391-
sseResponse.end();
392-
}
381+
this._sseResponseMapping.delete(requestId);
382+
// Only close the connection if it's not needed by other requests
383+
const canCloseConnection = ![...this._sseResponseMapping.entries()].some(([id, res]) => res === sseResponse && id !== requestId);
384+
if (canCloseConnection) {
385+
sseResponse?.end();
393386
}
394387
}
395388
}

0 commit comments

Comments
 (0)