Skip to content

Commit 68a9168

Browse files
eddyvbhughes20
andauthored
chore(refactor): lazy loading of clients, factory pattern for supplying mcp tools, env loading (#4)
* Add zod-to-json-schema dep, add lazy class to help with instantiating kafka client(s) when required by mcp server. Seperate required env files from ones that the LLM can prompt for. * updates to schemas * checkpoint, add validation for environment vars vs provided parameters * Update handlers to check for provided arg or present in config * place bootstrap servers in env var requirements * TODO: add functionality for users to update baseUrl, bootstrap servers, etc... * factory pattern for mcp tools * update readme with instructions on how to add a new tool * Update src/confluent/tools/handlers/connect/list-connectors-handler.ts Co-authored-by: Bethan Hughes <74008833+bhughes20@users.noreply.github.com> * Update src/confluent/tools/handlers/connect/read-connectors-handler.ts Co-authored-by: Bethan Hughes <74008833+bhughes20@users.noreply.github.com> * Update README.md Co-authored-by: Bethan Hughes <74008833+bhughes20@users.noreply.github.com> * rename middleware for flink, fix typos * Optional vars can be loaded through env as well --------- Co-authored-by: Bethan Hughes <74008833+bhughes20@users.noreply.github.com>
1 parent d6e5f16 commit 68a9168

24 files changed

+1538
-1031
lines changed

README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,14 @@ If you need to regenerate the typescript types from the openapi schema, you can
6060
# as of v7.5.2 there is a bug when using allOf w/ required https://github.com/openapi-ts/openapi-typescript/issues/1474. need --empty-objects-unknown flag to avoid it
6161
npx openapi-typescript ./openapi.json -o ./src/confluent/openapi-schema.d.ts --empty-objects-unknown
6262
```
63+
64+
## Adding a new Tool
65+
66+
To add a new tool, you can follow these steps:
67+
68+
1. Add a new enum to the enum class `ToolName`
69+
2. Add your new tool to the handlers map in the `ToolFactory` class.
70+
3. Create a new file, exporting the class that extends `BaseToolHandler`.
71+
1. Implement the `handle` method of the base class.
72+
2. Implement the `getToolConfig` method of the base class.
73+
4. Once satisfied, add it to the set of `enabledTools` in `index.ts`.

package-lock.json

Lines changed: 262 additions & 225 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@
3636
"@confluentinc/kafka-javascript": "^1.0.0",
3737
"@modelcontextprotocol/sdk": "^1.0.4",
3838
"openapi-fetch": "^0.13.4",
39-
"zod": "^3.24.0"
39+
"zod": "^3.24.0",
40+
"zod-to-json-schema": "^3.24.1"
4041
},
4142
"files": [
4243
"dist"

src/confluent/client-manager.ts

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/**
2+
* @fileoverview Provides client management functionality for Kafka and Confluent Cloud services.
3+
*/
4+
5+
import { KafkaJS } from "@confluentinc/kafka-javascript";
6+
import {
7+
confluentCloudAuthMiddleware,
8+
confluentCloudFlinkAuthMiddleware,
9+
} from "@src/confluent/middleware.js";
10+
import { paths } from "@src/confluent/openapi-schema.js";
11+
import { AsyncLazy, Lazy } from "@src/lazy.js";
12+
import createClient, { Client } from "openapi-fetch";
13+
14+
/**
15+
* Interface for managing Kafka client connections and operations.
16+
*/
17+
export interface KafkaClientManager {
18+
/** Gets the main Kafka client instance */
19+
getKafkaClient(): KafkaJS.Kafka;
20+
/** Gets a connected admin client for Kafka administration operations */
21+
getAdminClient(): Promise<KafkaJS.Admin>;
22+
/** Gets a connected producer client for publishing messages */
23+
getProducer(): Promise<KafkaJS.Producer>;
24+
/** Gets a connected consumer client for subscribing to topics */
25+
getConsumer(): Promise<KafkaJS.Consumer>;
26+
/** Disconnects and cleans up all client connections */
27+
disconnect(): Promise<void>;
28+
}
29+
30+
/**
31+
* Interface for managing Confluent Cloud REST client connections.
32+
*/
33+
export interface ConfluentCloudRestClientManager {
34+
/** Gets a configured REST client for Confluent Cloud Flink operations */
35+
getConfluentCloudFlinkRestClient(): Client<paths, `${string}/${string}`>;
36+
/** Gets a configured REST client for general Confluent Cloud operations */
37+
getConfluentCloudRestClient(): Client<paths, `${string}/${string}`>;
38+
}
39+
40+
export interface ClientManager
41+
extends KafkaClientManager,
42+
ConfluentCloudRestClientManager {}
43+
44+
/**
45+
* Default implementation of client management for Kafka and Confluent Cloud services.
46+
* Manages lifecycle and lazy initialization of various client connections.
47+
*/
48+
export class DefaultClientManager implements ClientManager {
49+
private readonly kafkaClient: Lazy<KafkaJS.Kafka>;
50+
private readonly adminClient: AsyncLazy<KafkaJS.Admin>;
51+
private readonly producer: AsyncLazy<KafkaJS.Producer>;
52+
private readonly confluentCloudFlinkRestClient: Lazy<
53+
Client<paths, `${string}/${string}`>
54+
>;
55+
private readonly confluentCloudRestClient: Lazy<
56+
Client<paths, `${string}/${string}`>
57+
>;
58+
59+
/**
60+
* Creates a new DefaultClientManager instance.
61+
* @param config - Configuration options for KafkaJS client
62+
* @param confluentCloudBaseUrl - Base URL for Confluent Cloud REST API
63+
* @param confluentCloudFlinkBaseUrl - Base URL for Flink REST API
64+
*/
65+
constructor(
66+
config: KafkaJS.CommonConstructorConfig,
67+
confluentCloudBaseUrl?: string,
68+
confluentCloudFlinkBaseUrl?: string,
69+
) {
70+
this.kafkaClient = new Lazy(() => new KafkaJS.Kafka(config));
71+
this.adminClient = new AsyncLazy(
72+
async () => {
73+
console.error("Connecting Kafka Admin");
74+
const admin = this.kafkaClient.get().admin();
75+
await admin.connect();
76+
return admin;
77+
},
78+
(admin) => admin.disconnect(),
79+
);
80+
this.producer = new AsyncLazy(
81+
async () => {
82+
console.error("Connecting Kafka Producer");
83+
const producer = this.kafkaClient.get().producer({
84+
kafkaJS: {
85+
acks: 1,
86+
compression: KafkaJS.CompressionTypes.GZIP,
87+
},
88+
});
89+
await producer.connect();
90+
return producer;
91+
},
92+
(producer) => producer.disconnect(),
93+
);
94+
95+
this.confluentCloudRestClient = new Lazy(() => {
96+
console.error(
97+
`Initializing Confluent Cloud REST client for base URL ${confluentCloudBaseUrl}`,
98+
);
99+
const client = createClient<paths>({
100+
baseUrl: confluentCloudBaseUrl,
101+
});
102+
client.use(confluentCloudAuthMiddleware);
103+
return client;
104+
});
105+
106+
this.confluentCloudFlinkRestClient = new Lazy(() => {
107+
console.error(
108+
`Initializing Confluent Cloud Flink REST client for base URL ${confluentCloudFlinkBaseUrl}`,
109+
);
110+
const client = createClient<paths>({
111+
baseUrl: confluentCloudFlinkBaseUrl,
112+
});
113+
client.use(confluentCloudFlinkAuthMiddleware);
114+
return client;
115+
});
116+
}
117+
getConsumer(): Promise<KafkaJS.Consumer> {
118+
throw new Error("Method not implemented.");
119+
}
120+
121+
/** @inheritdoc */
122+
getKafkaClient(): KafkaJS.Kafka {
123+
return this.kafkaClient.get();
124+
}
125+
126+
/** @inheritdoc */
127+
getConfluentCloudFlinkRestClient(): Client<paths, `${string}/${string}`> {
128+
return this.confluentCloudFlinkRestClient.get();
129+
}
130+
131+
/** @inheritdoc */
132+
getConfluentCloudRestClient(): Client<paths, `${string}/${string}`> {
133+
return this.confluentCloudRestClient.get();
134+
}
135+
136+
/** @inheritdoc */
137+
async getAdminClient(): Promise<KafkaJS.Admin> {
138+
return this.adminClient.get();
139+
}
140+
141+
/** @inheritdoc */
142+
async getProducer(): Promise<KafkaJS.Producer> {
143+
return this.producer.get();
144+
}
145+
146+
/** @inheritdoc */
147+
async disconnect(): Promise<void> {
148+
await this.adminClient.close();
149+
await this.producer.close();
150+
this.kafkaClient.close();
151+
}
152+
}

0 commit comments

Comments
 (0)