@@ -842,8 +842,8 @@ describe("StreamableHTTPServerTransport", () => {
842
842
} ,
843
843
body : JSON . stringify ( initMessage ) ,
844
844
} ) ;
845
-
846
- await transport . handleRequest ( initReq , mockResponse ) ;
845
+ const initResponse = createMockResponse ( ) ;
846
+ await transport . handleRequest ( initReq , initResponse ) ;
847
847
mockResponse . writeHead . mockClear ( ) ;
848
848
} ) ;
849
849
@@ -934,6 +934,136 @@ describe("StreamableHTTPServerTransport", () => {
934
934
// Now stream should be closed
935
935
expect ( mockResponse . end ) . toHaveBeenCalled ( ) ;
936
936
} ) ;
937
+
938
+ it ( "should keep stream open when multiple requests share the same connection" , async ( ) => {
939
+ // Create a fresh response for this test
940
+ const sharedResponse = createMockResponse ( ) ;
941
+
942
+ // Send two requests in a batch that will share the same connection
943
+ const batchRequests : JSONRPCMessage [ ] = [
944
+ { jsonrpc : "2.0" , method : "method1" , params : { } , id : "req1" } ,
945
+ { jsonrpc : "2.0" , method : "method2" , params : { } , id : "req2" }
946
+ ] ;
947
+
948
+ const req = createMockRequest ( {
949
+ method : "POST" ,
950
+ headers : {
951
+ "content-type" : "application/json" ,
952
+ "accept" : "application/json, text/event-stream" ,
953
+ "mcp-session-id" : transport . sessionId
954
+ } ,
955
+ body : JSON . stringify ( batchRequests )
956
+ } ) ;
957
+
958
+ await transport . handleRequest ( req , sharedResponse ) ;
959
+
960
+ // Respond to first request
961
+ const response1 : JSONRPCMessage = {
962
+ jsonrpc : "2.0" ,
963
+ result : { value : "result1" } ,
964
+ id : "req1"
965
+ } ;
966
+
967
+ await transport . send ( response1 ) ;
968
+
969
+ // Connection should remain open because req2 is still pending
970
+ expect ( sharedResponse . write ) . toHaveBeenCalledWith (
971
+ expect . stringContaining ( `event: message\ndata: ${ JSON . stringify ( response1 ) } \n\n` )
972
+ ) ;
973
+ expect ( sharedResponse . end ) . not . toHaveBeenCalled ( ) ;
974
+
975
+ // Respond to second request
976
+ const response2 : JSONRPCMessage = {
977
+ jsonrpc : "2.0" ,
978
+ result : { value : "result2" } ,
979
+ id : "req2"
980
+ } ;
981
+
982
+ await transport . send ( response2 ) ;
983
+
984
+ // Now connection should close as all requests are complete
985
+ expect ( sharedResponse . write ) . toHaveBeenCalledWith (
986
+ expect . stringContaining ( `event: message\ndata: ${ JSON . stringify ( response2 ) } \n\n` )
987
+ ) ;
988
+ expect ( sharedResponse . end ) . toHaveBeenCalled ( ) ;
989
+ } ) ;
990
+
991
+ it ( "should clean up connection tracking when a response is sent" , async ( ) => {
992
+ const req = createMockRequest ( {
993
+ method : "POST" ,
994
+ headers : {
995
+ "content-type" : "application/json" ,
996
+ "accept" : "application/json, text/event-stream" ,
997
+ "mcp-session-id" : transport . sessionId
998
+ } ,
999
+ body : JSON . stringify ( {
1000
+ jsonrpc : "2.0" ,
1001
+ method : "test" ,
1002
+ params : { } ,
1003
+ id : "cleanup-test"
1004
+ } )
1005
+ } ) ;
1006
+
1007
+ const response = createMockResponse ( ) ;
1008
+ await transport . handleRequest ( req , response ) ;
1009
+
1010
+ // Verify that the request is tracked in the SSE map
1011
+ expect ( transport [ "_sseResponseMapping" ] . size ) . toBe ( 2 ) ;
1012
+ expect ( transport [ "_sseResponseMapping" ] . has ( "cleanup-test" ) ) . toBe ( true ) ;
1013
+
1014
+ // Send a response
1015
+ await transport . send ( {
1016
+ jsonrpc : "2.0" ,
1017
+ result : { } ,
1018
+ id : "cleanup-test"
1019
+ } ) ;
1020
+
1021
+ // Verify that the mapping was cleaned up
1022
+ expect ( transport [ "_sseResponseMapping" ] . size ) . toBe ( 1 ) ;
1023
+ expect ( transport [ "_sseResponseMapping" ] . has ( "cleanup-test" ) ) . toBe ( false ) ;
1024
+ } ) ;
1025
+
1026
+ it ( "should clean up connection tracking when client disconnects" , async ( ) => {
1027
+ // Setup two requests that share a connection
1028
+ const req = createMockRequest ( {
1029
+ method : "POST" ,
1030
+ headers : {
1031
+ "content-type" : "application/json" ,
1032
+ "accept" : "application/json, text/event-stream" ,
1033
+ "mcp-session-id" : transport . sessionId
1034
+ } ,
1035
+ body : JSON . stringify ( [
1036
+ { jsonrpc : "2.0" , method : "longRunning1" , params : { } , id : "req1" } ,
1037
+ { jsonrpc : "2.0" , method : "longRunning2" , params : { } , id : "req2" }
1038
+ ] )
1039
+ } ) ;
1040
+
1041
+ const response = createMockResponse ( ) ;
1042
+
1043
+ // We need to manually store the callback to trigger it later
1044
+ let closeCallback : ( ( ) => void ) | undefined ;
1045
+ response . on . mockImplementation ( ( event , callback : ( ) => void ) => {
1046
+ if ( typeof event === "string" && event === "close" ) {
1047
+ closeCallback = callback ;
1048
+ }
1049
+ return response ;
1050
+ } ) ;
1051
+
1052
+ await transport . handleRequest ( req , response ) ;
1053
+
1054
+ // Both requests should be mapped to the same response
1055
+ expect ( transport [ "_sseResponseMapping" ] . size ) . toBe ( 3 ) ;
1056
+ expect ( transport [ "_sseResponseMapping" ] . get ( "req1" ) ) . toBe ( response ) ;
1057
+ expect ( transport [ "_sseResponseMapping" ] . get ( "req2" ) ) . toBe ( response ) ;
1058
+
1059
+ // Simulate client disconnect by triggering the stored callback
1060
+ if ( closeCallback ) closeCallback ( ) ;
1061
+
1062
+ // All entries using this response should be removed
1063
+ expect ( transport [ "_sseResponseMapping" ] . size ) . toBe ( 1 ) ;
1064
+ expect ( transport [ "_sseResponseMapping" ] . has ( "req1" ) ) . toBe ( false ) ;
1065
+ expect ( transport [ "_sseResponseMapping" ] . has ( "req2" ) ) . toBe ( false ) ;
1066
+ } ) ;
937
1067
} ) ;
938
1068
939
1069
describe ( "Message Targeting" , ( ) => {
0 commit comments