Skip to content

Commit c88ac93

Browse files
tzolovchemicL
andcommitted
feat(mcp): refactor logging to use exchange for targeted client notifications (modelcontextprotocol#132)
Refactors the MCP logging system to use the exchange mechanism for sending logging notifications only to specific client sessions rather than broadcasting to all clients. - Move logging notification delivery from server-wide broadcast to per-session exchange - Implement per-session minimum logging level tracking and filtering - Add proper logging level filtering at the exchange level - Change setLoggingLevel from notification to request/response pattern (breaking change) - Deprecate global server.loggingNotification in favor of exchange.loggingNotification - Add SetLevelRequest record to McpSchema - Add integration test demonstrating filtered logging notifications Resolves modelcontextprotocol#131 Signed-off-by: Christian Tzolov <christian.tzolov@broadcom.com> Co-authored-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
1 parent 2895d15 commit c88ac93

File tree

17 files changed

+721
-613
lines changed

17 files changed

+721
-613
lines changed

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java

Lines changed: 230 additions & 126 deletions
Large diffs are not rendered by default.

mcp-spring/mcp-spring-webmvc/src/test/java/io/modelcontextprotocol/server/WebMvcSseIntegrationTests.java

Lines changed: 134 additions & 126 deletions
Large diffs are not rendered by default.

mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.junit.jupiter.api.BeforeEach;
3232
import org.junit.jupiter.api.Disabled;
3333
import org.junit.jupiter.api.Test;
34+
import reactor.core.publisher.Flux;
3435
import reactor.core.publisher.Mono;
3536
import reactor.test.StepVerifier;
3637

@@ -453,15 +454,10 @@ void testLoggingLevelsWithoutInitialization() {
453454
@Test
454455
void testLoggingLevels() {
455456
withClient(createMcpTransport(), mcpAsyncClient -> {
456-
Mono<Void> testAllLevels = mcpAsyncClient.initialize().then(Mono.defer(() -> {
457-
Mono<Void> chain = Mono.empty();
458-
for (McpSchema.LoggingLevel level : McpSchema.LoggingLevel.values()) {
459-
chain = chain.then(mcpAsyncClient.setLoggingLevel(level));
460-
}
461-
return chain;
462-
}));
463-
464-
StepVerifier.create(testAllLevels).verifyComplete();
457+
StepVerifier
458+
.create(mcpAsyncClient.initialize()
459+
.thenMany(Flux.fromArray(McpSchema.LoggingLevel.values()).flatMap(mcpAsyncClient::setLoggingLevel)))
460+
.verifyComplete();
465461
});
466462
}
467463

mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -416,53 +416,4 @@ void testRootsChangeHandlers() {
416416
.doesNotThrowAnyException();
417417
}
418418

419-
// ---------------------------------------
420-
// Logging Tests
421-
// ---------------------------------------
422-
423-
@Test
424-
void testLoggingLevels() {
425-
var mcpAsyncServer = McpServer.async(createMcpTransportProvider())
426-
.serverInfo("test-server", "1.0.0")
427-
.capabilities(ServerCapabilities.builder().logging().build())
428-
.build();
429-
430-
// Test all logging levels
431-
for (McpSchema.LoggingLevel level : McpSchema.LoggingLevel.values()) {
432-
var notification = McpSchema.LoggingMessageNotification.builder()
433-
.level(level)
434-
.logger("test-logger")
435-
.data("Test message with level " + level)
436-
.build();
437-
438-
StepVerifier.create(mcpAsyncServer.loggingNotification(notification)).verifyComplete();
439-
}
440-
}
441-
442-
@Test
443-
void testLoggingWithoutCapability() {
444-
var mcpAsyncServer = McpServer.async(createMcpTransportProvider())
445-
.serverInfo("test-server", "1.0.0")
446-
.capabilities(ServerCapabilities.builder().build()) // No logging capability
447-
.build();
448-
449-
var notification = McpSchema.LoggingMessageNotification.builder()
450-
.level(McpSchema.LoggingLevel.INFO)
451-
.logger("test-logger")
452-
.data("Test log message")
453-
.build();
454-
455-
StepVerifier.create(mcpAsyncServer.loggingNotification(notification)).verifyComplete();
456-
}
457-
458-
@Test
459-
void testLoggingWithNullNotification() {
460-
var mcpAsyncServer = McpServer.async(createMcpTransportProvider())
461-
.serverInfo("test-server", "1.0.0")
462-
.capabilities(ServerCapabilities.builder().logging().build())
463-
.build();
464-
465-
StepVerifier.create(mcpAsyncServer.loggingNotification(null)).verifyError(McpError.class);
466-
}
467-
468419
}

mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -388,53 +388,4 @@ void testRootsChangeHandlers() {
388388
assertThatCode(() -> noConsumersServer.closeGracefully()).doesNotThrowAnyException();
389389
}
390390

391-
// ---------------------------------------
392-
// Logging Tests
393-
// ---------------------------------------
394-
395-
@Test
396-
void testLoggingLevels() {
397-
var mcpSyncServer = McpServer.sync(createMcpTransportProvider())
398-
.serverInfo("test-server", "1.0.0")
399-
.capabilities(ServerCapabilities.builder().logging().build())
400-
.build();
401-
402-
// Test all logging levels
403-
for (McpSchema.LoggingLevel level : McpSchema.LoggingLevel.values()) {
404-
var notification = McpSchema.LoggingMessageNotification.builder()
405-
.level(level)
406-
.logger("test-logger")
407-
.data("Test message with level " + level)
408-
.build();
409-
410-
assertThatCode(() -> mcpSyncServer.loggingNotification(notification)).doesNotThrowAnyException();
411-
}
412-
}
413-
414-
@Test
415-
void testLoggingWithoutCapability() {
416-
var mcpSyncServer = McpServer.sync(createMcpTransportProvider())
417-
.serverInfo("test-server", "1.0.0")
418-
.capabilities(ServerCapabilities.builder().build()) // No logging capability
419-
.build();
420-
421-
var notification = McpSchema.LoggingMessageNotification.builder()
422-
.level(McpSchema.LoggingLevel.INFO)
423-
.logger("test-logger")
424-
.data("Test log message")
425-
.build();
426-
427-
assertThatCode(() -> mcpSyncServer.loggingNotification(notification)).doesNotThrowAnyException();
428-
}
429-
430-
@Test
431-
void testLoggingWithNullNotification() {
432-
var mcpSyncServer = McpServer.sync(createMcpTransportProvider())
433-
.serverInfo("test-server", "1.0.0")
434-
.capabilities(ServerCapabilities.builder().logging().build())
435-
.build();
436-
437-
assertThatThrownBy(() -> mcpSyncServer.loggingNotification(null)).isInstanceOf(McpError.class);
438-
}
439-
440391
}

mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -786,10 +786,9 @@ public Mono<Void> setLoggingLevel(LoggingLevel loggingLevel) {
786786
}
787787

788788
return this.withInitializationCheck("setting logging level", initializedResult -> {
789-
String levelName = this.transport.unmarshalFrom(loggingLevel, new TypeReference<String>() {
790-
});
791-
Map<String, Object> params = Map.of("level", levelName);
792-
return this.mcpSession.sendNotification(McpSchema.METHOD_LOGGING_SET_LEVEL, params);
789+
var params = new McpSchema.SetLevelRequest(loggingLevel);
790+
return this.mcpSession.sendRequest(McpSchema.METHOD_LOGGING_SET_LEVEL, params, new TypeReference<Object>() {
791+
}).then();
793792
});
794793
}
795794

mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
import java.time.Duration;
88

9-
import io.modelcontextprotocol.spec.McpClientTransport;
109
import io.modelcontextprotocol.spec.McpSchema;
1110
import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities;
1211
import io.modelcontextprotocol.spec.McpSchema.GetPromptRequest;

mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.modelcontextprotocol.spec.McpSchema.CallToolResult;
2222
import io.modelcontextprotocol.spec.McpSchema.LoggingLevel;
2323
import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;
24+
import io.modelcontextprotocol.spec.McpSchema.SetLevelRequest;
2425
import io.modelcontextprotocol.spec.McpSchema.Tool;
2526
import io.modelcontextprotocol.spec.McpServerSession;
2627
import io.modelcontextprotocol.spec.McpServerTransportProvider;
@@ -216,11 +217,17 @@ public Mono<Void> notifyPromptsListChanged() {
216217
// ---------------------------------------
217218

218219
/**
219-
* Send a logging message notification to all connected clients. Messages below the
220-
* current minimum logging level will be filtered out.
220+
* This implementation would, incorrectly, broadcast the logging message to all
221+
* connected clients, using a single minLoggingLevel for all of them. Similar to the
222+
* sampling and roots, the logging level should be set per client session and use the
223+
* ServerExchange to send the logging message to the right client.
221224
* @param loggingMessageNotification The logging message to send
222225
* @return A Mono that completes when the notification has been sent
226+
* @deprecated Use
227+
* {@link McpAsyncServerExchange#loggingNotification(LoggingMessageNotification)}
228+
* instead.
223229
*/
230+
@Deprecated
224231
public Mono<Void> loggingNotification(LoggingMessageNotification loggingMessageNotification) {
225232
return this.delegate.loggingNotification(loggingMessageNotification);
226233
}
@@ -257,6 +264,8 @@ private static class AsyncServerImpl extends McpAsyncServer {
257264

258265
private final ConcurrentHashMap<String, McpServerFeatures.AsyncPromptSpecification> prompts = new ConcurrentHashMap<>();
259266

267+
// FIXME: this field is deprecated and should be remvoed together with the
268+
// broadcasting loggingNotification.
260269
private LoggingLevel minLoggingLevel = LoggingLevel.DEBUG;
261270

262271
private List<String> protocolVersions = List.of(McpSchema.LATEST_PROTOCOL_VERSION);
@@ -677,12 +686,22 @@ public Mono<Void> loggingNotification(LoggingMessageNotification loggingMessageN
677686
loggingMessageNotification);
678687
}
679688

680-
private McpServerSession.RequestHandler<Void> setLoggerRequestHandler() {
689+
private McpServerSession.RequestHandler<Object> setLoggerRequestHandler() {
681690
return (exchange, params) -> {
682-
this.minLoggingLevel = objectMapper.convertValue(params, new TypeReference<LoggingLevel>() {
683-
});
691+
return Mono.defer(() -> {
684692

685-
return Mono.empty();
693+
SetLevelRequest newMinLoggingLevel = objectMapper.convertValue(params,
694+
new TypeReference<SetLevelRequest>() {
695+
});
696+
697+
exchange.setMinLoggingLevel(newMinLoggingLevel.level());
698+
699+
// FIXME: this field is deprecated and should be removed together
700+
// with the broadcasting loggingNotification.
701+
this.minLoggingLevel = newMinLoggingLevel.level();
702+
703+
return Mono.just(Map.of());
704+
});
686705
};
687706
}
688707

mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,24 @@
1+
/*
2+
* Copyright 2024-2024 the original author or authors.
3+
*/
4+
15
package io.modelcontextprotocol.server;
26

37
import com.fasterxml.jackson.core.type.TypeReference;
48
import io.modelcontextprotocol.spec.McpError;
59
import io.modelcontextprotocol.spec.McpSchema;
10+
import io.modelcontextprotocol.spec.McpSchema.LoggingLevel;
11+
import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;
612
import io.modelcontextprotocol.spec.McpServerSession;
13+
import io.modelcontextprotocol.util.Assert;
714
import reactor.core.publisher.Mono;
815

916
/**
1017
* Represents an asynchronous exchange with a Model Context Protocol (MCP) client. The
1118
* exchange provides methods to interact with the client and query its capabilities.
1219
*
1320
* @author Dariusz Jędrzejczyk
21+
* @author Christian Tzolov
1422
*/
1523
public class McpAsyncServerExchange {
1624

@@ -20,6 +28,8 @@ public class McpAsyncServerExchange {
2028

2129
private final McpSchema.Implementation clientInfo;
2230

31+
private volatile LoggingLevel minLoggingLevel = LoggingLevel.INFO;
32+
2333
private static final TypeReference<McpSchema.CreateMessageResult> CREATE_MESSAGE_RESULT_TYPE_REF = new TypeReference<>() {
2434
};
2535

@@ -101,4 +111,38 @@ public Mono<McpSchema.ListRootsResult> listRoots(String cursor) {
101111
LIST_ROOTS_RESULT_TYPE_REF);
102112
}
103113

114+
/**
115+
* Send a logging message notification to all connected clients. Messages below the
116+
* current minimum logging level will be filtered out.
117+
* @param loggingMessageNotification The logging message to send
118+
* @return A Mono that completes when the notification has been sent
119+
*/
120+
public Mono<Void> loggingNotification(LoggingMessageNotification loggingMessageNotification) {
121+
122+
if (loggingMessageNotification == null) {
123+
return Mono.error(new McpError("Logging message must not be null"));
124+
}
125+
126+
return Mono.defer(() -> {
127+
if (this.isNotificationForLevelAllowed(loggingMessageNotification.level())) {
128+
return this.session.sendNotification(McpSchema.METHOD_NOTIFICATION_MESSAGE, loggingMessageNotification);
129+
}
130+
return Mono.empty();
131+
});
132+
}
133+
134+
/**
135+
* Set the minimum logging level for the client. Messages below this level will be
136+
* filtered out.
137+
* @param minLoggingLevel The minimum logging level
138+
*/
139+
void setMinLoggingLevel(LoggingLevel minLoggingLevel) {
140+
Assert.notNull(minLoggingLevel, "minLoggingLevel must not be null");
141+
this.minLoggingLevel = minLoggingLevel;
142+
}
143+
144+
private boolean isNotificationForLevelAllowed(LoggingLevel loggingLevel) {
145+
return loggingLevel.level() >= this.minLoggingLevel.level();
146+
}
147+
104148
}

mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServer.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@
44

55
package io.modelcontextprotocol.server;
66

7-
import io.modelcontextprotocol.spec.McpError;
87
import io.modelcontextprotocol.spec.McpSchema;
9-
import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities;
108
import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;
119
import io.modelcontextprotocol.util.Assert;
1210

@@ -151,9 +149,16 @@ public void notifyPromptsListChanged() {
151149
}
152150

153151
/**
154-
* Send a logging message notification to all clients.
155-
* @param loggingMessageNotification The logging message notification to send
152+
* This implementation would, incorrectly, broadcast the logging message to all
153+
* connected clients, using a single minLoggingLevel for all of them. Similar to the
154+
* sampling and roots, the logging level should be set per client session and use the
155+
* ServerExchange to send the logging message to the right client.
156+
* @param loggingMessageNotification The logging message to send
157+
* @deprecated Use
158+
* {@link McpSyncServerExchange#loggingNotification(LoggingMessageNotification)}
159+
* instead.
156160
*/
161+
@Deprecated
157162
public void loggingNotification(LoggingMessageNotification loggingMessageNotification) {
158163
this.asyncServer.loggingNotification(loggingMessageNotification).block();
159164
}

mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServerExchange.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
1+
/*
2+
* Copyright 2024-2024 the original author or authors.
3+
*/
4+
15
package io.modelcontextprotocol.server;
26

3-
import com.fasterxml.jackson.core.type.TypeReference;
47
import io.modelcontextprotocol.spec.McpSchema;
8+
import io.modelcontextprotocol.spec.McpSchema.LoggingLevel;
9+
import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;
510

611
/**
712
* Represents a synchronous exchange with a Model Context Protocol (MCP) client. The
813
* exchange provides methods to interact with the client and query its capabilities.
914
*
1015
* @author Dariusz Jędrzejczyk
16+
* @author Christian Tzolov
1117
*/
1218
public class McpSyncServerExchange {
1319

@@ -75,4 +81,13 @@ public McpSchema.ListRootsResult listRoots(String cursor) {
7581
return this.exchange.listRoots(cursor).block();
7682
}
7783

84+
/**
85+
* Send a logging message notification to all connected clients. Messages below the
86+
* current minimum logging level will be filtered out.
87+
* @param loggingMessageNotification The logging message to send
88+
*/
89+
public void loggingNotification(LoggingMessageNotification loggingMessageNotification) {
90+
this.exchange.loggingNotification(loggingMessageNotification).block();
91+
}
92+
7893
}

mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,6 +1165,11 @@ public int level() {
11651165

11661166
} // @formatter:on
11671167

1168+
@JsonInclude(JsonInclude.Include.NON_ABSENT)
1169+
@JsonIgnoreProperties(ignoreUnknown = true)
1170+
public record SetLevelRequest(@JsonProperty("level") LoggingLevel level) {
1171+
}
1172+
11681173
// ---------------------------
11691174
// Autocomplete
11701175
// ---------------------------

mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.junit.jupiter.api.BeforeEach;
3232
import org.junit.jupiter.api.Disabled;
3333
import org.junit.jupiter.api.Test;
34+
import reactor.core.publisher.Flux;
3435
import reactor.core.publisher.Mono;
3536
import reactor.test.StepVerifier;
3637

@@ -454,15 +455,10 @@ void testLoggingLevelsWithoutInitialization() {
454455
@Test
455456
void testLoggingLevels() {
456457
withClient(createMcpTransport(), mcpAsyncClient -> {
457-
Mono<Void> testAllLevels = mcpAsyncClient.initialize().then(Mono.defer(() -> {
458-
Mono<Void> chain = Mono.empty();
459-
for (McpSchema.LoggingLevel level : McpSchema.LoggingLevel.values()) {
460-
chain = chain.then(mcpAsyncClient.setLoggingLevel(level));
461-
}
462-
return chain;
463-
}));
464-
465-
StepVerifier.create(testAllLevels).verifyComplete();
458+
StepVerifier
459+
.create(mcpAsyncClient.initialize()
460+
.thenMany(Flux.fromArray(McpSchema.LoggingLevel.values()).flatMap(mcpAsyncClient::setLoggingLevel)))
461+
.verifyComplete();
466462
});
467463
}
468464

0 commit comments

Comments
 (0)