From b2d3e0098e484e172719237b0933fa395cdfdf4b Mon Sep 17 00:00:00 2001
From: Christian Tzolov
Date: Mon, 12 May 2025 15:04:05 +0200
Subject: [PATCH 1/7] Next development version
Signed-off-by: Christian Tzolov
---
mcp-bom/pom.xml | 2 +-
mcp-spring/mcp-spring-webflux/pom.xml | 6 +++---
mcp-spring/mcp-spring-webmvc/pom.xml | 6 +++---
mcp-test/pom.xml | 4 ++--
mcp/pom.xml | 2 +-
pom.xml | 2 +-
6 files changed, 11 insertions(+), 11 deletions(-)
diff --git a/mcp-bom/pom.xml b/mcp-bom/pom.xml
index 4f24f719..7214dacd 100644
--- a/mcp-bom/pom.xml
+++ b/mcp-bom/pom.xml
@@ -7,7 +7,7 @@
io.modelcontextprotocol.sdkmcp-parent
- 0.10.0-SNAPSHOT
+ 0.11.0-SNAPSHOTmcp-bom
diff --git a/mcp-spring/mcp-spring-webflux/pom.xml b/mcp-spring/mcp-spring-webflux/pom.xml
index 86f46bf9..a8b92bd0 100644
--- a/mcp-spring/mcp-spring-webflux/pom.xml
+++ b/mcp-spring/mcp-spring-webflux/pom.xml
@@ -6,7 +6,7 @@
io.modelcontextprotocol.sdkmcp-parent
- 0.10.0-SNAPSHOT
+ 0.11.0-SNAPSHOT../../pom.xmlmcp-spring-webflux
@@ -25,13 +25,13 @@
io.modelcontextprotocol.sdkmcp
- 0.10.0-SNAPSHOT
+ 0.11.0-SNAPSHOTio.modelcontextprotocol.sdkmcp-test
- 0.10.0-SNAPSHOT
+ 0.11.0-SNAPSHOTtest
diff --git a/mcp-spring/mcp-spring-webmvc/pom.xml b/mcp-spring/mcp-spring-webmvc/pom.xml
index 82fbbf3e..48d1c346 100644
--- a/mcp-spring/mcp-spring-webmvc/pom.xml
+++ b/mcp-spring/mcp-spring-webmvc/pom.xml
@@ -6,7 +6,7 @@
io.modelcontextprotocol.sdkmcp-parent
- 0.10.0-SNAPSHOT
+ 0.11.0-SNAPSHOT../../pom.xmlmcp-spring-webmvc
@@ -25,13 +25,13 @@
io.modelcontextprotocol.sdkmcp
- 0.10.0-SNAPSHOT
+ 0.11.0-SNAPSHOTio.modelcontextprotocol.sdkmcp-test
- 0.10.0-SNAPSHOT
+ 0.11.0-SNAPSHOTtest
diff --git a/mcp-test/pom.xml b/mcp-test/pom.xml
index f1484ae7..a6e5bdb0 100644
--- a/mcp-test/pom.xml
+++ b/mcp-test/pom.xml
@@ -6,7 +6,7 @@
io.modelcontextprotocol.sdkmcp-parent
- 0.10.0-SNAPSHOT
+ 0.11.0-SNAPSHOTmcp-testjar
@@ -24,7 +24,7 @@
io.modelcontextprotocol.sdkmcp
- 0.10.0-SNAPSHOT
+ 0.11.0-SNAPSHOT
diff --git a/mcp/pom.xml b/mcp/pom.xml
index 17693ab3..77343282 100644
--- a/mcp/pom.xml
+++ b/mcp/pom.xml
@@ -6,7 +6,7 @@
io.modelcontextprotocol.sdkmcp-parent
- 0.10.0-SNAPSHOT
+ 0.11.0-SNAPSHOTmcpjar
diff --git a/pom.xml b/pom.xml
index 63845740..c2327ee8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
io.modelcontextprotocol.sdkmcp-parent
- 0.10.0-SNAPSHOT
+ 0.11.0-SNAPSHOTpomhttps://github.com/modelcontextprotocol/java-sdk
From f34662555a0ab68d74ac118f1b0220441b2c81b2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?=
Date: Wed, 14 May 2025 15:38:02 +0200
Subject: [PATCH 2/7] Fix stdio tests - proper server-everything argument
(#237)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Dariusz Jędrzejczyk
---
.../modelcontextprotocol/client/StdioMcpAsyncClientTests.java | 4 ++--
.../modelcontextprotocol/client/StdioMcpSyncClientTests.java | 4 ++--
2 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/mcp/src/test/java/io/modelcontextprotocol/client/StdioMcpAsyncClientTests.java b/mcp/src/test/java/io/modelcontextprotocol/client/StdioMcpAsyncClientTests.java
index c3908013..8c0069d6 100644
--- a/mcp/src/test/java/io/modelcontextprotocol/client/StdioMcpAsyncClientTests.java
+++ b/mcp/src/test/java/io/modelcontextprotocol/client/StdioMcpAsyncClientTests.java
@@ -25,12 +25,12 @@ protected McpClientTransport createMcpTransport() {
ServerParameters stdioParams;
if (System.getProperty("os.name").toLowerCase().contains("win")) {
stdioParams = ServerParameters.builder("cmd.exe")
- .args("/c", "npx.cmd", "-y", "@modelcontextprotocol/server-everything", "dir")
+ .args("/c", "npx.cmd", "-y", "@modelcontextprotocol/server-everything", "stdio")
.build();
}
else {
stdioParams = ServerParameters.builder("npx")
- .args("-y", "@modelcontextprotocol/server-everything", "dir")
+ .args("-y", "@modelcontextprotocol/server-everything", "stdio")
.build();
}
return new StdioClientTransport(stdioParams);
diff --git a/mcp/src/test/java/io/modelcontextprotocol/client/StdioMcpSyncClientTests.java b/mcp/src/test/java/io/modelcontextprotocol/client/StdioMcpSyncClientTests.java
index 8e75c4a3..706aa9b2 100644
--- a/mcp/src/test/java/io/modelcontextprotocol/client/StdioMcpSyncClientTests.java
+++ b/mcp/src/test/java/io/modelcontextprotocol/client/StdioMcpSyncClientTests.java
@@ -33,12 +33,12 @@ protected McpClientTransport createMcpTransport() {
ServerParameters stdioParams;
if (System.getProperty("os.name").toLowerCase().contains("win")) {
stdioParams = ServerParameters.builder("cmd.exe")
- .args("/c", "npx.cmd", "-y", "@modelcontextprotocol/server-everything", "dir")
+ .args("/c", "npx.cmd", "-y", "@modelcontextprotocol/server-everything", "stdio")
.build();
}
else {
stdioParams = ServerParameters.builder("npx")
- .args("-y", "@modelcontextprotocol/server-everything", "dir")
+ .args("-y", "@modelcontextprotocol/server-everything", "stdio")
.build();
}
return new StdioClientTransport(stdioParams);
From 2e13f9f9df8610e0d05cc76b1416fe195e249303 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?=
Date: Wed, 14 May 2025 22:46:54 +0200
Subject: [PATCH 3/7] Fix flaky WebFluxSse integration test
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Dariusz Jędrzejczyk
---
.../WebFluxSseIntegrationTests.java | 46 ++++++++++---------
1 file changed, 24 insertions(+), 22 deletions(-)
diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java
index 2ba04746..03fbc996 100644
--- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java
+++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java
@@ -8,6 +8,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
@@ -651,9 +653,11 @@ void testInitialize(String clientType) {
@ParameterizedTest(name = "{0} : {displayName} ")
@ValueSource(strings = { "httpclient", "webflux" })
- void testLoggingNotification(String clientType) {
+ void testLoggingNotification(String clientType) throws InterruptedException {
+ int expectedNotificationsCount = 3;
+ CountDownLatch latch = new CountDownLatch(expectedNotificationsCount);
// Create a list to store received logging notifications
- List receivedNotifications = new ArrayList<>();
+ List receivedNotifications = new CopyOnWriteArrayList<>();
var clientBuilder = clientBuilders.get(clientType);
@@ -709,6 +713,7 @@ void testLoggingNotification(String clientType) {
// Create client with logging notification handler
var mcpClient = clientBuilder.loggingConsumer(notification -> {
receivedNotifications.add(notification);
+ latch.countDown();
}).build()) {
// Initialize client
@@ -724,31 +729,28 @@ void testLoggingNotification(String clientType) {
assertThat(result.content().get(0)).isInstanceOf(McpSchema.TextContent.class);
assertThat(((McpSchema.TextContent) result.content().get(0)).text()).isEqualTo("Logging test completed");
- // Wait for notifications to be processed
- await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
+ assertThat(latch.await(5, TimeUnit.SECONDS)).as("Should receive notifications in reasonable time").isTrue();
- // Should have received 3 notifications (1 NOTICE and 2 ERROR)
- assertThat(receivedNotifications).hasSize(3);
+ // Should have received 3 notifications (1 NOTICE and 2 ERROR)
+ assertThat(receivedNotifications).hasSize(expectedNotificationsCount);
- Map notificationMap = receivedNotifications.stream()
- .collect(Collectors.toMap(n -> n.data(), n -> n));
+ Map notificationMap = receivedNotifications.stream()
+ .collect(Collectors.toMap(n -> n.data(), n -> n));
- // First notification should be NOTICE level
- assertThat(notificationMap.get("Notice message").level()).isEqualTo(McpSchema.LoggingLevel.NOTICE);
- assertThat(notificationMap.get("Notice message").logger()).isEqualTo("test-logger");
- assertThat(notificationMap.get("Notice message").data()).isEqualTo("Notice message");
+ // First notification should be NOTICE level
+ assertThat(notificationMap.get("Notice message").level()).isEqualTo(McpSchema.LoggingLevel.NOTICE);
+ assertThat(notificationMap.get("Notice message").logger()).isEqualTo("test-logger");
+ assertThat(notificationMap.get("Notice message").data()).isEqualTo("Notice message");
- // Second notification should be ERROR level
- assertThat(notificationMap.get("Error message").level()).isEqualTo(McpSchema.LoggingLevel.ERROR);
- assertThat(notificationMap.get("Error message").logger()).isEqualTo("test-logger");
- assertThat(notificationMap.get("Error message").data()).isEqualTo("Error message");
+ // Second notification should be ERROR level
+ assertThat(notificationMap.get("Error message").level()).isEqualTo(McpSchema.LoggingLevel.ERROR);
+ assertThat(notificationMap.get("Error message").logger()).isEqualTo("test-logger");
+ assertThat(notificationMap.get("Error message").data()).isEqualTo("Error message");
- // Third notification should be ERROR level
- assertThat(notificationMap.get("Another error message").level())
- .isEqualTo(McpSchema.LoggingLevel.ERROR);
- assertThat(notificationMap.get("Another error message").logger()).isEqualTo("test-logger");
- assertThat(notificationMap.get("Another error message").data()).isEqualTo("Another error message");
- });
+ // Third notification should be ERROR level
+ assertThat(notificationMap.get("Another error message").level()).isEqualTo(McpSchema.LoggingLevel.ERROR);
+ assertThat(notificationMap.get("Another error message").logger()).isEqualTo("test-logger");
+ assertThat(notificationMap.get("Another error message").data()).isEqualTo("Another error message");
}
mcpServer.close();
}
From 1adfa8a047852c8f9e0188b4e63fe2020e0c66c5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?=
Date: Wed, 14 May 2025 14:05:39 +0200
Subject: [PATCH 4/7] Add Contributing Guidelines and Code of Conduct
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Dariusz Jędrzejczyk
---
CODE_OF_CONDUCT.md | 119 +++++++++++++++++++++++++++++++++++++++++++++
CONTRIBUTING.md | 91 ++++++++++++++++++++++++++++++++++
README.md | 7 +--
SECURITY.md | 21 ++++++++
4 files changed, 233 insertions(+), 5 deletions(-)
create mode 100644 CODE_OF_CONDUCT.md
create mode 100644 CONTRIBUTING.md
create mode 100644 SECURITY.md
diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md
new file mode 100644
index 00000000..6009a645
--- /dev/null
+++ b/CODE_OF_CONDUCT.md
@@ -0,0 +1,119 @@
+# Contributor Covenant Code of Conduct
+
+## Our Pledge
+
+We as members, contributors, and leaders pledge to make participation in our community a
+harassment-free experience for everyone, regardless of age, body size, visible or
+invisible disability, ethnicity, sex characteristics, gender identity and expression,
+level of experience, education, socio-economic status, nationality, personal appearance,
+race, religion, or sexual identity and orientation.
+
+We pledge to act and interact in ways that contribute to an open, welcoming, diverse,
+inclusive, and healthy community.
+
+## Our Standards
+
+Examples of behavior that contributes to a positive environment for our community
+include:
+
+- Demonstrating empathy and kindness toward other people
+- Being respectful of differing opinions, viewpoints, and experiences
+- Giving and gracefully accepting constructive feedback
+- Accepting responsibility and apologizing to those affected by our mistakes, and
+ learning from the experience
+- Focusing on what is best not just for us as individuals, but for the overall community
+
+Examples of unacceptable behavior include:
+
+- The use of sexualized language or imagery, and sexual attention or advances of any kind
+- Trolling, insulting or derogatory comments, and personal or political attacks
+- Public or private harassment
+- Publishing others' private information, such as a physical or email address, without
+ their explicit permission
+- Other conduct which could reasonably be considered inappropriate in a professional
+ setting
+
+## Enforcement Responsibilities
+
+Community leaders are responsible for clarifying and enforcing our standards of
+acceptable behavior and will take appropriate and fair corrective action in response to
+any behavior that they deem inappropriate, threatening, offensive, or harmful.
+
+Community leaders have the right and responsibility to remove, edit, or reject comments,
+commits, code, wiki edits, issues, and other contributions that are not aligned to this
+Code of Conduct, and will communicate reasons for moderation decisions when appropriate.
+
+## Scope
+
+This Code of Conduct applies within all community spaces, and also applies when an
+individual is officially representing the community in public spaces. Examples of
+representing our community include using an official e-mail address, posting via an
+official social media account, or acting as an appointed representative at an online or
+offline event.
+
+## Enforcement
+
+Instances of abusive, harassing, or otherwise unacceptable behavior may be reported to
+the community leaders responsible for enforcement at mcp-coc@anthropic.com. All
+complaints will be reviewed and investigated promptly and fairly.
+
+All community leaders are obligated to respect the privacy and security of the reporter
+of any incident.
+
+## Enforcement Guidelines
+
+Community leaders will follow these Community Impact Guidelines in determining the
+consequences for any action they deem in violation of this Code of Conduct:
+
+### 1. Correction
+
+**Community Impact**: Use of inappropriate language or other behavior deemed
+unprofessional or unwelcome in the community.
+
+**Consequence**: A private, written warning from community leaders, providing clarity
+around the nature of the violation and an explanation of why the behavior was
+inappropriate. A public apology may be requested.
+
+### 2. Warning
+
+**Community Impact**: A violation through a single incident or series of actions.
+
+**Consequence**: A warning with consequences for continued behavior. No interaction with
+the people involved, including unsolicited interaction with those enforcing the Code of
+Conduct, for a specified period of time. This includes avoiding interactions in community
+spaces as well as external channels like social media. Violating these terms may lead to
+a temporary or permanent ban.
+
+### 3. Temporary Ban
+
+**Community Impact**: A serious violation of community standards, including sustained
+inappropriate behavior.
+
+**Consequence**: A temporary ban from any sort of interaction or public communication
+with the community for a specified period of time. No public or private interaction with
+the people involved, including unsolicited interaction with those enforcing the Code of
+Conduct, is allowed during this period. Violating these terms may lead to a permanent
+ban.
+
+### 4. Permanent Ban
+
+**Community Impact**: Demonstrating a pattern of violation of community standards,
+including sustained inappropriate behavior, harassment of an individual, or aggression
+toward or disparagement of classes of individuals.
+
+**Consequence**: A permanent ban from any sort of public interaction within the
+community.
+
+## Attribution
+
+This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 2.0,
+available at https://www.contributor-covenant.org/version/2/0/code_of_conduct.html.
+
+Community Impact Guidelines were inspired by
+[Mozilla's code of conduct enforcement ladder](https://github.com/mozilla/diversity).
+
+[homepage]: https://www.contributor-covenant.org
+
+For answers to common questions about this code of conduct, see the FAQ at
+https://www.contributor-covenant.org/faq. Translations are available at
+https://www.contributor-covenant.org/translations.
\ No newline at end of file
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
new file mode 100644
index 00000000..a949dcc0
--- /dev/null
+++ b/CONTRIBUTING.md
@@ -0,0 +1,91 @@
+# Contributing to Model Context Protocol Java SDK
+
+Thank you for your interest in contributing to the Model Context Protocol Java SDK!
+This document outlines how to contribute to this project.
+
+## Prerequisites
+
+The following software is required to work on the codebase:
+
+- `Java 17` or above
+- `Docker`
+- `npx`
+
+## Getting Started
+
+1. Fork the repository
+2. Clone your fork:
+
+```bash
+git clone https://github.com/YOUR-USERNAME/java-sdk.git
+cd java-sdk
+```
+
+3. Build from source:
+
+```bash
+./mvnw clean install -DskipTests # skip the tests
+./mvnw test # run tests
+```
+
+## Reporting Issues
+
+Please create an issue in the repository if you discover a bug or would like to
+propose an enhancement. Bug reports should have a reproducer in the form of a code
+sample or a repository attached that the maintainers or contributors can work with to
+address the problem.
+
+## Making Changes
+
+1. Create a new branch:
+
+```bash
+git checkout -b feature/your-feature-name
+```
+
+2. Make your changes
+3. Validate your changes:
+
+```bash
+./mvnw clean test
+```
+
+### Change Proposal Guidelines
+
+#### Principles of MCP
+
+1. **Simple + Minimal**: It is much easier to add things to the codebase than it is to
+ remove them. To maintain simplicity, we keep a high bar for adding new concepts and
+ primitives as each addition requires maintenance and compatibility consideration.
+2. **Concrete**: Code changes need to be based on specific usage and implementation
+ challenges and not on speculative ideas. Most importantly, the SDK is meant to
+ implement the MCP specification.
+
+## Submitting Changes
+
+1. For non-trivial changes, please clarify with the maintainers in an issue whether
+ you can contribute the change and the desired scope of the change.
+2. For trivial changes (for example a couple of lines or documentation changes) there
+ is no need to open an issue first.
+3. Push your changes to your fork.
+4. Submit a pull request to the main repository.
+5. Follow the pull request template.
+6. Wait for review.
+
+## Code of Conduct
+
+This project follows a Code of Conduct. Please review it in
+[CODE_OF_CONDUCT.md](CODE_OF_CONDUCT.md).
+
+## Questions
+
+If you have questions, please create a discussion in the repository.
+
+## License
+
+By contributing, you agree that your contributions will be licensed under the MIT
+License.
+
+## Security
+
+Please review our [Security Policy](SECURITY.md) for reporting security issues.
\ No newline at end of file
diff --git a/README.md b/README.md
index 9fc17306..0cd3f84a 100644
--- a/README.md
+++ b/README.md
@@ -30,11 +30,8 @@ To run the tests you have to pre-install `Docker` and `npx`.
## Contributing
-Contributions are welcome! Please:
-
-1. Fork the repository
-2. Create a feature branch
-3. Submit a Pull Request
+Contributions are welcome!
+Please follow the [Contributing Guidelines](CONTRIBUTING.md).
## Team
diff --git a/SECURITY.md b/SECURITY.md
new file mode 100644
index 00000000..74e9880f
--- /dev/null
+++ b/SECURITY.md
@@ -0,0 +1,21 @@
+# Security Policy
+
+Thank you for helping us keep the SDKs and systems they interact with secure.
+
+## Reporting Security Issues
+
+This SDK is maintained by [Anthropic](https://www.anthropic.com/) as part of the Model
+Context Protocol project.
+
+The security of our systems and user data is Anthropic’s top priority. We appreciate the
+work of security researchers acting in good faith in identifying and reporting potential
+vulnerabilities.
+
+Our security program is managed on HackerOne and we ask that any validated vulnerability
+in this functionality be reported through their
+[submission form](https://hackerone.com/anthropic-vdp/reports/new?type=team&report_type=vulnerability).
+
+## Vulnerability Disclosure Program
+
+Our Vulnerability Program Guidelines are defined on our
+[HackerOne program page](https://hackerone.com/anthropic-vdp).
\ No newline at end of file
From 07e7b8fd6bac47be4527f97451f8cdd95ed31a38 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?=
Date: Wed, 14 May 2025 18:00:06 +0200
Subject: [PATCH 5/7] Add note about force pushes
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Dariusz Jędrzejczyk
---
CONTRIBUTING.md | 3 +++
1 file changed, 3 insertions(+)
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index a949dcc0..517f3255 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -71,6 +71,9 @@ git checkout -b feature/your-feature-name
4. Submit a pull request to the main repository.
5. Follow the pull request template.
6. Wait for review.
+7. For any follow-up work, please add new commits instead of force-pushing. This will
+ allow the reviewer to focus on incremental changes instead of having to restart the
+ review process.
## Code of Conduct
From 8a5a591d39256ba3947003ec4477e1722363eb35 Mon Sep 17 00:00:00 2001
From: Luca Chang
Date: Tue, 27 May 2025 15:26:44 -0700
Subject: [PATCH 6/7] feat: Add elicitation support to MCP protocol
Implement elicitation capabilities allowing servers to request additional information
from users through clients during interactions. This feature provides a standardized
way for servers to gather necessary information dynamically while clients maintain
control over user interactions and data sharing.
- Add ElicitRequest and ElicitResult classes to McpSchema
- Implement elicitation handlers in client classes
- Add elicitation capabilities to server exchange classes
- Add tests for elicitation functionality with various scenarios
---
.../WebFluxSseIntegrationTests.java | 224 +++++++++++++++++-
.../server/WebMvcSseIntegrationTests.java | 213 +++++++++++++++++
.../client/McpAsyncClient.java | 32 +++
.../client/McpClient.java | 40 +++-
.../client/McpClientFeatures.java | 31 ++-
.../server/McpAsyncServerExchange.java | 28 +++
.../server/McpSyncServerExchange.java | 18 ++
.../modelcontextprotocol/spec/McpSchema.java | 129 ++++++++--
.../client/AbstractMcpAsyncClientTests.java | 22 +-
.../McpAsyncClientResponseHandlerTests.java | 150 ++++++++++++
...rverTransportProviderIntegrationTests.java | 213 +++++++++++++++++
.../spec/McpSchemaTests.java | 34 +++
12 files changed, 1106 insertions(+), 28 deletions(-)
diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java
index 03fbc996..2f85654e 100644
--- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java
+++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java
@@ -4,7 +4,6 @@
package io.modelcontextprotocol;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -28,11 +27,11 @@
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpSchema.*;
-import io.modelcontextprotocol.spec.McpSchema.ServerCapabilities.CompletionCapabilities;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
@@ -41,6 +40,7 @@
import org.springframework.web.client.RestClient;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.server.RouterFunctions;
+import reactor.test.StepVerifier;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@@ -331,6 +331,226 @@ void testCreateMessageWithRequestTimeoutFail(String clientType) throws Interrupt
mcpServer.closeGracefully().block();
}
+ // ---------------------------------------
+ // Elicitation Tests
+ // ---------------------------------------
+ @ParameterizedTest(name = "{0} : {displayName} ")
+ @ValueSource(strings = { "httpclient", "webflux" })
+ void testCreateElicitationWithoutElicitationCapabilities(String clientType) {
+
+ var clientBuilder = clientBuilders.get(clientType);
+
+ McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification(
+ new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
+
+ exchange.createElicitation(mock(ElicitRequest.class)).block();
+
+ return Mono.just(mock(CallToolResult.class));
+ });
+
+ var server = McpServer.async(mcpServerTransportProvider).serverInfo("test-server", "1.0.0").tools(tool).build();
+
+ try (
+ // Create client without elicitation capabilities
+ var client = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0")).build()) {
+
+ assertThat(client.initialize()).isNotNull();
+
+ try {
+ client.callTool(new McpSchema.CallToolRequest("tool1", Map.of()));
+ }
+ catch (McpError e) {
+ assertThat(e).isInstanceOf(McpError.class)
+ .hasMessage("Client must be configured with elicitation capabilities");
+ }
+ }
+ server.closeGracefully().block();
+ }
+
+ @ParameterizedTest(name = "{0} : {displayName} ")
+ @ValueSource(strings = { "httpclient", "webflux" })
+ void testCreateElicitationSuccess(String clientType) {
+
+ var clientBuilder = clientBuilders.get(clientType);
+
+ Function elicitationHandler = request -> {
+ assertThat(request.message()).isNotEmpty();
+ assertThat(request.requestedSchema()).isNotNull();
+
+ return new ElicitResult(ElicitResult.Action.ACCEPT, Map.of("message", request.message()));
+ };
+
+ CallToolResult callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")),
+ null);
+
+ McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification(
+ new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
+
+ var elicitationRequest = ElicitRequest.builder()
+ .message("Test message")
+ .requestedSchema(
+ Map.of("type", "object", "properties", Map.of("message", Map.of("type", "string"))))
+ .build();
+
+ StepVerifier.create(exchange.createElicitation(elicitationRequest)).consumeNextWith(result -> {
+ assertThat(result).isNotNull();
+ assertThat(result.action()).isEqualTo(ElicitResult.Action.ACCEPT);
+ assertThat(result.content().get("message")).isEqualTo("Test message");
+ }).verifyComplete();
+
+ return Mono.just(callResponse);
+ });
+
+ var mcpServer = McpServer.async(mcpServerTransportProvider)
+ .serverInfo("test-server", "1.0.0")
+ .tools(tool)
+ .build();
+
+ try (var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0"))
+ .capabilities(ClientCapabilities.builder().elicitation().build())
+ .elicitation(elicitationHandler)
+ .build()) {
+
+ InitializeResult initResult = mcpClient.initialize();
+ assertThat(initResult).isNotNull();
+
+ CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of()));
+
+ assertThat(response).isNotNull();
+ assertThat(response).isEqualTo(callResponse);
+ }
+ mcpServer.closeGracefully().block();
+ }
+
+ @ParameterizedTest(name = "{0} : {displayName} ")
+ @ValueSource(strings = { "httpclient", "webflux" })
+ void testCreateElicitationWithRequestTimeoutSuccess(String clientType) {
+
+ // Client
+ var clientBuilder = clientBuilders.get(clientType);
+
+ Function elicitationHandler = request -> {
+ assertThat(request.message()).isNotEmpty();
+ assertThat(request.requestedSchema()).isNotNull();
+ try {
+ TimeUnit.SECONDS.sleep(2);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return new ElicitResult(ElicitResult.Action.ACCEPT, Map.of("message", request.message()));
+ };
+
+ var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0"))
+ .capabilities(ClientCapabilities.builder().elicitation().build())
+ .elicitation(elicitationHandler)
+ .build();
+
+ // Server
+
+ CallToolResult callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")),
+ null);
+
+ McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification(
+ new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
+
+ var elicitationRequest = ElicitRequest.builder()
+ .message("Test message")
+ .requestedSchema(
+ Map.of("type", "object", "properties", Map.of("message", Map.of("type", "string"))))
+ .build();
+
+ StepVerifier.create(exchange.createElicitation(elicitationRequest)).consumeNextWith(result -> {
+ assertThat(result).isNotNull();
+ assertThat(result.action()).isEqualTo(ElicitResult.Action.ACCEPT);
+ assertThat(result.content().get("message")).isEqualTo("Test message");
+ }).verifyComplete();
+
+ return Mono.just(callResponse);
+ });
+
+ var mcpServer = McpServer.async(mcpServerTransportProvider)
+ .serverInfo("test-server", "1.0.0")
+ .requestTimeout(Duration.ofSeconds(3))
+ .tools(tool)
+ .build();
+
+ InitializeResult initResult = mcpClient.initialize();
+ assertThat(initResult).isNotNull();
+
+ CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of()));
+
+ assertThat(response).isNotNull();
+ assertThat(response).isEqualTo(callResponse);
+
+ mcpClient.closeGracefully();
+ mcpServer.closeGracefully().block();
+ }
+
+ @ParameterizedTest(name = "{0} : {displayName} ")
+ @ValueSource(strings = { "httpclient", "webflux" })
+ void testCreateElicitationWithRequestTimeoutFail(String clientType) {
+
+ // Client
+ var clientBuilder = clientBuilders.get(clientType);
+
+ Function elicitationHandler = request -> {
+ assertThat(request.message()).isNotEmpty();
+ assertThat(request.requestedSchema()).isNotNull();
+ try {
+ TimeUnit.SECONDS.sleep(2);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return new ElicitResult(ElicitResult.Action.ACCEPT, Map.of("message", request.message()));
+ };
+
+ var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0"))
+ .capabilities(ClientCapabilities.builder().elicitation().build())
+ .elicitation(elicitationHandler)
+ .build();
+
+ // Server
+
+ CallToolResult callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")),
+ null);
+
+ McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification(
+ new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
+
+ var elicitationRequest = ElicitRequest.builder()
+ .message("Test message")
+ .requestedSchema(
+ Map.of("type", "object", "properties", Map.of("message", Map.of("type", "string"))))
+ .build();
+
+ StepVerifier.create(exchange.createElicitation(elicitationRequest)).consumeNextWith(result -> {
+ assertThat(result).isNotNull();
+ assertThat(result.action()).isEqualTo(ElicitResult.Action.ACCEPT);
+ assertThat(result.content().get("message")).isEqualTo("Test message");
+ }).verifyComplete();
+
+ return Mono.just(callResponse);
+ });
+
+ var mcpServer = McpServer.async(mcpServerTransportProvider)
+ .serverInfo("test-server", "1.0.0")
+ .requestTimeout(Duration.ofSeconds(1))
+ .tools(tool)
+ .build();
+
+ InitializeResult initResult = mcpClient.initialize();
+ assertThat(initResult).isNotNull();
+
+ assertThatExceptionOfType(McpError.class).isThrownBy(() -> {
+ mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of()));
+ }).withMessageContaining("within 1000ms");
+
+ mcpClient.closeGracefully();
+ mcpServer.closeGracefully().block();
+ }
+
// ---------------------------------------
// Roots Tests
// ---------------------------------------
diff --git a/mcp-spring/mcp-spring-webmvc/src/test/java/io/modelcontextprotocol/server/WebMvcSseIntegrationTests.java b/mcp-spring/mcp-spring-webmvc/src/test/java/io/modelcontextprotocol/server/WebMvcSseIntegrationTests.java
index b12d6843..3f3f7be6 100644
--- a/mcp-spring/mcp-spring-webmvc/src/test/java/io/modelcontextprotocol/server/WebMvcSseIntegrationTests.java
+++ b/mcp-spring/mcp-spring-webmvc/src/test/java/io/modelcontextprotocol/server/WebMvcSseIntegrationTests.java
@@ -357,6 +357,219 @@ void testCreateMessageWithRequestTimeoutFail() throws InterruptedException {
mcpServer.close();
}
+ // ---------------------------------------
+ // Elicitation Tests
+ // ---------------------------------------
+ @Test
+ void testCreateElicitationWithoutElicitationCapabilities() {
+
+ McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification(
+ new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
+
+ exchange.createElicitation(mock(McpSchema.ElicitRequest.class)).block();
+
+ return Mono.just(mock(CallToolResult.class));
+ });
+
+ var server = McpServer.async(mcpServerTransportProvider).serverInfo("test-server", "1.0.0").tools(tool).build();
+
+ try (
+ // Create client without elicitation capabilities
+ var client = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0")).build()) {
+
+ assertThat(client.initialize()).isNotNull();
+
+ try {
+ client.callTool(new McpSchema.CallToolRequest("tool1", Map.of()));
+ }
+ catch (McpError e) {
+ assertThat(e).isInstanceOf(McpError.class)
+ .hasMessage("Client must be configured with elicitation capabilities");
+ }
+ }
+ server.closeGracefully().block();
+ }
+
+ @Test
+ void testCreateElicitationSuccess() {
+
+ Function elicitationHandler = request -> {
+ assertThat(request.message()).isNotEmpty();
+ assertThat(request.requestedSchema()).isNotNull();
+
+ return new McpSchema.ElicitResult(McpSchema.ElicitResult.Action.ACCEPT,
+ Map.of("message", request.message()));
+ };
+
+ CallToolResult callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")),
+ null);
+
+ McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification(
+ new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
+
+ var elicitationRequest = McpSchema.ElicitRequest.builder()
+ .message("Test message")
+ .requestedSchema(
+ Map.of("type", "object", "properties", Map.of("message", Map.of("type", "string"))))
+ .build();
+
+ StepVerifier.create(exchange.createElicitation(elicitationRequest)).consumeNextWith(result -> {
+ assertThat(result).isNotNull();
+ assertThat(result.action()).isEqualTo(McpSchema.ElicitResult.Action.ACCEPT);
+ assertThat(result.content().get("message")).isEqualTo("Test message");
+ }).verifyComplete();
+
+ return Mono.just(callResponse);
+ });
+
+ var mcpServer = McpServer.async(mcpServerTransportProvider)
+ .serverInfo("test-server", "1.0.0")
+ .tools(tool)
+ .build();
+
+ try (var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0"))
+ .capabilities(ClientCapabilities.builder().elicitation().build())
+ .elicitation(elicitationHandler)
+ .build()) {
+
+ InitializeResult initResult = mcpClient.initialize();
+ assertThat(initResult).isNotNull();
+
+ CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of()));
+
+ assertThat(response).isNotNull();
+ assertThat(response).isEqualTo(callResponse);
+ }
+ mcpServer.closeGracefully().block();
+ }
+
+ @Test
+ void testCreateElicitationWithRequestTimeoutSuccess() {
+
+ // Client
+
+ Function elicitationHandler = request -> {
+ assertThat(request.message()).isNotEmpty();
+ assertThat(request.requestedSchema()).isNotNull();
+ try {
+ TimeUnit.SECONDS.sleep(2);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return new McpSchema.ElicitResult(McpSchema.ElicitResult.Action.ACCEPT,
+ Map.of("message", request.message()));
+ };
+
+ var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0"))
+ .capabilities(ClientCapabilities.builder().elicitation().build())
+ .elicitation(elicitationHandler)
+ .build();
+
+ // Server
+
+ CallToolResult callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")),
+ null);
+
+ McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification(
+ new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
+
+ var elicitationRequest = McpSchema.ElicitRequest.builder()
+ .message("Test message")
+ .requestedSchema(
+ Map.of("type", "object", "properties", Map.of("message", Map.of("type", "string"))))
+ .build();
+
+ StepVerifier.create(exchange.createElicitation(elicitationRequest)).consumeNextWith(result -> {
+ assertThat(result).isNotNull();
+ assertThat(result.action()).isEqualTo(McpSchema.ElicitResult.Action.ACCEPT);
+ assertThat(result.content().get("message")).isEqualTo("Test message");
+ }).verifyComplete();
+
+ return Mono.just(callResponse);
+ });
+
+ var mcpServer = McpServer.async(mcpServerTransportProvider)
+ .serverInfo("test-server", "1.0.0")
+ .requestTimeout(Duration.ofSeconds(3))
+ .tools(tool)
+ .build();
+
+ InitializeResult initResult = mcpClient.initialize();
+ assertThat(initResult).isNotNull();
+
+ CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of()));
+
+ assertThat(response).isNotNull();
+ assertThat(response).isEqualTo(callResponse);
+
+ mcpClient.closeGracefully();
+ mcpServer.closeGracefully().block();
+ }
+
+ @Test
+ void testCreateElicitationWithRequestTimeoutFail() {
+
+ // Client
+
+ Function elicitationHandler = request -> {
+ assertThat(request.message()).isNotEmpty();
+ assertThat(request.requestedSchema()).isNotNull();
+ try {
+ TimeUnit.SECONDS.sleep(2);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return new McpSchema.ElicitResult(McpSchema.ElicitResult.Action.ACCEPT,
+ Map.of("message", request.message()));
+ };
+
+ var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0"))
+ .capabilities(ClientCapabilities.builder().elicitation().build())
+ .elicitation(elicitationHandler)
+ .build();
+
+ // Server
+
+ CallToolResult callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")),
+ null);
+
+ McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification(
+ new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
+
+ var elicitationRequest = McpSchema.ElicitRequest.builder()
+ .message("Test message")
+ .requestedSchema(
+ Map.of("type", "object", "properties", Map.of("message", Map.of("type", "string"))))
+ .build();
+
+ StepVerifier.create(exchange.createElicitation(elicitationRequest)).consumeNextWith(result -> {
+ assertThat(result).isNotNull();
+ assertThat(result.action()).isEqualTo(McpSchema.ElicitResult.Action.ACCEPT);
+ assertThat(result.content().get("message")).isEqualTo("Test message");
+ }).verifyComplete();
+
+ return Mono.just(callResponse);
+ });
+
+ var mcpServer = McpServer.async(mcpServerTransportProvider)
+ .serverInfo("test-server", "1.0.0")
+ .requestTimeout(Duration.ofSeconds(1))
+ .tools(tool)
+ .build();
+
+ InitializeResult initResult = mcpClient.initialize();
+ assertThat(initResult).isNotNull();
+
+ assertThatExceptionOfType(McpError.class).isThrownBy(() -> {
+ mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of()));
+ }).withMessageContaining("Timeout");
+
+ mcpClient.closeGracefully();
+ mcpServer.closeGracefully().block();
+ }
+
// ---------------------------------------
// Roots Tests
// ---------------------------------------
diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java
index e3a997ba..a22ef6b5 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java
+++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java
@@ -23,6 +23,8 @@
import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities;
import io.modelcontextprotocol.spec.McpSchema.CreateMessageRequest;
import io.modelcontextprotocol.spec.McpSchema.CreateMessageResult;
+import io.modelcontextprotocol.spec.McpSchema.ElicitRequest;
+import io.modelcontextprotocol.spec.McpSchema.ElicitResult;
import io.modelcontextprotocol.spec.McpSchema.GetPromptRequest;
import io.modelcontextprotocol.spec.McpSchema.GetPromptResult;
import io.modelcontextprotocol.spec.McpSchema.ListPromptsResult;
@@ -141,6 +143,15 @@ public class McpAsyncClient {
*/
private Function> samplingHandler;
+ /**
+ * MCP provides a standardized way for servers to request additional information from
+ * users through the client during interactions. This flow allows clients to maintain
+ * control over user interactions and data sharing while enabling servers to gather
+ * necessary information dynamically. Servers can request structured data from users
+ * with optional JSON schemas to validate responses.
+ */
+ private Function> elicitationHandler;
+
/**
* Client transport implementation.
*/
@@ -189,6 +200,15 @@ public class McpAsyncClient {
requestHandlers.put(McpSchema.METHOD_SAMPLING_CREATE_MESSAGE, samplingCreateMessageHandler());
}
+ // Elicitation Handler
+ if (this.clientCapabilities.elicitation() != null) {
+ if (features.elicitationHandler() == null) {
+ throw new McpError("Elicitation handler must not be null when client capabilities include elicitation");
+ }
+ this.elicitationHandler = features.elicitationHandler();
+ requestHandlers.put(McpSchema.METHOD_ELICITATION_CREATE, elicitationCreateHandler());
+ }
+
// Notification Handlers
Map notificationHandlers = new HashMap<>();
@@ -500,6 +520,18 @@ private RequestHandler samplingCreateMessageHandler() {
};
}
+ // --------------------------
+ // Elicitation
+ // --------------------------
+ private RequestHandler elicitationCreateHandler() {
+ return params -> {
+ ElicitRequest request = transport.unmarshalFrom(params, new TypeReference<>() {
+ });
+
+ return this.elicitationHandler.apply(request);
+ };
+ }
+
// --------------------------
// Tools
// --------------------------
diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java
index a1dc1168..280906cf 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java
+++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java
@@ -18,6 +18,8 @@
import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities;
import io.modelcontextprotocol.spec.McpSchema.CreateMessageRequest;
import io.modelcontextprotocol.spec.McpSchema.CreateMessageResult;
+import io.modelcontextprotocol.spec.McpSchema.ElicitRequest;
+import io.modelcontextprotocol.spec.McpSchema.ElicitResult;
import io.modelcontextprotocol.spec.McpSchema.Implementation;
import io.modelcontextprotocol.spec.McpSchema.Root;
import io.modelcontextprotocol.util.Assert;
@@ -175,6 +177,8 @@ class SyncSpec {
private Function samplingHandler;
+ private Function elicitationHandler;
+
private SyncSpec(McpClientTransport transport) {
Assert.notNull(transport, "Transport must not be null");
this.transport = transport;
@@ -283,6 +287,21 @@ public SyncSpec sampling(Function sam
return this;
}
+ /**
+ * Sets a custom elicitation handler for processing elicitation message requests.
+ * The elicitation handler can modify or validate messages before they are sent to
+ * the server, enabling custom processing logic.
+ * @param elicitationHandler A function that processes elicitation requests and
+ * returns results. Must not be null.
+ * @return This builder instance for method chaining
+ * @throws IllegalArgumentException if elicitationHandler is null
+ */
+ public SyncSpec elicitation(Function elicitationHandler) {
+ Assert.notNull(elicitationHandler, "Elicitation handler must not be null");
+ this.elicitationHandler = elicitationHandler;
+ return this;
+ }
+
/**
* Adds a consumer to be notified when the available tools change. This allows the
* client to react to changes in the server's tool capabilities, such as tools
@@ -364,7 +383,7 @@ public SyncSpec loggingConsumers(List> samplingHandler;
+ private Function> elicitationHandler;
+
private AsyncSpec(McpClientTransport transport) {
Assert.notNull(transport, "Transport must not be null");
this.transport = transport;
@@ -522,6 +543,21 @@ public AsyncSpec sampling(Function> elicitationHandler) {
+ Assert.notNull(elicitationHandler, "Elicitation handler must not be null");
+ this.elicitationHandler = elicitationHandler;
+ return this;
+ }
+
/**
* Adds a consumer to be notified when the available tools change. This allows the
* client to react to changes in the server's tool capabilities, such as tools
@@ -606,7 +642,7 @@ public McpAsyncClient build() {
return new McpAsyncClient(this.transport, this.requestTimeout, this.initializationTimeout,
new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots,
this.toolsChangeConsumers, this.resourcesChangeConsumers, this.promptsChangeConsumers,
- this.loggingConsumers, this.samplingHandler));
+ this.loggingConsumers, this.samplingHandler, this.elicitationHandler));
}
}
diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java
index 284b93f8..23d7c6a6 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java
+++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java
@@ -60,13 +60,15 @@ class McpClientFeatures {
* @param promptsChangeConsumers the prompts change consumers.
* @param loggingConsumers the logging consumers.
* @param samplingHandler the sampling handler.
+ * @param elicitationHandler the elicitation handler.
*/
record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map roots, List, Mono>> toolsChangeConsumers,
List, Mono>> resourcesChangeConsumers,
List, Mono>> promptsChangeConsumers,
List>> loggingConsumers,
- Function> samplingHandler) {
+ Function> samplingHandler,
+ Function> elicitationHandler) {
/**
* Create an instance and validate the arguments.
@@ -77,6 +79,7 @@ record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
* @param promptsChangeConsumers the prompts change consumers.
* @param loggingConsumers the logging consumers.
* @param samplingHandler the sampling handler.
+ * @param elicitationHandler the elicitation handler.
*/
public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map roots,
@@ -84,14 +87,16 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
List, Mono>> resourcesChangeConsumers,
List, Mono>> promptsChangeConsumers,
List>> loggingConsumers,
- Function> samplingHandler) {
+ Function> samplingHandler,
+ Function> elicitationHandler) {
Assert.notNull(clientInfo, "Client info must not be null");
this.clientInfo = clientInfo;
this.clientCapabilities = (clientCapabilities != null) ? clientCapabilities
: new McpSchema.ClientCapabilities(null,
!Utils.isEmpty(roots) ? new McpSchema.ClientCapabilities.RootCapabilities(false) : null,
- samplingHandler != null ? new McpSchema.ClientCapabilities.Sampling() : null);
+ samplingHandler != null ? new McpSchema.ClientCapabilities.Sampling() : null,
+ elicitationHandler != null ? new McpSchema.ClientCapabilities.Elicitation() : null);
this.roots = roots != null ? new ConcurrentHashMap<>(roots) : new ConcurrentHashMap<>();
this.toolsChangeConsumers = toolsChangeConsumers != null ? toolsChangeConsumers : List.of();
@@ -99,6 +104,7 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of();
this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of();
this.samplingHandler = samplingHandler;
+ this.elicitationHandler = elicitationHandler;
}
/**
@@ -138,9 +144,14 @@ public static Async fromSync(Sync syncSpec) {
Function> samplingHandler = r -> Mono
.fromCallable(() -> syncSpec.samplingHandler().apply(r))
.subscribeOn(Schedulers.boundedElastic());
+
+ Function> elicitationHandler = r -> Mono
+ .fromCallable(() -> syncSpec.elicitationHandler().apply(r))
+ .subscribeOn(Schedulers.boundedElastic());
+
return new Async(syncSpec.clientInfo(), syncSpec.clientCapabilities(), syncSpec.roots(),
toolsChangeConsumers, resourcesChangeConsumers, promptsChangeConsumers, loggingConsumers,
- samplingHandler);
+ samplingHandler, elicitationHandler);
}
}
@@ -156,13 +167,15 @@ public static Async fromSync(Sync syncSpec) {
* @param promptsChangeConsumers the prompts change consumers.
* @param loggingConsumers the logging consumers.
* @param samplingHandler the sampling handler.
+ * @param elicitationHandler the elicitation handler.
*/
public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map roots, List>> toolsChangeConsumers,
List>> resourcesChangeConsumers,
List>> promptsChangeConsumers,
List> loggingConsumers,
- Function samplingHandler) {
+ Function samplingHandler,
+ Function elicitationHandler) {
/**
* Create an instance and validate the arguments.
@@ -174,20 +187,23 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili
* @param promptsChangeConsumers the prompts change consumers.
* @param loggingConsumers the logging consumers.
* @param samplingHandler the sampling handler.
+ * @param elicitationHandler the elicitation handler.
*/
public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map roots, List>> toolsChangeConsumers,
List>> resourcesChangeConsumers,
List>> promptsChangeConsumers,
List> loggingConsumers,
- Function samplingHandler) {
+ Function samplingHandler,
+ Function elicitationHandler) {
Assert.notNull(clientInfo, "Client info must not be null");
this.clientInfo = clientInfo;
this.clientCapabilities = (clientCapabilities != null) ? clientCapabilities
: new McpSchema.ClientCapabilities(null,
!Utils.isEmpty(roots) ? new McpSchema.ClientCapabilities.RootCapabilities(false) : null,
- samplingHandler != null ? new McpSchema.ClientCapabilities.Sampling() : null);
+ samplingHandler != null ? new McpSchema.ClientCapabilities.Sampling() : null,
+ elicitationHandler != null ? new McpSchema.ClientCapabilities.Elicitation() : null);
this.roots = roots != null ? new HashMap<>(roots) : new HashMap<>();
this.toolsChangeConsumers = toolsChangeConsumers != null ? toolsChangeConsumers : List.of();
@@ -195,6 +211,7 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl
this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of();
this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of();
this.samplingHandler = samplingHandler;
+ this.elicitationHandler = elicitationHandler;
}
}
diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java
index 889dc66d..cfb07d26 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java
+++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java
@@ -36,6 +36,9 @@ public class McpAsyncServerExchange {
private static final TypeReference LIST_ROOTS_RESULT_TYPE_REF = new TypeReference<>() {
};
+ private static final TypeReference ELICITATION_RESULT_TYPE_REF = new TypeReference<>() {
+ };
+
/**
* Create a new asynchronous exchange with the client.
* @param session The server session representing a 1-1 interaction.
@@ -93,6 +96,31 @@ public Mono createMessage(McpSchema.CreateMessage
CREATE_MESSAGE_RESULT_TYPE_REF);
}
+ /**
+ * Creates a new elicitation. MCP provides a standardized way for servers to request
+ * additional information from users through the client during interactions. This flow
+ * allows clients to maintain control over user interactions and data sharing while
+ * enabling servers to gather necessary information dynamically. Servers can request
+ * structured data from users with optional JSON schemas to validate responses.
+ * @param elicitRequest The request to create a new elicitation
+ * @return A Mono that completes when the elicitation has been resolved.
+ * @see McpSchema.ElicitRequest
+ * @see McpSchema.ElicitResult
+ * @see Elicitation
+ * Specification
+ */
+ public Mono createElicitation(McpSchema.ElicitRequest elicitRequest) {
+ if (this.clientCapabilities == null) {
+ return Mono.error(new McpError("Client must be initialized. Call the initialize method first!"));
+ }
+ if (this.clientCapabilities.elicitation() == null) {
+ return Mono.error(new McpError("Client must be configured with elicitation capabilities"));
+ }
+ return this.session.sendRequest(McpSchema.METHOD_ELICITATION_CREATE, elicitRequest,
+ ELICITATION_RESULT_TYPE_REF);
+ }
+
/**
* Retrieves the list of all roots provided by the client.
* @return A Mono that emits the list of roots result.
diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServerExchange.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServerExchange.java
index 52360e54..084412b9 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServerExchange.java
+++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServerExchange.java
@@ -64,6 +64,24 @@ public McpSchema.CreateMessageResult createMessage(McpSchema.CreateMessageReques
return this.exchange.createMessage(createMessageRequest).block();
}
+ /**
+ * Creates a new elicitation. MCP provides a standardized way for servers to request
+ * additional information from users through the client during interactions. This flow
+ * allows clients to maintain control over user interactions and data sharing while
+ * enabling servers to gather necessary information dynamically. Servers can request
+ * structured data from users with optional JSON schemas to validate responses.
+ * @param elicitRequest The request to create a new elicitation
+ * @return A result containing the elicitation response.
+ * @see McpSchema.ElicitRequest
+ * @see McpSchema.ElicitResult
+ * @see Elicitation
+ * Specification
+ */
+ public McpSchema.ElicitResult createElicitation(McpSchema.ElicitRequest elicitRequest) {
+ return this.exchange.createElicitation(elicitRequest).block();
+ }
+
/**
* Retrieves the list of all roots provided by the client.
* @return The list of roots result.
diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java
index 8df8a158..9dae0826 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java
+++ b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java
@@ -94,6 +94,9 @@ private McpSchema() {
// Sampling Methods
public static final String METHOD_SAMPLING_CREATE_MESSAGE = "sampling/createMessage";
+ // Elicitation Methods
+ public static final String METHOD_ELICITATION_CREATE = "elicitation/create";
+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
// ---------------------------
@@ -131,8 +134,8 @@ public static final class ErrorCodes {
}
- public sealed interface Request
- permits InitializeRequest, CallToolRequest, CreateMessageRequest, CompleteRequest, GetPromptRequest {
+ public sealed interface Request permits InitializeRequest, CallToolRequest, CreateMessageRequest, ElicitRequest,
+ CompleteRequest, GetPromptRequest {
}
@@ -221,7 +224,7 @@ public record JSONRPCError(
public record InitializeRequest( // @formatter:off
@JsonProperty("protocolVersion") String protocolVersion,
@JsonProperty("capabilities") ClientCapabilities capabilities,
- @JsonProperty("clientInfo") Implementation clientInfo) implements Request {
+ @JsonProperty("clientInfo") Implementation clientInfo) implements Request {
} // @formatter:on
@JsonInclude(JsonInclude.Include.NON_ABSENT)
@@ -245,6 +248,8 @@ public record InitializeResult( // @formatter:off
* access to.
* @param sampling Provides a standardized way for servers to request LLM sampling
* (“completions” or “generations”) from language models via clients.
+ * @param elicitation Provides a standardized way for servers to request additional
+ * information from users through the client during interactions.
*
*/
@JsonInclude(JsonInclude.Include.NON_ABSENT)
@@ -252,7 +257,8 @@ public record InitializeResult( // @formatter:off
public record ClientCapabilities( // @formatter:off
@JsonProperty("experimental") Map experimental,
@JsonProperty("roots") RootCapabilities roots,
- @JsonProperty("sampling") Sampling sampling) {
+ @JsonProperty("sampling") Sampling sampling,
+ @JsonProperty("elicitation") Elicitation elicitation) {
/**
* Roots define the boundaries of where servers can operate within the filesystem,
@@ -264,7 +270,7 @@ public record ClientCapabilities( // @formatter:off
* has changed since the last time the server checked.
*/
@JsonInclude(JsonInclude.Include.NON_ABSENT)
- @JsonIgnoreProperties(ignoreUnknown = true)
+ @JsonIgnoreProperties(ignoreUnknown = true)
public record RootCapabilities(
@JsonProperty("listChanged") Boolean listChanged) {
}
@@ -279,10 +285,22 @@ public record RootCapabilities(
* image-based interactions and optionally include context
* from MCP servers in their prompts.
*/
- @JsonInclude(JsonInclude.Include.NON_ABSENT)
+ @JsonInclude(JsonInclude.Include.NON_ABSENT)
public record Sampling() {
}
+ /**
+ * Provides a standardized way for servers to request additional
+ * information from users through the client during interactions.
+ * This flow allows clients to maintain control over user
+ * interactions and data sharing while enabling servers to gather
+ * necessary information dynamically. Servers can request structured
+ * data from users with optional JSON schemas to validate responses.
+ */
+ @JsonInclude(JsonInclude.Include.NON_ABSENT)
+ public record Elicitation() {
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -291,6 +309,7 @@ public static class Builder {
private Map experimental;
private RootCapabilities roots;
private Sampling sampling;
+ private Elicitation elicitation;
public Builder experimental(Map experimental) {
this.experimental = experimental;
@@ -307,8 +326,13 @@ public Builder sampling() {
return this;
}
+ public Builder elicitation() {
+ this.elicitation = new Elicitation();
+ return this;
+ }
+
public ClientCapabilities build() {
- return new ClientCapabilities(experimental, roots, sampling);
+ return new ClientCapabilities(experimental, roots, sampling, elicitation);
}
}
}// @formatter:on
@@ -326,11 +350,11 @@ public record ServerCapabilities( // @formatter:off
@JsonInclude(JsonInclude.Include.NON_ABSENT)
public record CompletionCapabilities() {
}
-
+
@JsonInclude(JsonInclude.Include.NON_ABSENT)
public record LoggingCapabilities() {
}
-
+
@JsonInclude(JsonInclude.Include.NON_ABSENT)
public record PromptCapabilities(
@JsonProperty("listChanged") Boolean listChanged) {
@@ -727,11 +751,11 @@ public record Tool( // @formatter:off
@JsonProperty("name") String name,
@JsonProperty("description") String description,
@JsonProperty("inputSchema") JsonSchema inputSchema) {
-
+
public Tool(String name, String description, String schema) {
this(name, description, parseSchema(schema));
}
-
+
} // @formatter:on
private static JsonSchema parseSchema(String schema) {
@@ -758,7 +782,7 @@ public record CallToolRequest(// @formatter:off
@JsonProperty("arguments") Map arguments) implements Request {
public CallToolRequest(String name, String jsonArguments) {
- this(name, parseJsonArguments(jsonArguments));
+ this(name, parseJsonArguments(jsonArguments));
}
private static Map parseJsonArguments(String jsonArguments) {
@@ -893,7 +917,7 @@ public record ModelPreferences(// @formatter:off
@JsonProperty("costPriority") Double costPriority,
@JsonProperty("speedPriority") Double speedPriority,
@JsonProperty("intelligencePriority") Double intelligencePriority) {
-
+
public static Builder builder() {
return new Builder();
}
@@ -963,7 +987,7 @@ public record CreateMessageRequest(// @formatter:off
@JsonProperty("includeContext") ContextInclusionStrategy includeContext,
@JsonProperty("temperature") Double temperature,
@JsonProperty("maxTokens") int maxTokens,
- @JsonProperty("stopSequences") List stopSequences,
+ @JsonProperty("stopSequences") List stopSequences,
@JsonProperty("metadata") Map metadata) implements Request {
public enum ContextInclusionStrategy {
@@ -971,7 +995,7 @@ public enum ContextInclusionStrategy {
@JsonProperty("thisServer") THIS_SERVER,
@JsonProperty("allServers") ALL_SERVERS
}
-
+
public static Builder builder() {
return new Builder();
}
@@ -1040,7 +1064,7 @@ public record CreateMessageResult(// @formatter:off
@JsonProperty("content") Content content,
@JsonProperty("model") String model,
@JsonProperty("stopReason") StopReason stopReason) {
-
+
public enum StopReason {
@JsonProperty("endTurn") END_TURN,
@JsonProperty("stopSequence") STOP_SEQUENCE,
@@ -1088,6 +1112,79 @@ public CreateMessageResult build() {
}
}// @formatter:on
+ // Elicitation
+ /**
+ * Used by the server to send an elicitation to the client.
+ *
+ * @param message The body of the elicitation message.
+ * @param requestedSchema The elicitation response schema that must be satisfied.
+ */
+ @JsonInclude(JsonInclude.Include.NON_ABSENT)
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ public record ElicitRequest(// @formatter:off
+ @JsonProperty("message") String message,
+ @JsonProperty("requestedSchema") Map requestedSchema) implements Request {
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private String message;
+ private Map requestedSchema;
+
+ public Builder message(String message) {
+ this.message = message;
+ return this;
+ }
+
+ public Builder requestedSchema(Map requestedSchema) {
+ this.requestedSchema = requestedSchema;
+ return this;
+ }
+
+ public ElicitRequest build() {
+ return new ElicitRequest(message, requestedSchema);
+ }
+ }
+ }// @formatter:on
+
+ @JsonInclude(JsonInclude.Include.NON_ABSENT)
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ public record ElicitResult(// @formatter:off
+ @JsonProperty("action") Action action,
+ @JsonProperty("content") Map content) {
+
+ public enum Action {
+ @JsonProperty("accept") ACCEPT,
+ @JsonProperty("decline") DECLINE,
+ @JsonProperty("cancel") CANCEL
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private Action action;
+ private Map content;
+
+ public Builder message(Action action) {
+ this.action = action;
+ return this;
+ }
+
+ public Builder content(Map content) {
+ this.content = content;
+ return this;
+ }
+
+ public ElicitResult build() {
+ return new ElicitResult(action, content);
+ }
+ }
+ }// @formatter:on
+
// ---------------------------
// Pagination Interfaces
// ---------------------------
diff --git a/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java b/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java
index 72b409af..d1a2581e 100644
--- a/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java
+++ b/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java
@@ -19,6 +19,8 @@
import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities;
import io.modelcontextprotocol.spec.McpSchema.CreateMessageRequest;
import io.modelcontextprotocol.spec.McpSchema.CreateMessageResult;
+import io.modelcontextprotocol.spec.McpSchema.ElicitRequest;
+import io.modelcontextprotocol.spec.McpSchema.ElicitResult;
import io.modelcontextprotocol.spec.McpSchema.GetPromptRequest;
import io.modelcontextprotocol.spec.McpSchema.Prompt;
import io.modelcontextprotocol.spec.McpSchema.Resource;
@@ -422,6 +424,20 @@ void testInitializeWithSamplingCapability() {
});
}
+ @Test
+ void testInitializeWithElicitationCapability() {
+ ClientCapabilities capabilities = ClientCapabilities.builder().elicitation().build();
+ ElicitResult elicitResult = ElicitResult.builder()
+ .message(ElicitResult.Action.ACCEPT)
+ .content(Map.of("foo", "bar"))
+ .build();
+ withClient(createMcpTransport(),
+ builder -> builder.capabilities(capabilities).elicitation(request -> Mono.just(elicitResult)),
+ client -> {
+ StepVerifier.create(client.initialize()).expectNextMatches(Objects::nonNull).verifyComplete();
+ });
+ }
+
@Test
void testInitializeWithAllCapabilities() {
var capabilities = ClientCapabilities.builder()
@@ -433,7 +449,11 @@ void testInitializeWithAllCapabilities() {
Function> samplingHandler = request -> Mono
.just(CreateMessageResult.builder().message("test").model("test-model").build());
- withClient(createMcpTransport(), builder -> builder.capabilities(capabilities).sampling(samplingHandler),
+ Function> elicitationHandler = request -> Mono
+ .just(ElicitResult.builder().message(ElicitResult.Action.ACCEPT).content(Map.of("foo", "bar")).build());
+
+ withClient(createMcpTransport(),
+ builder -> builder.capabilities(capabilities).sampling(samplingHandler).elicitation(elicitationHandler),
client ->
StepVerifier.create(client.initialize()).assertNext(result -> {
diff --git a/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java b/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java
index 4510b152..e6cde8e3 100644
--- a/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java
+++ b/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java
@@ -19,6 +19,8 @@
import io.modelcontextprotocol.spec.McpSchema.InitializeResult;
import io.modelcontextprotocol.spec.McpSchema.Root;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import reactor.core.publisher.Mono;
import static io.modelcontextprotocol.spec.McpSchema.METHOD_INITIALIZE;
@@ -349,4 +351,152 @@ void testSamplingCreateMessageRequestHandlingWithNullHandler() {
.hasMessage("Sampling handler must not be null when client capabilities include sampling");
}
+ @Test
+ @SuppressWarnings("unchecked")
+ void testElicitationCreateRequestHandling() {
+ MockMcpClientTransport transport = initializationEnabledTransport();
+
+ // Create a test elicitation handler that echoes back the input
+ Function> elicitationHandler = request -> {
+ assertThat(request.message()).isNotEmpty();
+ assertThat(request.requestedSchema()).isInstanceOf(Map.class);
+ assertThat(request.requestedSchema().get("type")).isEqualTo("object");
+
+ var properties = request.requestedSchema().get("properties");
+ assertThat(properties).isNotNull();
+ assertThat(((Map) properties).get("message")).isInstanceOf(Map.class);
+
+ return Mono.just(McpSchema.ElicitResult.builder()
+ .message(McpSchema.ElicitResult.Action.ACCEPT)
+ .content(Map.of("message", request.message()))
+ .build());
+ };
+
+ // Create client with elicitation capability and handler
+ McpAsyncClient asyncMcpClient = McpClient.async(transport)
+ .capabilities(ClientCapabilities.builder().elicitation().build())
+ .elicitation(elicitationHandler)
+ .build();
+
+ assertThat(asyncMcpClient.initialize().block()).isNotNull();
+
+ // Create a mock elicitation
+ var elicitRequest = McpSchema.ElicitRequest.builder()
+ .message("Test message")
+ .requestedSchema(Map.of("type", "object", "properties", Map.of("message", Map.of("type", "string"))))
+ .build();
+
+ // Simulate incoming request
+ McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION,
+ McpSchema.METHOD_ELICITATION_CREATE, "test-id", elicitRequest);
+ transport.simulateIncomingMessage(request);
+
+ // Verify response
+ McpSchema.JSONRPCMessage sentMessage = transport.getLastSentMessage();
+ assertThat(sentMessage).isInstanceOf(McpSchema.JSONRPCResponse.class);
+
+ McpSchema.JSONRPCResponse response = (McpSchema.JSONRPCResponse) sentMessage;
+ assertThat(response.id()).isEqualTo("test-id");
+ assertThat(response.error()).isNull();
+
+ McpSchema.ElicitResult result = transport.unmarshalFrom(response.result(), new TypeReference<>() {
+ });
+ assertThat(result).isNotNull();
+ assertThat(result.action()).isEqualTo(McpSchema.ElicitResult.Action.ACCEPT);
+ assertThat(result.content()).isEqualTo(Map.of("message", "Test message"));
+
+ asyncMcpClient.closeGracefully();
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = McpSchema.ElicitResult.Action.class, names = { "DECLINE", "CANCEL" })
+ void testElicitationFailRequestHandling(McpSchema.ElicitResult.Action action) {
+ MockMcpClientTransport transport = initializationEnabledTransport();
+
+ // Create a test elicitation handler to decline the request
+ Function> elicitationHandler = request -> Mono
+ .just(McpSchema.ElicitResult.builder().message(action).build());
+
+ // Create client with elicitation capability and handler
+ McpAsyncClient asyncMcpClient = McpClient.async(transport)
+ .capabilities(ClientCapabilities.builder().elicitation().build())
+ .elicitation(elicitationHandler)
+ .build();
+
+ assertThat(asyncMcpClient.initialize().block()).isNotNull();
+
+ // Create a mock elicitation
+ var elicitRequest = McpSchema.ElicitRequest.builder()
+ .message("Test message")
+ .requestedSchema(Map.of("type", "object", "properties", Map.of("message", Map.of("type", "string"))))
+ .build();
+
+ // Simulate incoming request
+ McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION,
+ McpSchema.METHOD_ELICITATION_CREATE, "test-id", elicitRequest);
+ transport.simulateIncomingMessage(request);
+
+ // Verify response
+ McpSchema.JSONRPCMessage sentMessage = transport.getLastSentMessage();
+ assertThat(sentMessage).isInstanceOf(McpSchema.JSONRPCResponse.class);
+
+ McpSchema.JSONRPCResponse response = (McpSchema.JSONRPCResponse) sentMessage;
+ assertThat(response.id()).isEqualTo("test-id");
+ assertThat(response.error()).isNull();
+
+ McpSchema.ElicitResult result = transport.unmarshalFrom(response.result(), new TypeReference<>() {
+ });
+ assertThat(result).isNotNull();
+ assertThat(result.action()).isEqualTo(action);
+ assertThat(result.content()).isNull();
+
+ asyncMcpClient.closeGracefully();
+ }
+
+ @Test
+ void testElicitationCreateRequestHandlingWithoutCapability() {
+ MockMcpClientTransport transport = initializationEnabledTransport();
+
+ // Create client without elicitation capability
+ McpAsyncClient asyncMcpClient = McpClient.async(transport)
+ .capabilities(ClientCapabilities.builder().build()) // No elicitation
+ // capability
+ .build();
+
+ assertThat(asyncMcpClient.initialize().block()).isNotNull();
+
+ // Create a mock elicitation
+ var elicitRequest = new McpSchema.ElicitRequest("test",
+ Map.of("type", "object", "properties", Map.of("test", Map.of("type", "boolean", "defaultValue", true,
+ "description", "test-description", "title", "test-title"))));
+
+ // Simulate incoming request
+ McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION,
+ McpSchema.METHOD_ELICITATION_CREATE, "test-id", elicitRequest);
+ transport.simulateIncomingMessage(request);
+
+ // Verify error response
+ McpSchema.JSONRPCMessage sentMessage = transport.getLastSentMessage();
+ assertThat(sentMessage).isInstanceOf(McpSchema.JSONRPCResponse.class);
+
+ McpSchema.JSONRPCResponse response = (McpSchema.JSONRPCResponse) sentMessage;
+ assertThat(response.id()).isEqualTo("test-id");
+ assertThat(response.result()).isNull();
+ assertThat(response.error()).isNotNull();
+ assertThat(response.error().message()).contains("Method not found: elicitation/create");
+
+ asyncMcpClient.closeGracefully();
+ }
+
+ @Test
+ void testElicitationCreateRequestHandlingWithNullHandler() {
+ MockMcpClientTransport transport = new MockMcpClientTransport();
+
+ // Create client with elicitation capability but null handler
+ assertThatThrownBy(() -> McpClient.async(transport)
+ .capabilities(ClientCapabilities.builder().elicitation().build())
+ .build()).isInstanceOf(McpError.class)
+ .hasMessage("Elicitation handler must not be null when client capabilities include elicitation");
+ }
+
}
diff --git a/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java b/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java
index 2ff6325a..dc9d1cfa 100644
--- a/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java
+++ b/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java
@@ -24,6 +24,8 @@
import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities;
import io.modelcontextprotocol.spec.McpSchema.CreateMessageRequest;
import io.modelcontextprotocol.spec.McpSchema.CreateMessageResult;
+import io.modelcontextprotocol.spec.McpSchema.ElicitRequest;
+import io.modelcontextprotocol.spec.McpSchema.ElicitResult;
import io.modelcontextprotocol.spec.McpSchema.InitializeResult;
import io.modelcontextprotocol.spec.McpSchema.ModelPreferences;
import io.modelcontextprotocol.spec.McpSchema.Role;
@@ -339,6 +341,217 @@ void testCreateMessageWithRequestTimeoutFail() throws InterruptedException {
mcpServer.close();
}
+ // ---------------------------------------
+ // Elicitation Tests
+ // ---------------------------------------
+ @Test
+ @Disabled
+ void testCreateElicitationWithoutElicitationCapabilities() {
+
+ McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification(
+ new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
+
+ exchange.createElicitation(mock(ElicitRequest.class)).block();
+
+ return Mono.just(mock(CallToolResult.class));
+ });
+
+ var server = McpServer.async(mcpServerTransportProvider).serverInfo("test-server", "1.0.0").tools(tool).build();
+
+ try (
+ // Create client without elicitation capabilities
+ var client = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0")).build()) {
+
+ assertThat(client.initialize()).isNotNull();
+
+ try {
+ client.callTool(new McpSchema.CallToolRequest("tool1", Map.of()));
+ }
+ catch (McpError e) {
+ assertThat(e).isInstanceOf(McpError.class)
+ .hasMessage("Client must be configured with elicitation capabilities");
+ }
+ }
+ server.closeGracefully().block();
+ }
+
+ @Test
+ void testCreateElicitationSuccess() {
+
+ Function elicitationHandler = request -> {
+ assertThat(request.message()).isNotEmpty();
+ assertThat(request.requestedSchema()).isNotNull();
+
+ return new ElicitResult(ElicitResult.Action.ACCEPT, Map.of("message", request.message()));
+ };
+
+ CallToolResult callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")),
+ null);
+
+ McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification(
+ new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
+
+ var elicitationRequest = ElicitRequest.builder()
+ .message("Test message")
+ .requestedSchema(
+ Map.of("type", "object", "properties", Map.of("message", Map.of("type", "string"))))
+ .build();
+
+ StepVerifier.create(exchange.createElicitation(elicitationRequest)).consumeNextWith(result -> {
+ assertThat(result).isNotNull();
+ assertThat(result.action()).isEqualTo(ElicitResult.Action.ACCEPT);
+ assertThat(result.content().get("message")).isEqualTo("Test message");
+ }).verifyComplete();
+
+ return Mono.just(callResponse);
+ });
+
+ var mcpServer = McpServer.async(mcpServerTransportProvider)
+ .serverInfo("test-server", "1.0.0")
+ .tools(tool)
+ .build();
+
+ try (var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0"))
+ .capabilities(ClientCapabilities.builder().elicitation().build())
+ .elicitation(elicitationHandler)
+ .build()) {
+
+ InitializeResult initResult = mcpClient.initialize();
+ assertThat(initResult).isNotNull();
+
+ CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of()));
+
+ assertThat(response).isNotNull();
+ assertThat(response).isEqualTo(callResponse);
+ }
+ mcpServer.closeGracefully().block();
+ }
+
+ @Test
+ void testCreateElicitationWithRequestTimeoutSuccess() {
+
+ // Client
+
+ Function elicitationHandler = request -> {
+ assertThat(request.message()).isNotEmpty();
+ assertThat(request.requestedSchema()).isNotNull();
+ try {
+ TimeUnit.SECONDS.sleep(2);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return new ElicitResult(ElicitResult.Action.ACCEPT, Map.of("message", request.message()));
+ };
+
+ var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0"))
+ .capabilities(ClientCapabilities.builder().elicitation().build())
+ .elicitation(elicitationHandler)
+ .build();
+
+ // Server
+
+ CallToolResult callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")),
+ null);
+
+ McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification(
+ new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
+
+ var elicitationRequest = ElicitRequest.builder()
+ .message("Test message")
+ .requestedSchema(
+ Map.of("type", "object", "properties", Map.of("message", Map.of("type", "string"))))
+ .build();
+
+ StepVerifier.create(exchange.createElicitation(elicitationRequest)).consumeNextWith(result -> {
+ assertThat(result).isNotNull();
+ assertThat(result.action()).isEqualTo(ElicitResult.Action.ACCEPT);
+ assertThat(result.content().get("message")).isEqualTo("Test message");
+ }).verifyComplete();
+
+ return Mono.just(callResponse);
+ });
+
+ var mcpServer = McpServer.async(mcpServerTransportProvider)
+ .serverInfo("test-server", "1.0.0")
+ .requestTimeout(Duration.ofSeconds(3))
+ .tools(tool)
+ .build();
+
+ InitializeResult initResult = mcpClient.initialize();
+ assertThat(initResult).isNotNull();
+
+ CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of()));
+
+ assertThat(response).isNotNull();
+ assertThat(response).isEqualTo(callResponse);
+
+ mcpClient.closeGracefully();
+ mcpServer.closeGracefully().block();
+ }
+
+ @Test
+ void testCreateElicitationWithRequestTimeoutFail() {
+
+ // Client
+
+ Function elicitationHandler = request -> {
+ assertThat(request.message()).isNotEmpty();
+ assertThat(request.requestedSchema()).isNotNull();
+ try {
+ TimeUnit.SECONDS.sleep(2);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return new ElicitResult(ElicitResult.Action.ACCEPT, Map.of("message", request.message()));
+ };
+
+ var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0"))
+ .capabilities(ClientCapabilities.builder().elicitation().build())
+ .elicitation(elicitationHandler)
+ .build();
+
+ // Server
+
+ CallToolResult callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")),
+ null);
+
+ McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification(
+ new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
+
+ var elicitationRequest = ElicitRequest.builder()
+ .message("Test message")
+ .requestedSchema(
+ Map.of("type", "object", "properties", Map.of("message", Map.of("type", "string"))))
+ .build();
+
+ StepVerifier.create(exchange.createElicitation(elicitationRequest)).consumeNextWith(result -> {
+ assertThat(result).isNotNull();
+ assertThat(result.action()).isEqualTo(ElicitResult.Action.ACCEPT);
+ assertThat(result.content().get("message")).isEqualTo("Test message");
+ }).verifyComplete();
+
+ return Mono.just(callResponse);
+ });
+
+ var mcpServer = McpServer.async(mcpServerTransportProvider)
+ .serverInfo("test-server", "1.0.0")
+ .requestTimeout(Duration.ofSeconds(1))
+ .tools(tool)
+ .build();
+
+ InitializeResult initResult = mcpClient.initialize();
+ assertThat(initResult).isNotNull();
+
+ assertThatExceptionOfType(McpError.class).isThrownBy(() -> {
+ mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of()));
+ }).withMessageContaining("Timeout");
+
+ mcpClient.closeGracefully();
+ mcpServer.closeGracefully().block();
+ }
+
// ---------------------------------------
// Roots Tests
// ---------------------------------------
diff --git a/mcp/src/test/java/io/modelcontextprotocol/spec/McpSchemaTests.java b/mcp/src/test/java/io/modelcontextprotocol/spec/McpSchemaTests.java
index ff78c1bf..99015d8c 100644
--- a/mcp/src/test/java/io/modelcontextprotocol/spec/McpSchemaTests.java
+++ b/mcp/src/test/java/io/modelcontextprotocol/spec/McpSchemaTests.java
@@ -807,6 +807,40 @@ void testCreateMessageResult() throws Exception {
{"role":"assistant","content":{"type":"text","text":"Assistant response"},"model":"gpt-4","stopReason":"endTurn"}"""));
}
+ // Elicitation Tests
+
+ @Test
+ void testCreateElicitationRequest() throws Exception {
+ McpSchema.ElicitRequest request = McpSchema.ElicitRequest.builder()
+ .requestedSchema(Map.of("type", "object", "required", List.of("a"), "properties",
+ Map.of("foo", Map.of("type", "string"))))
+ .build();
+
+ String value = mapper.writeValueAsString(request);
+
+ assertThatJson(value).when(Option.IGNORING_ARRAY_ORDER)
+ .when(Option.IGNORING_EXTRA_ARRAY_ITEMS)
+ .isObject()
+ .isEqualTo(json("""
+ {"requestedSchema":{"properties":{"foo":{"type":"string"}},"required":["a"],"type":"object"}}"""));
+ }
+
+ @Test
+ void testCreateElicitationResult() throws Exception {
+ McpSchema.ElicitResult result = McpSchema.ElicitResult.builder()
+ .content(Map.of("foo", "bar"))
+ .message(McpSchema.ElicitResult.Action.ACCEPT)
+ .build();
+
+ String value = mapper.writeValueAsString(result);
+
+ assertThatJson(value).when(Option.IGNORING_ARRAY_ORDER)
+ .when(Option.IGNORING_EXTRA_ARRAY_ITEMS)
+ .isObject()
+ .isEqualTo(json("""
+ {"action":"accept","content":{"foo":"bar"}}"""));
+ }
+
// Roots Tests
@Test
From 2f944349cc77009d020ebddc8b9967b8e1b14baf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?=
Date: Tue, 10 Jun 2025 18:33:40 +0200
Subject: [PATCH 7/7] feat: WebClient Streamable HTTP support (#292)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
An implementation of Streamable HTTP Client with WebFlux WebClient.
Aside from implementing the specification, several improvements have been incorporated throughout the client-side of the architecture.
The changes cover:
- resilience tests using toxiproxy in testcontainers
- integration tests using updated everything-server with streamableHttp support
- improved logging
- session invalidation handling (both transport session and JSON-RPC concept of session)
- implicit initialization and burst protection (in case of concurrent `Mcp(Sync|Async)Client` use
- more logging, e.g. stdio process lifecycle logs
Related #72, #273, #253, #107, #105
Signed-off-by: Dariusz Jędrzejczyk
---
mcp-spring/mcp-spring-webflux/pom.xml | 6 +
.../WebClientStreamableHttpTransport.java | 520 ++++++++++++++++++
.../transport/WebFluxSseClientTransport.java | 3 +
...eamableHttpAsyncClientResiliencyTests.java | 17 +
...bClientStreamableHttpAsyncClientTests.java | 42 ++
...ebClientStreamableHttpSyncClientTests.java | 41 ++
.../client/WebFluxSseMcpAsyncClientTests.java | 3 +-
.../client/WebFluxSseMcpSyncClientTests.java | 3 +-
.../WebFluxSseClientTransportTests.java | 3 +-
.../src/test/resources/logback.xml | 8 +-
mcp-test/pom.xml | 5 +
...AbstractMcpAsyncClientResiliencyTests.java | 222 ++++++++
.../client/AbstractMcpAsyncClientTests.java | 37 +-
.../client/AbstractMcpSyncClientTests.java | 60 +-
.../client/McpAsyncClient.java | 291 ++++++----
.../transport/StdioClientTransport.java | 5 +
.../spec/DefaultMcpTransportSession.java | 79 +++
.../spec/DefaultMcpTransportStream.java | 74 +++
.../spec/McpClientSession.java | 53 +-
.../spec/McpClientTransport.java | 22 +-
.../modelcontextprotocol/spec/McpSchema.java | 6 +
.../spec/McpTransportSession.java | 60 ++
.../McpTransportSessionNotFoundException.java | 29 +
.../spec/McpTransportStream.java | 45 ++
.../client/AbstractMcpAsyncClientTests.java | 37 +-
.../client/AbstractMcpSyncClientTests.java | 58 +-
.../client/HttpSseMcpAsyncClientTests.java | 3 +-
.../client/HttpSseMcpSyncClientTests.java | 3 +-
.../client/StdioMcpSyncClientTests.java | 2 +-
.../HttpClientSseClientTransportTests.java | 3 +-
pom.xml | 3 +-
31 files changed, 1501 insertions(+), 242 deletions(-)
create mode 100644 mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java
create mode 100644 mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientStreamableHttpAsyncClientResiliencyTests.java
create mode 100644 mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientStreamableHttpAsyncClientTests.java
create mode 100644 mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientStreamableHttpSyncClientTests.java
create mode 100644 mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientResiliencyTests.java
create mode 100644 mcp/src/main/java/io/modelcontextprotocol/spec/DefaultMcpTransportSession.java
create mode 100644 mcp/src/main/java/io/modelcontextprotocol/spec/DefaultMcpTransportStream.java
create mode 100644 mcp/src/main/java/io/modelcontextprotocol/spec/McpTransportSession.java
create mode 100644 mcp/src/main/java/io/modelcontextprotocol/spec/McpTransportSessionNotFoundException.java
create mode 100644 mcp/src/main/java/io/modelcontextprotocol/spec/McpTransportStream.java
diff --git a/mcp-spring/mcp-spring-webflux/pom.xml b/mcp-spring/mcp-spring-webflux/pom.xml
index a8b92bd0..26452fe9 100644
--- a/mcp-spring/mcp-spring-webflux/pom.xml
+++ b/mcp-spring/mcp-spring-webflux/pom.xml
@@ -99,6 +99,12 @@
${testcontainers.version}test
+
+ org.testcontainers
+ toxiproxy
+ ${toxiproxy.version}
+ test
+ org.awaitility
diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java
new file mode 100644
index 00000000..e7b7c8ee
--- /dev/null
+++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java
@@ -0,0 +1,520 @@
+package io.modelcontextprotocol.client.transport;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.modelcontextprotocol.spec.DefaultMcpTransportSession;
+import io.modelcontextprotocol.spec.DefaultMcpTransportStream;
+import io.modelcontextprotocol.spec.McpClientTransport;
+import io.modelcontextprotocol.spec.McpError;
+import io.modelcontextprotocol.spec.McpSchema;
+import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
+import io.modelcontextprotocol.spec.McpTransportSession;
+import io.modelcontextprotocol.spec.McpTransportStream;
+import io.modelcontextprotocol.util.Assert;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.codec.ServerSentEvent;
+import org.springframework.web.reactive.function.client.ClientResponse;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * An implementation of the Streamable HTTP protocol as defined by the
+ * 2025-03-26 version of the MCP specification.
+ *
+ *
+ * The transport is capable of resumability and reconnects. It reacts to transport-level
+ * session invalidation and will propagate {@link McpTransportSessionNotFoundException
+ * appropriate exceptions} to the higher level abstraction layer when needed in order to
+ * allow proper state management. The implementation handles servers that are stateful and
+ * provide session meta information, but can also communicate with stateless servers that
+ * do not provide a session identifier and do not support SSE streams.
+ *
+ *
+ * This implementation does not handle backwards compatibility with the "HTTP
+ * with SSE" transport. In order to communicate over the phased-out
+ * 2024-11-05 protocol, use {@link HttpClientSseClientTransport} or
+ * {@link WebFluxSseClientTransport}.
+ *
+ *
+ * @author Dariusz Jędrzejczyk
+ * @see Streamable
+ * HTTP transport specification
+ */
+public class WebClientStreamableHttpTransport implements McpClientTransport {
+
+ private static final Logger logger = LoggerFactory.getLogger(WebClientStreamableHttpTransport.class);
+
+ private static final String DEFAULT_ENDPOINT = "/mcp";
+
+ /**
+ * Event type for JSON-RPC messages received through the SSE connection. The server
+ * sends messages with this event type to transmit JSON-RPC protocol data.
+ */
+ private static final String MESSAGE_EVENT_TYPE = "message";
+
+ private static final ParameterizedTypeReference> PARAMETERIZED_TYPE_REF = new ParameterizedTypeReference<>() {
+ };
+
+ private final ObjectMapper objectMapper;
+
+ private final WebClient webClient;
+
+ private final String endpoint;
+
+ private final boolean openConnectionOnStartup;
+
+ private final boolean resumableStreams;
+
+ private final AtomicReference activeSession = new AtomicReference<>();
+
+ private final AtomicReference, Mono>> handler = new AtomicReference<>();
+
+ private final AtomicReference> exceptionHandler = new AtomicReference<>();
+
+ private WebClientStreamableHttpTransport(ObjectMapper objectMapper, WebClient.Builder webClientBuilder,
+ String endpoint, boolean resumableStreams, boolean openConnectionOnStartup) {
+ this.objectMapper = objectMapper;
+ this.webClient = webClientBuilder.build();
+ this.endpoint = endpoint;
+ this.resumableStreams = resumableStreams;
+ this.openConnectionOnStartup = openConnectionOnStartup;
+ this.activeSession.set(createTransportSession());
+ }
+
+ /**
+ * Create a stateful builder for creating {@link WebClientStreamableHttpTransport}
+ * instances.
+ * @param webClientBuilder the {@link WebClient.Builder} to use
+ * @return a builder which will create an instance of
+ * {@link WebClientStreamableHttpTransport} once {@link Builder#build()} is called
+ */
+ public static Builder builder(WebClient.Builder webClientBuilder) {
+ return new Builder(webClientBuilder);
+ }
+
+ @Override
+ public Mono connect(Function, Mono> handler) {
+ return Mono.deferContextual(ctx -> {
+ this.handler.set(handler);
+ if (openConnectionOnStartup) {
+ logger.debug("Eagerly opening connection on startup");
+ return this.reconnect(null).then();
+ }
+ return Mono.empty();
+ });
+ }
+
+ private DefaultMcpTransportSession createTransportSession() {
+ Supplier> onClose = () -> {
+ DefaultMcpTransportSession transportSession = this.activeSession.get();
+ return transportSession.sessionId().isEmpty() ? Mono.empty()
+ : webClient.delete().uri(this.endpoint).headers(httpHeaders -> {
+ httpHeaders.add("mcp-session-id", transportSession.sessionId().get());
+ }).retrieve().toBodilessEntity().doOnError(e -> logger.info("Got response {}", e)).then();
+ };
+ return new DefaultMcpTransportSession(onClose);
+ }
+
+ @Override
+ public void setExceptionHandler(Consumer handler) {
+ logger.debug("Exception handler registered");
+ this.exceptionHandler.set(handler);
+ }
+
+ private void handleException(Throwable t) {
+ logger.debug("Handling exception for session {}", sessionIdOrPlaceholder(this.activeSession.get()), t);
+ if (t instanceof McpTransportSessionNotFoundException) {
+ McpTransportSession> invalidSession = this.activeSession.getAndSet(createTransportSession());
+ logger.warn("Server does not recognize session {}. Invalidating.", invalidSession.sessionId());
+ invalidSession.close();
+ }
+ Consumer handler = this.exceptionHandler.get();
+ if (handler != null) {
+ handler.accept(t);
+ }
+ }
+
+ @Override
+ public Mono closeGracefully() {
+ return Mono.defer(() -> {
+ logger.debug("Graceful close triggered");
+ DefaultMcpTransportSession currentSession = this.activeSession.getAndSet(createTransportSession());
+ if (currentSession != null) {
+ return currentSession.closeGracefully();
+ }
+ return Mono.empty();
+ });
+ }
+
+ private Mono reconnect(McpTransportStream stream) {
+ return Mono.deferContextual(ctx -> {
+ if (stream != null) {
+ logger.debug("Reconnecting stream {} with lastId {}", stream.streamId(), stream.lastId());
+ }
+ else {
+ logger.debug("Reconnecting with no prior stream");
+ }
+ // Here we attempt to initialize the client. In case the server supports SSE,
+ // we will establish a long-running
+ // session here and listen for messages. If it doesn't, that's ok, the server
+ // is a simple, stateless one.
+ final AtomicReference disposableRef = new AtomicReference<>();
+ final McpTransportSession transportSession = this.activeSession.get();
+
+ Disposable connection = webClient.get()
+ .uri(this.endpoint)
+ .accept(MediaType.TEXT_EVENT_STREAM)
+ .headers(httpHeaders -> {
+ transportSession.sessionId().ifPresent(id -> httpHeaders.add("mcp-session-id", id));
+ if (stream != null) {
+ stream.lastId().ifPresent(id -> httpHeaders.add("last-event-id", id));
+ }
+ })
+ .exchangeToFlux(response -> {
+ if (isEventStream(response)) {
+ return eventStream(stream, response);
+ }
+ else if (isNotAllowed(response)) {
+ logger.debug("The server does not support SSE streams, using request-response mode.");
+ return Flux.empty();
+ }
+ else if (isNotFound(response)) {
+ String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
+ return mcpSessionNotFoundError(sessionIdRepresentation);
+ }
+ else {
+ return response.createError().doOnError(e -> {
+ logger.info("Opening an SSE stream failed. This can be safely ignored.", e);
+ }).flux();
+ }
+ })
+ .onErrorComplete(t -> {
+ this.handleException(t);
+ return true;
+ })
+ .doFinally(s -> {
+ Disposable ref = disposableRef.getAndSet(null);
+ if (ref != null) {
+ transportSession.removeConnection(ref);
+ }
+ })
+ .contextWrite(ctx)
+ .subscribe();
+
+ disposableRef.set(connection);
+ transportSession.addConnection(connection);
+ return Mono.just(connection);
+ });
+ }
+
+ @Override
+ public Mono sendMessage(McpSchema.JSONRPCMessage message) {
+ return Mono.create(sink -> {
+ logger.debug("Sending message {}", message);
+ // Here we attempt to initialize the client.
+ // In case the server supports SSE, we will establish a long-running session
+ // here and
+ // listen for messages.
+ // If it doesn't, nothing actually happens here, that's just the way it is...
+ final AtomicReference disposableRef = new AtomicReference<>();
+ final McpTransportSession transportSession = this.activeSession.get();
+
+ Disposable connection = webClient.post()
+ .uri(this.endpoint)
+ .accept(MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_JSON)
+ .headers(httpHeaders -> {
+ transportSession.sessionId().ifPresent(id -> httpHeaders.add("mcp-session-id", id));
+ })
+ .bodyValue(message)
+ .exchangeToFlux(response -> {
+ if (transportSession
+ .markInitialized(response.headers().asHttpHeaders().getFirst("mcp-session-id"))) {
+ // Once we have a session, we try to open an async stream for
+ // the server to send notifications and requests out-of-band.
+ reconnect(null).contextWrite(sink.contextView()).subscribe();
+ }
+
+ String sessionRepresentation = sessionIdOrPlaceholder(transportSession);
+
+ // The spec mentions only ACCEPTED, but the existing SDKs can return
+ // 200 OK for notifications
+ if (response.statusCode().is2xxSuccessful()) {
+ Optional contentType = response.headers().contentType();
+ // Existing SDKs consume notifications with no response body nor
+ // content type
+ if (contentType.isEmpty()) {
+ logger.trace("Message was successfully sent via POST for session {}",
+ sessionRepresentation);
+ // signal the caller that the message was successfully
+ // delivered
+ sink.success();
+ // communicate to downstream there is no streamed data coming
+ return Flux.empty();
+ }
+ else {
+ MediaType mediaType = contentType.get();
+ if (mediaType.isCompatibleWith(MediaType.TEXT_EVENT_STREAM)) {
+ // communicate to caller that the message was delivered
+ sink.success();
+ // starting a stream
+ return newEventStream(response, sessionRepresentation);
+ }
+ else if (mediaType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
+ logger.trace("Received response to POST for session {}", sessionRepresentation);
+ // communicate to caller the message was delivered
+ sink.success();
+ return responseFlux(response);
+ }
+ else {
+ logger.warn("Unknown media type {} returned for POST in session {}", contentType,
+ sessionRepresentation);
+ return Flux.error(new RuntimeException("Unknown media type returned: " + contentType));
+ }
+ }
+ }
+ else {
+ if (isNotFound(response)) {
+ return mcpSessionNotFoundError(sessionRepresentation);
+ }
+ return extractError(response, sessionRepresentation);
+ }
+ })
+ .flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage)))
+ .onErrorResume(t -> {
+ // handle the error first
+ this.handleException(t);
+ // inform the caller of sendMessage
+ sink.error(t);
+ return Flux.empty();
+ })
+ .doFinally(s -> {
+ Disposable ref = disposableRef.getAndSet(null);
+ if (ref != null) {
+ transportSession.removeConnection(ref);
+ }
+ })
+ .contextWrite(sink.contextView())
+ .subscribe();
+ disposableRef.set(connection);
+ transportSession.addConnection(connection);
+ });
+ }
+
+ private static Flux mcpSessionNotFoundError(String sessionRepresentation) {
+ logger.warn("Session {} was not found on the MCP server", sessionRepresentation);
+ // inform the stream/connection subscriber
+ return Flux.error(new McpTransportSessionNotFoundException(sessionRepresentation));
+ }
+
+ private Flux extractError(ClientResponse response, String sessionRepresentation) {
+ return response.createError().onErrorResume(e -> {
+ WebClientResponseException responseException = (WebClientResponseException) e;
+ byte[] body = responseException.getResponseBodyAsByteArray();
+ McpSchema.JSONRPCResponse.JSONRPCError jsonRpcError = null;
+ Exception toPropagate;
+ try {
+ McpSchema.JSONRPCResponse jsonRpcResponse = objectMapper.readValue(body,
+ McpSchema.JSONRPCResponse.class);
+ jsonRpcError = jsonRpcResponse.error();
+ toPropagate = new McpError(jsonRpcError);
+ }
+ catch (IOException ex) {
+ toPropagate = new RuntimeException("Sending request failed", e);
+ logger.debug("Received content together with {} HTTP code response: {}", response.statusCode(), body);
+ }
+
+ // Some implementations can return 400 when presented with a
+ // session id that it doesn't know about, so we will
+ // invalidate the session
+ // https://github.com/modelcontextprotocol/typescript-sdk/issues/389
+ if (responseException.getStatusCode().isSameCodeAs(HttpStatus.BAD_REQUEST)) {
+ return Mono.error(new McpTransportSessionNotFoundException(sessionRepresentation, toPropagate));
+ }
+ return Mono.empty();
+ }).flux();
+ }
+
+ private Flux eventStream(McpTransportStream stream, ClientResponse response) {
+ McpTransportStream sessionStream = stream != null ? stream
+ : new DefaultMcpTransportStream<>(this.resumableStreams, this::reconnect);
+ logger.debug("Connected stream {}", sessionStream.streamId());
+
+ var idWithMessages = response.bodyToFlux(PARAMETERIZED_TYPE_REF).map(this::parse);
+ return Flux.from(sessionStream.consumeSseStream(idWithMessages));
+ }
+
+ private static boolean isNotFound(ClientResponse response) {
+ return response.statusCode().isSameCodeAs(HttpStatus.NOT_FOUND);
+ }
+
+ private static boolean isNotAllowed(ClientResponse response) {
+ return response.statusCode().isSameCodeAs(HttpStatus.METHOD_NOT_ALLOWED);
+ }
+
+ private static boolean isEventStream(ClientResponse response) {
+ return response.statusCode().is2xxSuccessful() && response.headers().contentType().isPresent()
+ && response.headers().contentType().get().isCompatibleWith(MediaType.TEXT_EVENT_STREAM);
+ }
+
+ private static String sessionIdOrPlaceholder(McpTransportSession> transportSession) {
+ return transportSession.sessionId().orElse("[missing_session_id]");
+ }
+
+ private Flux responseFlux(ClientResponse response) {
+ return response.bodyToMono(String.class).>handle((responseMessage, s) -> {
+ try {
+ McpSchema.JSONRPCMessage jsonRpcResponse = McpSchema.deserializeJsonRpcMessage(objectMapper,
+ responseMessage);
+ s.next(List.of(jsonRpcResponse));
+ }
+ catch (IOException e) {
+ s.error(e);
+ }
+ }).flatMapIterable(Function.identity());
+ }
+
+ private Flux newEventStream(ClientResponse response, String sessionRepresentation) {
+ McpTransportStream sessionStream = new DefaultMcpTransportStream<>(this.resumableStreams,
+ this::reconnect);
+ logger.trace("Sent POST and opened a stream ({}) for session {}", sessionStream.streamId(),
+ sessionRepresentation);
+ return eventStream(sessionStream, response);
+ }
+
+ @Override
+ public T unmarshalFrom(Object data, TypeReference typeRef) {
+ return this.objectMapper.convertValue(data, typeRef);
+ }
+
+ private Tuple2, Iterable> parse(ServerSentEvent event) {
+ if (MESSAGE_EVENT_TYPE.equals(event.event())) {
+ try {
+ // We don't support batching ATM and probably won't since the next version
+ // considers removing it.
+ McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, event.data());
+ return Tuples.of(Optional.ofNullable(event.id()), List.of(message));
+ }
+ catch (IOException ioException) {
+ throw new McpError("Error parsing JSON-RPC message: " + event.data());
+ }
+ }
+ else {
+ throw new McpError("Received unrecognized SSE event type: " + event.event());
+ }
+ }
+
+ /**
+ * Builder for {@link WebClientStreamableHttpTransport}.
+ */
+ public static class Builder {
+
+ private ObjectMapper objectMapper;
+
+ private WebClient.Builder webClientBuilder;
+
+ private String endpoint = DEFAULT_ENDPOINT;
+
+ private boolean resumableStreams = true;
+
+ private boolean openConnectionOnStartup = false;
+
+ private Builder(WebClient.Builder webClientBuilder) {
+ Assert.notNull(webClientBuilder, "WebClient.Builder must not be null");
+ this.webClientBuilder = webClientBuilder;
+ }
+
+ /**
+ * Configure the {@link ObjectMapper} to use.
+ * @param objectMapper instance to use
+ * @return the builder instance
+ */
+ public Builder objectMapper(ObjectMapper objectMapper) {
+ Assert.notNull(objectMapper, "ObjectMapper must not be null");
+ this.objectMapper = objectMapper;
+ return this;
+ }
+
+ /**
+ * Configure the {@link WebClient.Builder} to construct the {@link WebClient}.
+ * @param webClientBuilder instance to use
+ * @return the builder instance
+ */
+ public Builder webClientBuilder(WebClient.Builder webClientBuilder) {
+ Assert.notNull(webClientBuilder, "WebClient.Builder must not be null");
+ this.webClientBuilder = webClientBuilder;
+ return this;
+ }
+
+ /**
+ * Configure the endpoint to make HTTP requests against.
+ * @param endpoint endpoint to use
+ * @return the builder instance
+ */
+ public Builder endpoint(String endpoint) {
+ Assert.hasText(endpoint, "endpoint must be a non-empty String");
+ this.endpoint = endpoint;
+ return this;
+ }
+
+ /**
+ * Configure whether to use the stream resumability feature by keeping track of
+ * SSE event ids.
+ * @param resumableStreams if {@code true} event ids will be tracked and upon
+ * disconnection, the last seen id will be used upon reconnection as a header to
+ * resume consuming messages.
+ * @return the builder instance
+ */
+ public Builder resumableStreams(boolean resumableStreams) {
+ this.resumableStreams = resumableStreams;
+ return this;
+ }
+
+ /**
+ * Configure whether the client should open an SSE connection upon startup. Not
+ * all servers support this (although it is in theory possible with the current
+ * specification), so use with caution. By default, this value is {@code false}.
+ * @param openConnectionOnStartup if {@code true} the {@link #connect(Function)}
+ * method call will try to open an SSE connection before sending any JSON-RPC
+ * request
+ * @return the builder instance
+ */
+ public Builder openConnectionOnStartup(boolean openConnectionOnStartup) {
+ this.openConnectionOnStartup = openConnectionOnStartup;
+ return this;
+ }
+
+ /**
+ * Construct a fresh instance of {@link WebClientStreamableHttpTransport} using
+ * the current builder configuration.
+ * @return a new instance of {@link WebClientStreamableHttpTransport}
+ */
+ public WebClientStreamableHttpTransport build() {
+ ObjectMapper objectMapper = this.objectMapper != null ? this.objectMapper : new ObjectMapper();
+
+ return new WebClientStreamableHttpTransport(objectMapper, this.webClientBuilder, endpoint, resumableStreams,
+ openConnectionOnStartup);
+ }
+
+ }
+
+}
diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java
index 37abe295..128cda4c 100644
--- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java
+++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java
@@ -190,6 +190,9 @@ public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMappe
*/
@Override
public Mono connect(Function, Mono> handler) {
+ // TODO: Avoid eager connection opening and enable resilience
+ // -> upon disconnects, re-establish connection
+ // -> allow optimizing for eager connection start using a constructor flag
Flux> events = eventStream();
this.inboundSubscription = events.concatMap(event -> Mono.just(event).handle((e, s) -> {
if (ENDPOINT_EVENT_TYPE.equals(event.event())) {
diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientStreamableHttpAsyncClientResiliencyTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientStreamableHttpAsyncClientResiliencyTests.java
new file mode 100644
index 00000000..80fc671e
--- /dev/null
+++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientStreamableHttpAsyncClientResiliencyTests.java
@@ -0,0 +1,17 @@
+package io.modelcontextprotocol.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.modelcontextprotocol.client.transport.WebClientStreamableHttpTransport;
+import io.modelcontextprotocol.spec.McpClientTransport;
+import org.junit.jupiter.api.Timeout;
+import org.springframework.web.reactive.function.client.WebClient;
+
+@Timeout(15)
+public class WebClientStreamableHttpAsyncClientResiliencyTests extends AbstractMcpAsyncClientResiliencyTests {
+
+ @Override
+ protected McpClientTransport createMcpTransport() {
+ return WebClientStreamableHttpTransport.builder(WebClient.builder().baseUrl(host)).build();
+ }
+
+}
diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientStreamableHttpAsyncClientTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientStreamableHttpAsyncClientTests.java
new file mode 100644
index 00000000..4c803265
--- /dev/null
+++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientStreamableHttpAsyncClientTests.java
@@ -0,0 +1,42 @@
+package io.modelcontextprotocol.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.modelcontextprotocol.client.transport.WebClientStreamableHttpTransport;
+import io.modelcontextprotocol.spec.McpClientTransport;
+import org.junit.jupiter.api.Timeout;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+
+@Timeout(15)
+public class WebClientStreamableHttpAsyncClientTests extends AbstractMcpAsyncClientTests {
+
+ static String host = "http://localhost:3001";
+
+ // Uses the https://github.com/tzolov/mcp-everything-server-docker-image
+ @SuppressWarnings("resource")
+ GenericContainer> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v2")
+ .withCommand("node dist/index.js streamableHttp")
+ .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
+ .withExposedPorts(3001)
+ .waitingFor(Wait.forHttp("/").forStatusCode(404));
+
+ @Override
+ protected McpClientTransport createMcpTransport() {
+ return WebClientStreamableHttpTransport.builder(WebClient.builder().baseUrl(host)).build();
+ }
+
+ @Override
+ protected void onStart() {
+ container.start();
+ int port = container.getMappedPort(3001);
+ host = "http://" + container.getHost() + ":" + port;
+ }
+
+ @Override
+ public void onClose() {
+ container.stop();
+ }
+
+}
diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientStreamableHttpSyncClientTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientStreamableHttpSyncClientTests.java
new file mode 100644
index 00000000..a8cad489
--- /dev/null
+++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientStreamableHttpSyncClientTests.java
@@ -0,0 +1,41 @@
+package io.modelcontextprotocol.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.modelcontextprotocol.client.transport.WebClientStreamableHttpTransport;
+import io.modelcontextprotocol.spec.McpClientTransport;
+import org.junit.jupiter.api.Timeout;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+@Timeout(15)
+public class WebClientStreamableHttpSyncClientTests extends AbstractMcpSyncClientTests {
+
+ static String host = "http://localhost:3001";
+
+ // Uses the https://github.com/tzolov/mcp-everything-server-docker-image
+ @SuppressWarnings("resource")
+ GenericContainer> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v2")
+ .withCommand("node dist/index.js streamableHttp")
+ .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
+ .withExposedPorts(3001)
+ .waitingFor(Wait.forHttp("/").forStatusCode(404));
+
+ @Override
+ protected McpClientTransport createMcpTransport() {
+ return WebClientStreamableHttpTransport.builder(WebClient.builder().baseUrl(host)).build();
+ }
+
+ @Override
+ protected void onStart() {
+ container.start();
+ int port = container.getMappedPort(3001);
+ host = "http://" + container.getHost() + ":" + port;
+ }
+
+ @Override
+ public void onClose() {
+ container.stop();
+ }
+
+}
diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebFluxSseMcpAsyncClientTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebFluxSseMcpAsyncClientTests.java
index b43c1449..f0533cb4 100644
--- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebFluxSseMcpAsyncClientTests.java
+++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebFluxSseMcpAsyncClientTests.java
@@ -26,7 +26,8 @@ class WebFluxSseMcpAsyncClientTests extends AbstractMcpAsyncClientTests {
// Uses the https://github.com/tzolov/mcp-everything-server-docker-image
@SuppressWarnings("resource")
- GenericContainer> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v1")
+ GenericContainer> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v2")
+ .withCommand("node dist/index.js sse")
.withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
.withExposedPorts(3001)
.waitingFor(Wait.forHttp("/").forStatusCode(404));
diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebFluxSseMcpSyncClientTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebFluxSseMcpSyncClientTests.java
index 66ac8a6d..9b0959a3 100644
--- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebFluxSseMcpSyncClientTests.java
+++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebFluxSseMcpSyncClientTests.java
@@ -26,7 +26,8 @@ class WebFluxSseMcpSyncClientTests extends AbstractMcpSyncClientTests {
// Uses the https://github.com/tzolov/mcp-everything-server-docker-image
@SuppressWarnings("resource")
- GenericContainer> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v1")
+ GenericContainer> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v2")
+ .withCommand("node dist/index.js sse")
.withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
.withExposedPorts(3001)
.waitingFor(Wait.forHttp("/").forStatusCode(404));
diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java
index c757d3da..42b91d14 100644
--- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java
+++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java
@@ -41,7 +41,8 @@ class WebFluxSseClientTransportTests {
static String host = "http://localhost:3001";
@SuppressWarnings("resource")
- GenericContainer> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v1")
+ GenericContainer> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v2")
+ .withCommand("node dist/index.js sse")
.withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
.withExposedPorts(3001)
.waitingFor(Wait.forHttp("/").forStatusCode(404));
diff --git a/mcp-spring/mcp-spring-webflux/src/test/resources/logback.xml b/mcp-spring/mcp-spring-webflux/src/test/resources/logback.xml
index 5ad73374..abc831d1 100644
--- a/mcp-spring/mcp-spring-webflux/src/test/resources/logback.xml
+++ b/mcp-spring/mcp-spring-webflux/src/test/resources/logback.xml
@@ -9,13 +9,13 @@
-
+
-
-
+
+
-
+
diff --git a/mcp-test/pom.xml b/mcp-test/pom.xml
index a6e5bdb0..9998569d 100644
--- a/mcp-test/pom.xml
+++ b/mcp-test/pom.xml
@@ -68,6 +68,11 @@
junit-jupiter${testcontainers.version}
+
+ org.testcontainers
+ toxiproxy
+ ${toxiproxy.version}
+ org.awaitility
diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientResiliencyTests.java b/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientResiliencyTests.java
new file mode 100644
index 00000000..85d6a88e
--- /dev/null
+++ b/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientResiliencyTests.java
@@ -0,0 +1,222 @@
+package io.modelcontextprotocol.client;
+
+import eu.rekawek.toxiproxy.Proxy;
+import eu.rekawek.toxiproxy.ToxiproxyClient;
+import eu.rekawek.toxiproxy.model.ToxicDirection;
+import io.modelcontextprotocol.spec.McpClientTransport;
+import io.modelcontextprotocol.spec.McpSchema;
+import io.modelcontextprotocol.spec.McpTransport;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.ToxiproxyContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import reactor.test.StepVerifier;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/**
+ * Resiliency test suite for the {@link McpAsyncClient} that can be used with different
+ * {@link McpTransport} implementations that support Streamable HTTP.
+ *
+ * The purpose of these tests is to allow validating the transport layer resiliency
+ * instead of the functionality offered by the logical layer of MCP concepts such as
+ * tools, resources, prompts, etc.
+ *
+ * @author Dariusz Jędrzejczyk
+ */
+public abstract class AbstractMcpAsyncClientResiliencyTests {
+
+ private static final Logger logger = LoggerFactory.getLogger(AbstractMcpAsyncClientResiliencyTests.class);
+
+ static Network network = Network.newNetwork();
+ static String host = "http://localhost:3001";
+
+ // Uses the https://github.com/tzolov/mcp-everything-server-docker-image
+ @SuppressWarnings("resource")
+ static GenericContainer> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v2")
+ .withCommand("node dist/index.js streamableHttp")
+ .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
+ .withNetwork(network)
+ .withNetworkAliases("everything-server")
+ .withExposedPorts(3001)
+ .waitingFor(Wait.forHttp("/").forStatusCode(404));
+
+ static ToxiproxyContainer toxiproxy = new ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.5.0").withNetwork(network)
+ .withExposedPorts(8474, 3000);
+
+ static Proxy proxy;
+
+ static {
+ container.start();
+
+ toxiproxy.start();
+
+ final ToxiproxyClient toxiproxyClient = new ToxiproxyClient(toxiproxy.getHost(), toxiproxy.getControlPort());
+ try {
+ proxy = toxiproxyClient.createProxy("everything-server", "0.0.0.0:3000", "everything-server:3001");
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Can't create proxy!", e);
+ }
+
+ final String ipAddressViaToxiproxy = toxiproxy.getHost();
+ final int portViaToxiproxy = toxiproxy.getMappedPort(3000);
+
+ host = "http://" + ipAddressViaToxiproxy + ":" + portViaToxiproxy;
+ }
+
+ private static void disconnect() {
+ long start = System.nanoTime();
+ try {
+ // disconnect
+ // proxy.toxics().bandwidth("CUT_CONNECTION_DOWNSTREAM",
+ // ToxicDirection.DOWNSTREAM, 0);
+ // proxy.toxics().bandwidth("CUT_CONNECTION_UPSTREAM",
+ // ToxicDirection.UPSTREAM, 0);
+ proxy.toxics().resetPeer("RESET_DOWNSTREAM", ToxicDirection.DOWNSTREAM, 0);
+ proxy.toxics().resetPeer("RESET_UPSTREAM", ToxicDirection.UPSTREAM, 0);
+ logger.info("Disconnect took {} ms", Duration.ofNanos(System.nanoTime() - start).toMillis());
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Failed to disconnect", e);
+ }
+ }
+
+ private static void reconnect() {
+ long start = System.nanoTime();
+ try {
+ proxy.toxics().get("RESET_UPSTREAM").remove();
+ proxy.toxics().get("RESET_DOWNSTREAM").remove();
+ // proxy.toxics().get("CUT_CONNECTION_DOWNSTREAM").remove();
+ // proxy.toxics().get("CUT_CONNECTION_UPSTREAM").remove();
+ logger.info("Reconnect took {} ms", Duration.ofNanos(System.nanoTime() - start).toMillis());
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Failed to reconnect", e);
+ }
+ }
+
+ private static void restartMcpServer() {
+ container.stop();
+ container.start();
+ }
+
+ abstract McpClientTransport createMcpTransport();
+
+ protected Duration getRequestTimeout() {
+ return Duration.ofSeconds(14);
+ }
+
+ protected Duration getInitializationTimeout() {
+ return Duration.ofSeconds(2);
+ }
+
+ McpAsyncClient client(McpClientTransport transport) {
+ return client(transport, Function.identity());
+ }
+
+ McpAsyncClient client(McpClientTransport transport, Function customizer) {
+ AtomicReference client = new AtomicReference<>();
+
+ assertThatCode(() -> {
+ McpClient.AsyncSpec builder = McpClient.async(transport)
+ .requestTimeout(getRequestTimeout())
+ .initializationTimeout(getInitializationTimeout())
+ .capabilities(McpSchema.ClientCapabilities.builder().roots(true).build());
+ builder = customizer.apply(builder);
+ client.set(builder.build());
+ }).doesNotThrowAnyException();
+
+ return client.get();
+ }
+
+ void withClient(McpClientTransport transport, Consumer c) {
+ withClient(transport, Function.identity(), c);
+ }
+
+ void withClient(McpClientTransport transport, Function customizer,
+ Consumer c) {
+ var client = client(transport, customizer);
+ try {
+ c.accept(client);
+ }
+ finally {
+ StepVerifier.create(client.closeGracefully()).expectComplete().verify(Duration.ofSeconds(10));
+ }
+ }
+
+ @Test
+ void testPing() {
+ withClient(createMcpTransport(), mcpAsyncClient -> {
+ StepVerifier.create(mcpAsyncClient.initialize()).expectNextCount(1).verifyComplete();
+
+ disconnect();
+
+ StepVerifier.create(mcpAsyncClient.ping()).expectError().verify();
+
+ reconnect();
+
+ StepVerifier.create(mcpAsyncClient.ping()).expectNextCount(1).verifyComplete();
+ });
+ }
+
+ @Test
+ void testSessionInvalidation() {
+ withClient(createMcpTransport(), mcpAsyncClient -> {
+ StepVerifier.create(mcpAsyncClient.initialize()).expectNextCount(1).verifyComplete();
+
+ restartMcpServer();
+
+ // The first try will face the session mismatch exception and the second one
+ // will go through the re-initialization process.
+ StepVerifier.create(mcpAsyncClient.ping().retry(1)).expectNextCount(1).verifyComplete();
+ });
+ }
+
+ @Test
+ void testCallTool() {
+ withClient(createMcpTransport(), mcpAsyncClient -> {
+ AtomicReference> tools = new AtomicReference<>();
+ StepVerifier.create(mcpAsyncClient.initialize()).expectNextCount(1).verifyComplete();
+ StepVerifier.create(mcpAsyncClient.listTools())
+ .consumeNextWith(list -> tools.set(list.tools()))
+ .verifyComplete();
+
+ disconnect();
+
+ String name = tools.get().get(0).name();
+ // Assuming this is the echo tool
+ McpSchema.CallToolRequest request = new McpSchema.CallToolRequest(name, Map.of("message", "hello"));
+ StepVerifier.create(mcpAsyncClient.callTool(request)).expectError().verify();
+
+ reconnect();
+
+ StepVerifier.create(mcpAsyncClient.callTool(request)).expectNextCount(1).verifyComplete();
+ });
+ }
+
+ @Test
+ void testSessionClose() {
+ withClient(createMcpTransport(), mcpAsyncClient -> {
+ StepVerifier.create(mcpAsyncClient.initialize()).expectNextCount(1).verifyComplete();
+ // In case of Streamable HTTP this call should issue a HTTP DELETE request
+ // invalidating the session
+ StepVerifier.create(mcpAsyncClient.closeGracefully()).expectComplete().verify();
+ // The next use should immediately re-initialize with no issue and send the
+ // request without any broken connections.
+ StepVerifier.create(mcpAsyncClient.ping()).expectNextCount(1).verifyComplete();
+ });
+ }
+
+}
diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java b/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java
index 5452c8ea..049bea00 100644
--- a/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java
+++ b/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java
@@ -110,14 +110,16 @@ void tearDown() {
onClose();
}
- void verifyInitializationTimeout(Function> operation, String action) {
+ void verifyNotificationSucceedsWithImplicitInitialization(Function> operation,
+ String action) {
withClient(createMcpTransport(), mcpAsyncClient -> {
- StepVerifier.withVirtualTime(() -> operation.apply(mcpAsyncClient))
- .expectSubscription()
- .thenAwait(getInitializationTimeout())
- .consumeErrorWith(e -> assertThat(e).isInstanceOf(McpError.class)
- .hasMessage("Client must be initialized before " + action))
- .verify();
+ StepVerifier.create(operation.apply(mcpAsyncClient)).verifyComplete();
+ });
+ }
+
+ void verifyCallSucceedsWithImplicitInitialization(Function> operation, String action) {
+ withClient(createMcpTransport(), mcpAsyncClient -> {
+ StepVerifier.create(operation.apply(mcpAsyncClient)).expectNextCount(1).verifyComplete();
});
}
@@ -133,7 +135,7 @@ void testConstructorWithInvalidArguments() {
@Test
void testListToolsWithoutInitialization() {
- verifyInitializationTimeout(client -> client.listTools(null), "listing tools");
+ verifyCallSucceedsWithImplicitInitialization(client -> client.listTools(null), "listing tools");
}
@Test
@@ -153,7 +155,7 @@ void testListTools() {
@Test
void testPingWithoutInitialization() {
- verifyInitializationTimeout(client -> client.ping(), "pinging the server");
+ verifyCallSucceedsWithImplicitInitialization(client -> client.ping(), "pinging the server");
}
@Test
@@ -168,7 +170,7 @@ void testPing() {
@Test
void testCallToolWithoutInitialization() {
CallToolRequest callToolRequest = new CallToolRequest("echo", Map.of("message", ECHO_TEST_MESSAGE));
- verifyInitializationTimeout(client -> client.callTool(callToolRequest), "calling tools");
+ verifyCallSucceedsWithImplicitInitialization(client -> client.callTool(callToolRequest), "calling tools");
}
@Test
@@ -202,7 +204,7 @@ void testCallToolWithInvalidTool() {
@Test
void testListResourcesWithoutInitialization() {
- verifyInitializationTimeout(client -> client.listResources(null), "listing resources");
+ verifyCallSucceedsWithImplicitInitialization(client -> client.listResources(null), "listing resources");
}
@Test
@@ -233,7 +235,7 @@ void testMcpAsyncClientState() {
@Test
void testListPromptsWithoutInitialization() {
- verifyInitializationTimeout(client -> client.listPrompts(null), "listing " + "prompts");
+ verifyCallSucceedsWithImplicitInitialization(client -> client.listPrompts(null), "listing " + "prompts");
}
@Test
@@ -258,7 +260,7 @@ void testListPrompts() {
@Test
void testGetPromptWithoutInitialization() {
GetPromptRequest request = new GetPromptRequest("simple_prompt", Map.of());
- verifyInitializationTimeout(client -> client.getPrompt(request), "getting " + "prompts");
+ verifyCallSucceedsWithImplicitInitialization(client -> client.getPrompt(request), "getting " + "prompts");
}
@Test
@@ -279,7 +281,7 @@ void testGetPrompt() {
@Test
void testRootsListChangedWithoutInitialization() {
- verifyInitializationTimeout(client -> client.rootsListChangedNotification(),
+ verifyNotificationSucceedsWithImplicitInitialization(client -> client.rootsListChangedNotification(),
"sending roots list changed notification");
}
@@ -354,7 +356,8 @@ void testReadResource() {
@Test
void testListResourceTemplatesWithoutInitialization() {
- verifyInitializationTimeout(client -> client.listResourceTemplates(), "listing resource templates");
+ verifyCallSucceedsWithImplicitInitialization(client -> client.listResourceTemplates(),
+ "listing resource templates");
}
@Test
@@ -447,8 +450,8 @@ void testInitializeWithAllCapabilities() {
@Test
void testLoggingLevelsWithoutInitialization() {
- verifyInitializationTimeout(client -> client.setLoggingLevel(McpSchema.LoggingLevel.DEBUG),
- "setting logging level");
+ verifyNotificationSucceedsWithImplicitInitialization(
+ client -> client.setLoggingLevel(McpSchema.LoggingLevel.DEBUG), "setting logging level");
}
@Test
diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java b/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java
index 128441f8..3785fd64 100644
--- a/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java
+++ b/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java
@@ -5,6 +5,7 @@
package io.modelcontextprotocol.client;
import java.time.Duration;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -12,7 +13,6 @@
import java.util.function.Function;
import io.modelcontextprotocol.spec.McpClientTransport;
-import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpSchema.CallToolRequest;
import io.modelcontextprotocol.spec.McpSchema.CallToolResult;
@@ -112,33 +112,18 @@ void tearDown() {
static final Object DUMMY_RETURN_VALUE = new Object();
- void verifyNotificationTimesOut(Consumer operation, String action) {
- verifyCallTimesOut(client -> {
+ void verifyNotificationSucceedsWithImplicitInitialization(Consumer operation, String action) {
+ verifyCallSucceedsWithImplicitInitialization(client -> {
operation.accept(client);
return DUMMY_RETURN_VALUE;
}, action);
}
- void verifyCallTimesOut(Function blockingOperation, String action) {
+ void verifyCallSucceedsWithImplicitInitialization(Function blockingOperation, String action) {
withClient(createMcpTransport(), mcpSyncClient -> {
- // This scheduler is not replaced by virtual time scheduler
- Scheduler customScheduler = Schedulers.newBoundedElastic(1, 1, "actualBoundedElastic");
-
- StepVerifier.withVirtualTime(() -> Mono.fromSupplier(() -> blockingOperation.apply(mcpSyncClient))
- // Offload the blocking call to the real scheduler
- .subscribeOn(customScheduler))
- .expectSubscription()
- // This works without actually waiting but executes all the
- // tasks pending execution on the VirtualTimeScheduler.
- // It is possible to execute the blocking code from the operation
- // because it is blocked on a dedicated Scheduler and the main
- // flow is not blocked and uses the VirtualTimeScheduler.
- .thenAwait(getInitializationTimeout())
- .consumeErrorWith(e -> assertThat(e).isInstanceOf(McpError.class)
- .hasMessage("Client must be initialized before " + action))
- .verify();
-
- customScheduler.dispose();
+ StepVerifier.create(Mono.fromSupplier(() -> blockingOperation.apply(mcpSyncClient)))
+ .expectNextCount(1)
+ .verifyComplete();
});
}
@@ -154,7 +139,7 @@ void testConstructorWithInvalidArguments() {
@Test
void testListToolsWithoutInitialization() {
- verifyCallTimesOut(client -> client.listTools(null), "listing tools");
+ verifyCallSucceedsWithImplicitInitialization(client -> client.listTools(null), "listing tools");
}
@Test
@@ -175,8 +160,8 @@ void testListTools() {
@Test
void testCallToolsWithoutInitialization() {
- verifyCallTimesOut(client -> client.callTool(new CallToolRequest("add", Map.of("a", 3, "b", 4))),
- "calling tools");
+ verifyCallSucceedsWithImplicitInitialization(
+ client -> client.callTool(new CallToolRequest("add", Map.of("a", 3, "b", 4))), "calling tools");
}
@Test
@@ -200,7 +185,7 @@ void testCallTools() {
@Test
void testPingWithoutInitialization() {
- verifyCallTimesOut(client -> client.ping(), "pinging the server");
+ verifyCallSucceedsWithImplicitInitialization(client -> client.ping(), "pinging the server");
}
@Test
@@ -214,7 +199,7 @@ void testPing() {
@Test
void testCallToolWithoutInitialization() {
CallToolRequest callToolRequest = new CallToolRequest("echo", Map.of("message", TEST_MESSAGE));
- verifyCallTimesOut(client -> client.callTool(callToolRequest), "calling tools");
+ verifyCallSucceedsWithImplicitInitialization(client -> client.callTool(callToolRequest), "calling tools");
}
@Test
@@ -243,7 +228,7 @@ void testCallToolWithInvalidTool() {
@Test
void testRootsListChangedWithoutInitialization() {
- verifyNotificationTimesOut(client -> client.rootsListChangedNotification(),
+ verifyNotificationSucceedsWithImplicitInitialization(client -> client.rootsListChangedNotification(),
"sending roots list changed notification");
}
@@ -257,7 +242,7 @@ void testRootsListChanged() {
@Test
void testListResourcesWithoutInitialization() {
- verifyCallTimesOut(client -> client.listResources(null), "listing resources");
+ verifyCallSucceedsWithImplicitInitialization(client -> client.listResources(null), "listing resources");
}
@Test
@@ -333,8 +318,14 @@ void testRemoveNonExistentRoot() {
@Test
void testReadResourceWithoutInitialization() {
- Resource resource = new Resource("test://uri", "Test Resource", null, null, null);
- verifyCallTimesOut(client -> client.readResource(resource), "reading resources");
+ AtomicReference> resources = new AtomicReference<>();
+ withClient(createMcpTransport(), mcpSyncClient -> {
+ mcpSyncClient.initialize();
+ resources.set(mcpSyncClient.listResources().resources());
+ });
+
+ verifyCallSucceedsWithImplicitInitialization(client -> client.readResource(resources.get().get(0)),
+ "reading resources");
}
@Test
@@ -355,7 +346,8 @@ void testReadResource() {
@Test
void testListResourceTemplatesWithoutInitialization() {
- verifyCallTimesOut(client -> client.listResourceTemplates(null), "listing resource templates");
+ verifyCallSucceedsWithImplicitInitialization(client -> client.listResourceTemplates(null),
+ "listing resource templates");
}
@Test
@@ -413,8 +405,8 @@ void testNotificationHandlers() {
@Test
void testLoggingLevelsWithoutInitialization() {
- verifyNotificationTimesOut(client -> client.setLoggingLevel(McpSchema.LoggingLevel.DEBUG),
- "setting logging level");
+ verifyNotificationSucceedsWithImplicitInitialization(
+ client -> client.setLoggingLevel(McpSchema.LoggingLevel.DEBUG), "setting logging level");
}
@Test
diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java
index a22ef6b5..8f0433eb 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java
+++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java
@@ -9,9 +9,9 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import java.util.function.Supplier;
import com.fasterxml.jackson.core.type.TypeReference;
import io.modelcontextprotocol.spec.McpClientSession;
@@ -32,7 +32,7 @@
import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;
import io.modelcontextprotocol.spec.McpSchema.PaginatedRequest;
import io.modelcontextprotocol.spec.McpSchema.Root;
-import io.modelcontextprotocol.spec.McpTransport;
+import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
import io.modelcontextprotocol.util.Assert;
import io.modelcontextprotocol.util.Utils;
import org.slf4j.Logger;
@@ -77,29 +77,37 @@
* @see McpClient
* @see McpSchema
* @see McpClientSession
+ * @see McpClientTransport
*/
public class McpAsyncClient {
private static final Logger logger = LoggerFactory.getLogger(McpAsyncClient.class);
- private static TypeReference VOID_TYPE_REFERENCE = new TypeReference<>() {
+ private static final TypeReference VOID_TYPE_REFERENCE = new TypeReference<>() {
};
- protected final Sinks.One initializedSink = Sinks.one();
+ public static final TypeReference
*/
public Mono initialize() {
+ return withSession("by explicit API call", init -> Mono.just(init.get()));
+ }
+ private Mono doInitialize(McpClientSession mcpClientSession) {
String latestVersion = this.protocolVersions.get(this.protocolVersions.size() - 1);
McpSchema.InitializeRequest initializeRequest = new McpSchema.InitializeRequest(// @formatter:off
@@ -356,16 +395,10 @@ public Mono initialize() {
this.clientCapabilities,
this.clientInfo); // @formatter:on
- Mono result = this.mcpSession.sendRequest(McpSchema.METHOD_INITIALIZE,
- initializeRequest, new TypeReference() {
- });
+ Mono result = mcpClientSession.sendRequest(McpSchema.METHOD_INITIALIZE,
+ initializeRequest, INITIALIZE_RESULT_TYPE_REF);
return result.flatMap(initializeResult -> {
-
- this.serverCapabilities = initializeResult.capabilities();
- this.serverInstructions = initializeResult.instructions();
- this.serverInfo = initializeResult.serverInfo();
-
logger.info("Server response with Protocol: {}, Capabilities: {}, Info: {} and Instructions {}",
initializeResult.protocolVersion(), initializeResult.capabilities(), initializeResult.serverInfo(),
initializeResult.instructions());
@@ -375,28 +408,93 @@ public Mono initialize() {
"Unsupported protocol version from the server: " + initializeResult.protocolVersion()));
}
- return this.mcpSession.sendNotification(McpSchema.METHOD_NOTIFICATION_INITIALIZED, null).doOnSuccess(v -> {
- this.initialized.set(true);
- this.initializedSink.tryEmitValue(initializeResult);
- }).thenReturn(initializeResult);
+ return mcpClientSession.sendNotification(McpSchema.METHOD_NOTIFICATION_INITIALIZED, null)
+ .thenReturn(initializeResult);
});
}
+ private static class Initialization {
+
+ private final Sinks.One initSink = Sinks.one();
+
+ private final AtomicReference result = new AtomicReference<>();
+
+ private final AtomicReference mcpClientSession = new AtomicReference<>();
+
+ static Initialization create() {
+ return new Initialization();
+ }
+
+ void setMcpClientSession(McpClientSession mcpClientSession) {
+ this.mcpClientSession.set(mcpClientSession);
+ }
+
+ McpClientSession mcpSession() {
+ return this.mcpClientSession.get();
+ }
+
+ McpSchema.InitializeResult get() {
+ return this.result.get();
+ }
+
+ Mono await() {
+ return this.initSink.asMono();
+ }
+
+ void complete(McpSchema.InitializeResult initializeResult) {
+ // first ensure the result is cached
+ this.result.set(initializeResult);
+ // inform all the subscribers waiting for the initialization
+ this.initSink.emitValue(initializeResult, Sinks.EmitFailureHandler.FAIL_FAST);
+ }
+
+ void error(Throwable t) {
+ this.initSink.emitError(t, Sinks.EmitFailureHandler.FAIL_FAST);
+ }
+
+ void close() {
+ this.mcpSession().close();
+ }
+
+ Mono closeGracefully() {
+ return this.mcpSession().closeGracefully();
+ }
+
+ }
+
/**
- * Utility method to handle the common pattern of checking initialization before
+ * Utility method to handle the common pattern of ensuring initialization before
* executing an operation.
* @param The type of the result Mono
- * @param actionName The action to perform if the client is initialized
- * @param operation The operation to execute if the client is initialized
+ * @param actionName The action to perform when the client is initialized
+ * @param operation The operation to execute when the client is initialized
* @return A Mono that completes with the result of the operation
*/
- private Mono withInitializationCheck(String actionName,
- Function> operation) {
- return this.initializedSink.asMono()
- .timeout(this.initializationTimeout)
- .onErrorResume(TimeoutException.class,
- ex -> Mono.error(new McpError("Client must be initialized before " + actionName)))
- .flatMap(operation);
+ private Mono withSession(String actionName, Function> operation) {
+ return Mono.defer(() -> {
+ Initialization newInit = Initialization.create();
+ Initialization previous = this.initializationRef.compareAndExchange(null, newInit);
+
+ boolean needsToInitialize = previous == null;
+ logger.debug(needsToInitialize ? "Initialization process started" : "Joining previous initialization");
+ if (needsToInitialize) {
+ newInit.setMcpClientSession(this.sessionSupplier.get());
+ }
+
+ Mono initializationJob = needsToInitialize
+ ? doInitialize(newInit.mcpSession()).doOnNext(newInit::complete).onErrorResume(ex -> {
+ newInit.error(ex);
+ return Mono.error(ex);
+ }) : previous.await();
+
+ return initializationJob.map(initializeResult -> this.initializationRef.get())
+ .timeout(this.initializationTimeout)
+ .onErrorResume(ex -> {
+ logger.warn("Failed to initialize", ex);
+ return Mono.error(new McpError("Client failed to initialize " + actionName));
+ })
+ .flatMap(operation);
+ });
}
// --------------------------
@@ -408,9 +506,8 @@ private Mono withInitializationCheck(String actionName,
* @return A Mono that completes with the server's ping response
*/
public Mono