Skip to content

Commit 18e4981

Browse files
committed
move get sse stream from a separate method to automatic connection when notifications/initialized received
1 parent e3b4496 commit 18e4981

File tree

4 files changed

+23
-41
lines changed

4 files changed

+23
-41
lines changed

src/client/streamableHttp.test.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,7 @@ describe("StreamableHTTPClientTransport", () => {
164164
// We expect the 405 error to be caught and handled gracefully
165165
// This should not throw an error that breaks the transport
166166
await transport.start();
167-
await expect(transport.openSseStream()).rejects.toThrow("Failed to open SSE stream: Method Not Allowed");
168-
167+
await expect(transport["_startOrAuthStandaloneSSE"]()).resolves.not.toThrow("Failed to open SSE stream: Method Not Allowed");
169168
// Check that GET was attempted
170169
expect(global.fetch).toHaveBeenCalledWith(
171170
expect.anything(),
@@ -209,7 +208,7 @@ describe("StreamableHTTPClientTransport", () => {
209208
transport.onmessage = messageSpy;
210209

211210
await transport.start();
212-
await transport.openSseStream();
211+
await transport["_startOrAuthStandaloneSSE"]();
213212

214213
// Give time for the SSE event to be processed
215214
await new Promise(resolve => setTimeout(resolve, 50));
@@ -295,7 +294,7 @@ describe("StreamableHTTPClientTransport", () => {
295294
});
296295

297296
await transport.start();
298-
await transport.openSseStream();
297+
await transport["_startOrAuthStandaloneSSE"]();
299298
await new Promise(resolve => setTimeout(resolve, 50));
300299

301300
// Now simulate attempting to reconnect
@@ -306,7 +305,7 @@ describe("StreamableHTTPClientTransport", () => {
306305
body: null
307306
});
308307

309-
await transport.openSseStream();
308+
await transport["_startOrAuthStandaloneSSE"]();
310309

311310
// Check that Last-Event-ID was included
312311
const calls = (global.fetch as jest.Mock).mock.calls;
@@ -366,7 +365,7 @@ describe("StreamableHTTPClientTransport", () => {
366365

367366
await transport.start();
368367

369-
await transport.openSseStream();
368+
await transport["_startOrAuthStandaloneSSE"]();
370369
expect((actualReqInit.headers as Headers).get("x-custom-header")).toBe("CustomValue");
371370

372371
requestInit.headers["X-Custom-Header"] = "SecondCustomValue";

src/client/streamableHttp.ts

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Transport } from "../shared/transport.js";
2-
import { JSONRPCMessage, JSONRPCMessageSchema } from "../types.js";
2+
import { isJSONRPCNotification, JSONRPCMessage, JSONRPCMessageSchema } from "../types.js";
33
import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js";
44
import { EventSourceParserStream } from "eventsource-parser/stream";
55

@@ -126,6 +126,12 @@ export class StreamableHTTPClientTransport implements Transport {
126126
return await this._authThenStart();
127127
}
128128

129+
// 405 indicates that the server does not offer an SSE stream at GET endpoint
130+
// This is an expected case that should not trigger an error
131+
if (response.status === 405) {
132+
return;
133+
}
134+
129135
throw new StreamableHTTPError(
130136
response.status,
131137
`Failed to open SSE stream: ${response.statusText}`,
@@ -243,6 +249,12 @@ export class StreamableHTTPClientTransport implements Transport {
243249

244250
// If the response is 202 Accepted, there's no body to process
245251
if (response.status === 202) {
252+
// if the accepted notification is initialized, we start the SSE stream
253+
// if it's supported by the server
254+
if (isJSONRPCNotification(message) && message.method === "notifications/initialized") {
255+
// We don't need to handle 405 here anymore as it's handled in _startOrAuthStandaloneSSE
256+
this._startOrAuthStandaloneSSE().catch(err => this.onerror?.(err));
257+
}
246258
return;
247259
}
248260

@@ -279,20 +291,4 @@ export class StreamableHTTPClientTransport implements Transport {
279291
throw error;
280292
}
281293
}
282-
283-
/**
284-
* Opens SSE stream to receive messages from the server.
285-
*
286-
* This allows the server to push messages to the client without requiring the client
287-
* to first send a request via HTTP POST. Some servers may not support this feature.
288-
* If authentication is required but fails, this method will throw an UnauthorizedError.
289-
*/
290-
async openSseStream(): Promise<void> {
291-
if (!this._abortController) {
292-
throw new Error(
293-
"StreamableHTTPClientTransport not started! Call connect() before openSseStream().",
294-
);
295-
}
296-
await this._startOrAuthStandaloneSSE();
297-
}
298294
}

src/examples/client/simpleStreamableHttp.ts

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,10 @@ async function main(): Promise<void> {
2525
const transport = new StreamableHTTPClientTransport(
2626
new URL('http://localhost:3000/mcp')
2727
);
28-
let supportsStandaloneSse = false;
29-
30-
// Connect the client using the transport and initialize the server
28+
29+
// Connect the client using the transport and initialize the server
3130
await client.connect(transport);
3231
console.log('Connected to MCP server');
33-
console.log('Opening SSE stream to receive server notifications...');
34-
try {
35-
await transport.openSseStream();
36-
supportsStandaloneSse = true;
37-
console.log('SSE stream established successfully. Waiting for notifications...');
38-
}
39-
catch (error) {
40-
console.error('Failed to open SSE stream:', error);
41-
}
4232

4333
// Set up notification handlers for server-initiated messages
4434
client.setNotificationHandler(LoggingMessageNotificationSchema, (notification) => {
@@ -136,11 +126,8 @@ async function main(): Promise<void> {
136126
} catch (error) {
137127
console.log(`Resources not supported by this server (${error})`);
138128
}
139-
if (supportsStandaloneSse) {
140-
// Instead of closing immediately, keep the connection open to receive notifications
141-
console.log('\nKeeping connection open to receive notifications. Press Ctrl+C to exit.');
142-
}
143-
129+
// Keep the connection open to receive notifications
130+
console.log('\nKeeping connection open to receive notifications. Press Ctrl+C to exit.');
144131
}
145132

146133
main().catch((error: unknown) => {

src/server/streamableHttp.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -578,7 +578,7 @@ describe("StreamableHTTPServerTransport", () => {
578578

579579
expect(mockResponse.writeHead).toHaveBeenCalledWith(200, expect.objectContaining({
580580
"Content-Type": "text/event-stream",
581-
"Cache-Control": "no-cache",
581+
"Cache-Control": "no-cache, no-transform",
582582
"Connection": "keep-alive",
583583
"mcp-session-id": transport.sessionId,
584584
}));

0 commit comments

Comments
 (0)