Skip to content

Commit f19f4a9

Browse files
authored
feat(tool): Add kafka consume messages tool (#23)
* Add consume messages tool. Bump dependencies.

* chore(tool): Add schema registry options for produce/consume tools (#26)

* wip

* Modify producer handler to produce with schema registry

* Enhance client manager and producer handler to support Schema Registry integration
  - Added SchemaRegistryClient support in ClientManager and DefaultClientManager.
  - Implemented SchemaRegistryClientHandler interface for managing schema registry connections.
  - Updated ProduceKafkaMessageHandler to utilize the Schema Registry for message serialization, including schema registration and validation.
  - Refactored message serialization logic to handle schema checks and improved error handling for serialization failures.

* Implement Schema Registry support in Kafka message handlers
  - Introduced schema registry integration in ConsumeKafkaMessagesHandler for automatic deserialization of messages.
  - Enhanced ProduceKafkaMessageHandler to support schema checks and serialization using Schema Registry.
  - Added new utility functions for schema management and message processing.
  - Refactored existing code to improve error handling and streamline message processing logic.

* docs and formatting

* refactoring of middleware and supplying configs to client manager
1 parent b6c0050 commit f19f4a9

11 files changed

+5454
-621
lines changed

package-lock.json

Lines changed: 4528 additions & 496 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@confluentinc/mcp-confluent",
33
"description": "Confluent MCP Server",
4-
"version": "1.0.1",
4+
"version": "1.0.2",
55
"author": "Confluent Inc.",
66
"license": "MIT",
77
"repository": {
@@ -34,33 +34,34 @@
3434
"prepare": "husky"
3535
},
3636
"devDependencies": {
37-
"@eslint/js": "^9.23.0",
37+
"@eslint/js": "^9.25.1",
3838
"@types/content-type": "^1.1.8",
3939
"@types/eslint__js": "^8.42.3",
40-
"@types/node": "^22.14.0",
40+
"@types/node": "^22.15.3",
4141
"@types/ws": "^8.18.1",
42-
"@typescript-eslint/eslint-plugin": "^8.29.0",
43-
"@typescript-eslint/parser": "^8.29.0",
42+
"@typescript-eslint/eslint-plugin": "^8.31.0",
43+
"@typescript-eslint/parser": "^8.31.0",
4444
"concurrently": "^9.1.2",
45-
"eslint": "^9.23.0",
46-
"eslint-config-prettier": "^10.1.1",
45+
"eslint": "^9.25.1",
46+
"eslint-config-prettier": "^10.1.2",
4747
"eslint-plugin-prettier": "^5.2.6",
4848
"globals": "^16.0.0",
4949
"husky": "^9.1.7",
5050
"openapi-typescript": "^7.6.1",
5151
"prettier": "3.5.3",
52-
"tsc-alias": "^1.8.13",
53-
"typescript": "^5.8.2",
54-
"typescript-eslint": "^8.29.0"
52+
"tsc-alias": "^1.8.15",
53+
"typescript": "^5.8.3",
54+
"typescript-eslint": "^8.31.0"
5555
},
5656
"dependencies": {
5757
"@commander-js/extra-typings": "^13.1.0",
58-
"@confluentinc/kafka-javascript": "^1.2.0",
59-
"@modelcontextprotocol/sdk": "^1.8.0",
58+
"@confluentinc/kafka-javascript": "^1.3.0",
59+
"@confluentinc/schemaregistry": "^1.3.1",
60+
"@modelcontextprotocol/sdk": "^1.10.2",
6061
"commander": "^13.1.0",
61-
"dotenv": "^16.4.7",
62+
"dotenv": "^16.5.0",
6263
"openapi-fetch": "^0.13.5",
63-
"zod": "^3.24.2"
64+
"zod": "^3.24.3"
6465
},
6566
"files": [
6667
"dist"

src/confluent/client-manager.ts

Lines changed: 77 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
*/
44

55
import { KafkaJS } from "@confluentinc/kafka-javascript";
6+
import { SchemaRegistryClient } from "@confluentinc/schemaregistry";
67
import {
7-
confluentCloudAuthMiddleware,
8-
confluentCloudFlinkAuthMiddleware,
9-
confluentCloudKafkaAuthMiddleware,
10-
confluentCloudSchemaRegistryAuthMiddleware,
8+
ConfluentAuth,
9+
ConfluentEndpoints,
10+
createAuthMiddleware,
1111
} from "@src/confluent/middleware.js";
1212
import { paths } from "@src/confluent/openapi-schema.js";
1313
import { AsyncLazy, Lazy } from "@src/lazy.js";
@@ -24,7 +24,7 @@ export interface KafkaClientManager {
2424
/** Gets a connected producer client for publishing messages */
2525
getProducer(): Promise<KafkaJS.Producer>;
2626
/** Gets a connected consumer client for subscribing to topics */
27-
getConsumer(): Promise<KafkaJS.Consumer>;
27+
getConsumer(sessionId?: string): Promise<KafkaJS.Consumer>;
2828
/** Disconnects and cleans up all client connections */
2929
disconnect(): Promise<void>;
3030
}
@@ -51,15 +51,38 @@ export interface ConfluentCloudRestClientManager {
5151
setConfluentCloudKafkaRestEndpoint(endpoint: string): void;
5252
}
5353

54+
/**
55+
* Interface for managing Schema Registry client connections.
56+
*/
57+
export interface SchemaRegistryClientHandler {
58+
getSchemaRegistryClient(): SchemaRegistryClient;
59+
}
60+
5461
export interface ClientManager
5562
extends KafkaClientManager,
56-
ConfluentCloudRestClientManager {}
63+
ConfluentCloudRestClientManager,
64+
SchemaRegistryClientHandler {
65+
getSchemaRegistryClient(): SchemaRegistryClient;
66+
}
67+
68+
export interface ClientManagerConfig {
69+
kafka: KafkaJS.CommonConstructorConfig;
70+
endpoints: ConfluentEndpoints;
71+
auth: {
72+
cloud: ConfluentAuth;
73+
flink: ConfluentAuth;
74+
schemaRegistry: ConfluentAuth;
75+
kafka: ConfluentAuth;
76+
};
77+
}
5778

5879
/**
5980
* Default implementation of client management for Kafka and Confluent Cloud services.
6081
* Manages lifecycle and lazy initialization of various client connections.
6182
*/
62-
export class DefaultClientManager implements ClientManager {
83+
export class DefaultClientManager
84+
implements ClientManager, SchemaRegistryClientHandler
85+
{
6386
private confluentCloudBaseUrl: string;
6487
private confluentCloudFlinkBaseUrl: string;
6588
private confluentCloudSchemaRegistryBaseUrl: string;
@@ -79,27 +102,19 @@ export class DefaultClientManager implements ClientManager {
79102
private readonly confluentCloudKafkaRestClient: Lazy<
80103
Client<paths, `${string}/${string}`>
81104
>;
105+
private readonly schemaRegistryClient: Lazy<SchemaRegistryClient>;
106+
82107
/**
83108
* Creates a new DefaultClientManager instance.
84-
* @param config - Configuration options for KafkaJS client
85-
* @param confluentCloudBaseUrl - Base URL for Confluent Cloud REST API
86-
* @param confluentCloudFlinkBaseUrl - Base URL for Flink REST API
87-
* @param confluentCloudSchemaRegistryBaseUrl - Base URL for Schema Registry REST API
88-
* @param confluentCloudKafkaRestBaseUrl - Base URL for Kafka REST API
109+
* @param config - Configuration for all clients
89110
*/
90-
constructor(
91-
config: KafkaJS.CommonConstructorConfig,
92-
confluentCloudBaseUrl?: string,
93-
confluentCloudFlinkBaseUrl?: string,
94-
confluentCloudSchemaRegistryBaseUrl?: string,
95-
confluentCloudKafkaRestBaseUrl?: string,
96-
) {
97-
this.confluentCloudBaseUrl = confluentCloudBaseUrl || "";
98-
this.confluentCloudFlinkBaseUrl = confluentCloudFlinkBaseUrl || "";
99-
this.confluentCloudSchemaRegistryBaseUrl =
100-
confluentCloudSchemaRegistryBaseUrl || "";
101-
this.confluentCloudKafkaRestBaseUrl = confluentCloudKafkaRestBaseUrl || "";
102-
this.kafkaClient = new Lazy(() => new KafkaJS.Kafka(config));
111+
constructor(config: ClientManagerConfig) {
112+
this.confluentCloudBaseUrl = config.endpoints.cloud;
113+
this.confluentCloudFlinkBaseUrl = config.endpoints.flink;
114+
this.confluentCloudSchemaRegistryBaseUrl = config.endpoints.schemaRegistry;
115+
this.confluentCloudKafkaRestBaseUrl = config.endpoints.kafka;
116+
117+
this.kafkaClient = new Lazy(() => new KafkaJS.Kafka(config.kafka));
103118
this.adminClient = new AsyncLazy(
104119
async () => {
105120
console.error("Connecting Kafka Admin");
@@ -113,10 +128,8 @@ export class DefaultClientManager implements ClientManager {
113128
async () => {
114129
console.error("Connecting Kafka Producer");
115130
const producer = this.kafkaClient.get().producer({
116-
kafkaJS: {
117-
acks: 1,
118-
compression: KafkaJS.CompressionTypes.GZIP,
119-
},
131+
"compression.type": "gzip",
132+
"linger.ms": 5,
120133
});
121134
await producer.connect();
122135
return producer;
@@ -131,7 +144,7 @@ export class DefaultClientManager implements ClientManager {
131144
const client = createClient<paths>({
132145
baseUrl: this.confluentCloudBaseUrl,
133146
});
134-
client.use(confluentCloudAuthMiddleware);
147+
client.use(createAuthMiddleware(config.auth.cloud));
135148
return client;
136149
});
137150

@@ -142,7 +155,7 @@ export class DefaultClientManager implements ClientManager {
142155
const client = createClient<paths>({
143156
baseUrl: this.confluentCloudFlinkBaseUrl,
144157
});
145-
client.use(confluentCloudFlinkAuthMiddleware);
158+
client.use(createAuthMiddleware(config.auth.flink));
146159
return client;
147160
});
148161

@@ -153,7 +166,7 @@ export class DefaultClientManager implements ClientManager {
153166
const client = createClient<paths>({
154167
baseUrl: this.confluentCloudSchemaRegistryBaseUrl,
155168
});
156-
client.use(confluentCloudSchemaRegistryAuthMiddleware);
169+
client.use(createAuthMiddleware(config.auth.schemaRegistry));
157170
return client;
158171
});
159172

@@ -164,9 +177,35 @@ export class DefaultClientManager implements ClientManager {
164177
const client = createClient<paths>({
165178
baseUrl: this.confluentCloudKafkaRestBaseUrl,
166179
});
167-
client.use(confluentCloudKafkaAuthMiddleware);
180+
client.use(createAuthMiddleware(config.auth.kafka));
168181
return client;
169182
});
183+
184+
this.schemaRegistryClient = new Lazy(() => {
185+
const { apiKey, apiSecret } = config.auth.schemaRegistry;
186+
return new SchemaRegistryClient({
187+
baseURLs: [config.endpoints.schemaRegistry],
188+
basicAuthCredentials: {
189+
credentialsSource: "USER_INFO",
190+
userInfo: `${apiKey}:${apiSecret}`,
191+
},
192+
});
193+
});
194+
}
195+
196+
/** @inheritdoc */
197+
async getConsumer(sessionId?: string): Promise<KafkaJS.Consumer> {
198+
const baseGroupId = "mcp-confluent"; // should be configurable?
199+
const groupId = sessionId ? `${baseGroupId}-${sessionId}` : baseGroupId;
200+
console.error(`Creating new Kafka Consumer with groupId: ${groupId}`);
201+
return this.kafkaClient.get().consumer({
202+
kafkaJS: {
203+
fromBeginning: true,
204+
groupId,
205+
allowAutoTopicCreation: false,
206+
autoCommit: false,
207+
},
208+
});
170209
}
171210
/**
172211
* a function that sets a new confluent cloud rest endpoint.
@@ -190,10 +229,6 @@ export class DefaultClientManager implements ClientManager {
190229
this.confluentCloudKafkaRestBaseUrl = endpoint;
191230
}
192231

193-
getConsumer(): Promise<KafkaJS.Consumer> {
194-
throw new Error("Method not implemented.");
195-
}
196-
197232
/** @inheritdoc */
198233
getKafkaClient(): KafkaJS.Kafka {
199234
return this.kafkaClient.get();
@@ -238,4 +273,9 @@ export class DefaultClientManager implements ClientManager {
238273
await this.producer.close();
239274
this.kafkaClient.close();
240275
}
276+
277+
/** @inheritdoc */
278+
getSchemaRegistryClient(): SchemaRegistryClient {
279+
return this.schemaRegistryClient.get();
280+
}
241281
}

src/confluent/middleware.ts

Lines changed: 16 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,27 @@
1-
import env from "@src/env.js";
21
import { Middleware } from "openapi-fetch";
3-
/**
4-
* Middleware to add Authorization header to every request.
5-
*
6-
* This middleware intercepts each request and adds an Authorization header
7-
* using the Flink API key and secret from the environment variables.
8-
*
9-
* @param {Object} request - The request object.
10-
* @returns {Object} The modified request object with the Authorization header.
11-
*/
12-
export const confluentCloudFlinkAuthMiddleware: Middleware = {
13-
async onRequest({ request }) {
14-
console.error(`${JSON.stringify(request)}`);
15-
// add Authorization header to every request
16-
request.headers.set(
17-
"Authorization",
18-
`Basic ${Buffer.from(`${env.FLINK_API_KEY}:${env.FLINK_API_SECRET}`).toString("base64")}`,
19-
);
20-
return request;
21-
},
22-
};
232

24-
export const confluentCloudAuthMiddleware: Middleware = {
25-
async onRequest({ request }) {
26-
console.error(`${JSON.stringify(request)}`);
27-
// add Authorization header to every request
28-
request.headers.set(
29-
"Authorization",
30-
`Basic ${Buffer.from(`${env.CONFLUENT_CLOUD_API_KEY}:${env.CONFLUENT_CLOUD_API_SECRET}`).toString("base64")}`,
31-
);
32-
return request;
33-
},
34-
};
3+
export interface ConfluentEndpoints {
4+
cloud: string;
5+
flink: string;
6+
schemaRegistry: string;
7+
kafka: string;
8+
}
359

36-
export const confluentCloudSchemaRegistryAuthMiddleware: Middleware = {
37-
async onRequest({ request }) {
38-
console.error(`${JSON.stringify(request)}`);
39-
// add Authorization header to every request
40-
request.headers.set(
41-
"Authorization",
42-
`Basic ${Buffer.from(`${env.SCHEMA_REGISTRY_API_KEY}:${env.SCHEMA_REGISTRY_API_SECRET}`).toString("base64")}`,
43-
);
44-
return request;
45-
},
46-
};
10+
export interface ConfluentAuth {
11+
apiKey: string;
12+
apiSecret: string;
13+
}
4714

48-
export const confluentCloudKafkaAuthMiddleware: Middleware = {
15+
/**
16+
* Creates a middleware that adds Authorization header using the provided auth credentials
17+
*/
18+
export const createAuthMiddleware = (auth: ConfluentAuth): Middleware => ({
4919
async onRequest({ request }) {
5020
console.error(`${JSON.stringify(request)}`);
51-
// add Authorization header to every request
5221
request.headers.set(
5322
"Authorization",
54-
`Basic ${Buffer.from(`${env.KAFKA_API_KEY}:${env.KAFKA_API_SECRET}`).toString("base64")}`,
23+
`Basic ${Buffer.from(`${auth.apiKey}:${auth.apiSecret}`).toString("base64")}`,
5524
);
5625
return request;
5726
},
58-
};
27+
});

0 commit comments

Comments
 (0)