10
10
import org .tinystruct .http .SSEPushManager ;
11
11
import org .tinystruct .system .annotation .Action ;
12
12
import org .tinystruct .system .annotation .Argument ;
13
+ import org .tinystruct .http .Header ;
13
14
14
15
import java .util .UUID ;
15
16
import java .util .logging .Level ;
27
28
public abstract class MCPApplication extends AbstractApplication {
28
29
private static final Logger LOGGER = Logger .getLogger (MCPApplication .class .getName ());
29
30
30
- protected SSEHandler sseHandler ;
31
31
protected JsonRpcHandler jsonRpcHandler ;
32
32
protected AuthorizationHandler authHandler ;
33
33
34
34
private boolean initialized = false ;
35
35
protected SessionState sessionState = SessionState .DISCONNECTED ;
36
- private final String sessionId = UUID . randomUUID (). toString ();
36
+ protected final Map < String , Object > sessionMap = new ConcurrentHashMap <>(); // sessionId -> user info or state
37
37
38
38
// Generic registries for tools, resources, prompts, and custom RPC handlers
39
39
protected final Map <String , MCPTool > tools = new java .util .concurrent .ConcurrentHashMap <>();
40
40
protected final Map <String , MCPDataResource > resources = new java .util .concurrent .ConcurrentHashMap <>();
41
41
protected final Map <String , MCPPrompt > prompts = new java .util .concurrent .ConcurrentHashMap <>();
42
42
protected final Map <String , RpcMethodHandler > rpcHandlers = new java .util .concurrent .ConcurrentHashMap <>();
43
43
44
- @ Override
44
+
45
45
/**
46
46
* Initializes the MCP application, setting up authentication, SSE, JSON-RPC handler,
47
47
* and registering core protocol handlers. Subclasses should call super.init() and
48
48
* may register additional handlers for custom protocol extensions.
49
49
*/
50
+ @ Override
50
51
public void init () {
51
52
this .setTemplateRequired (false );
52
53
// Generate a random auth token if not configured
@@ -56,7 +57,6 @@ public void init() {
56
57
LOGGER .info ("Generated new MCP auth token: " + token );
57
58
}
58
59
this .authHandler = new AuthorizationHandler (getConfiguration ().get (Config .AUTH_TOKEN ));
59
- this .sseHandler = new SSEHandler ();
60
60
this .jsonRpcHandler = new JsonRpcHandler ();
61
61
62
62
// Register core protocol handlers
@@ -92,7 +92,7 @@ private boolean isAllowedPreInitialization(String method) {
92
92
}
93
93
94
94
/**
95
- * Main entry point for handling JSON-RPC requests.
95
+ * Main entry point for handling JSON-RPC requests via Streamable HTTP POST .
96
96
* Authenticates, parses, dispatches, and returns the response.
97
97
*
98
98
* @param request The HTTP request
@@ -105,11 +105,23 @@ public String handleRpcRequest(Request request, Response response) throws Applic
105
105
try {
106
106
// Validate authentication
107
107
authHandler .validateAuthHeader (request );
108
-
108
+ // Session management: extract or assign sessionId
109
+ String sessionId = (String ) request .headers ().get (Header .value0f ("Mcp-Session-Id" ));
110
+ if (sessionId == null || sessionId .isEmpty ()) {
111
+ // If this is an initialize request, assign a new sessionId
112
+ String jsonStr = request .body ();
113
+ if (jsonStr .contains ("\" method\" :\" initialize\" " )) {
114
+ sessionId = request .getSession ().getId ();
115
+ response .addHeader ("Mcp-Session-Id" , sessionId );
116
+ sessionMap .put (sessionId , System .currentTimeMillis ()); // Store session state as needed
117
+ } else {
118
+ response .setStatus (ResponseStatus .UNAUTHORIZED );
119
+ return jsonRpcHandler .createErrorResponse ("Missing session ID" , ErrorCodes .UNAUTHORIZED );
120
+ }
121
+ }
109
122
// Parse the JSON-RPC request
110
123
String jsonStr = request .body ();
111
124
LOGGER .fine ("Received JSON: " + jsonStr );
112
-
113
125
// Add batch request support
114
126
if (jsonStr .trim ().startsWith ("[" )) {
115
127
return jsonRpcHandler .handleBatchRequest (jsonStr , (rpcReq , rpcRes ) -> {
@@ -121,15 +133,12 @@ public String handleRpcRequest(Request request, Response response) throws Applic
121
133
}
122
134
});
123
135
}
124
-
125
136
if (!jsonRpcHandler .validateJsonRpcRequest (jsonStr )) {
126
137
response .setStatus (ResponseStatus .BAD_REQUEST );
127
138
return jsonRpcHandler .createErrorResponse ("Invalid JSON-RPC request" , ErrorCodes .INVALID_REQUEST );
128
139
}
129
-
130
140
JsonRpcRequest rpcRequest = new JsonRpcRequest ();
131
141
rpcRequest .parse (jsonStr );
132
-
133
142
JsonRpcResponse jsonResponse = new JsonRpcResponse ();
134
143
// Restrict methods before READY
135
144
String method = rpcRequest .getMethod ();
@@ -143,6 +152,8 @@ public String handleRpcRequest(Request request, Response response) throws Applic
143
152
jsonResponse .setError (new JsonRpcError (ErrorCodes .METHOD_NOT_FOUND , "Method not found: " + method ));
144
153
}
145
154
}
155
+ // For now, always return JSON. SSE streaming support to be added in GET endpoint.
156
+ // response.addHeader("Content-Type", "application/json");
146
157
return jsonResponse .toString ();
147
158
} catch (SecurityException e ) {
148
159
response .setStatus (ResponseStatus .UNAUTHORIZED );
@@ -235,7 +246,6 @@ protected void handleShutdown(JsonRpcRequest request, JsonRpcResponse response)
235
246
return ;
236
247
}
237
248
238
- sseHandler .closeAll ();
239
249
sessionState = SessionState .DISCONNECTED ;
240
250
241
251
response .setId (request .getId ());
@@ -250,11 +260,28 @@ protected void handleShutdown(JsonRpcRequest request, JsonRpcResponse response)
250
260
* @param response The JSON-RPC response to populate
251
261
*/
252
262
protected void handleGetStatus (JsonRpcRequest request , JsonRpcResponse response ) {
263
+ String sessionId = null ;
264
+ if (request .getParams () != null && request .getParams ().get ("sessionId" ) != null ) {
265
+ sessionId = request .getParams ().get ("sessionId" ).toString ();
266
+ }
253
267
Builder result = new Builder ();
268
+ if (sessionId == null || !sessionMap .containsKey (sessionId )) {
269
+ response .setError (new JsonRpcError (ErrorCodes .UNAUTHORIZED , "Invalid or missing sessionId" ));
270
+ return ;
271
+ }
272
+ Object sessionStartObj = sessionMap .get (sessionId );
273
+ long sessionStart ;
274
+ if (sessionStartObj instanceof Long ) {
275
+ sessionStart = (Long ) sessionStartObj ;
276
+ } else if (sessionStartObj instanceof Number ) {
277
+ sessionStart = ((Number ) sessionStartObj ).longValue ();
278
+ } else {
279
+ response .setError (new JsonRpcError (ErrorCodes .INTERNAL_ERROR , "Session start time invalid" ));
280
+ return ;
281
+ }
254
282
result .put ("state" , sessionState .toString ());
255
283
result .put ("sessionId" , sessionId );
256
- result .put ("uptime" , System .currentTimeMillis () - sseHandler .getFirstSessionCreatedAt ());
257
-
284
+ result .put ("uptime" , System .currentTimeMillis () - sessionStart );
258
285
response .setId (request .getId ());
259
286
response .setResult (result );
260
287
}
0 commit comments