Skip to content

Commit 57eadc4

Browse files
feat: Add CreateTopicTagsHandler and SearchTopicsByTagHandler (#9)
* adding schema registry info to be able to search for pii tag * updates * feat: add new schema registry tag tools and improve error handling, Add tag creation handler, Add topic search by tag handler, Improve error handling consistency, Fix schema registry secret env var name, Enable all available tools by default * remove claude desktop config --------- Co-authored-by: Niamh Thornbury <nthornbury@confluent.io>
1 parent 34dfb8f commit 57eadc4

21 files changed

+737
-15
lines changed

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,4 +132,7 @@ dist
132132

133133
# vscode
134134
.vscode
135-
.idea
135+
.idea
136+
.continue
137+
*.txt
138+
connector-config.json

src/confluent/client-manager.ts

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import { KafkaJS } from "@confluentinc/kafka-javascript";
66
import {
77
confluentCloudAuthMiddleware,
88
confluentCloudFlinkAuthMiddleware,
9+
confluentCloudKafkaAuthMiddleware,
10+
confluentCloudSchemaRegistryAuthMiddleware,
911
} from "@src/confluent/middleware.js";
1012
import { paths } from "@src/confluent/openapi-schema.js";
1113
import { AsyncLazy, Lazy } from "@src/lazy.js";
@@ -35,9 +37,18 @@ export interface ConfluentCloudRestClientManager {
3537
getConfluentCloudFlinkRestClient(): Client<paths, `${string}/${string}`>;
3638
/** Gets a configured REST client for general Confluent Cloud operations */
3739
getConfluentCloudRestClient(): Client<paths, `${string}/${string}`>;
40+
/** Gets a configured REST client for Confluent Cloud Schema Registry operations */
41+
getConfluentCloudSchemaRegistryRestClient(): Client<
42+
paths,
43+
`${string}/${string}`
44+
>;
45+
/** Gets a configured REST client for Confluent Cloud Kafka operations */
46+
getConfluentCloudKafkaRestClient(): Client<paths, `${string}/${string}`>;
3847

3948
setConfluentCloudRestEndpoint(endpoint: string): void;
4049
setConfluentCloudFlinkEndpoint(endpoint: string): void;
50+
setConfluentCloudSchemaRegistryEndpoint(endpoint: string): void;
51+
setConfluentCloudKafkaRestEndpoint(endpoint: string): void;
4152
}
4253

4354
export interface ClientManager
@@ -51,6 +62,8 @@ export interface ClientManager
5162
export class DefaultClientManager implements ClientManager {
5263
private confluentCloudBaseUrl: string;
5364
private confluentCloudFlinkBaseUrl: string;
65+
private confluentCloudSchemaRegistryBaseUrl: string;
66+
private confluentCloudKafkaRestBaseUrl: string;
5467
private readonly kafkaClient: Lazy<KafkaJS.Kafka>;
5568
private readonly adminClient: AsyncLazy<KafkaJS.Admin>;
5669
private readonly producer: AsyncLazy<KafkaJS.Producer>;
@@ -60,20 +73,31 @@ export class DefaultClientManager implements ClientManager {
6073
private readonly confluentCloudRestClient: Lazy<
6174
Client<paths, `${string}/${string}`>
6275
>;
63-
76+
private readonly confluentCloudSchemaRegistryRestClient: Lazy<
77+
Client<paths, `${string}/${string}`>
78+
>;
79+
private readonly confluentCloudKafkaRestClient: Lazy<
80+
Client<paths, `${string}/${string}`>
81+
>;
6482
/**
6583
* Creates a new DefaultClientManager instance.
6684
* @param config - Configuration options for KafkaJS client
6785
* @param confluentCloudBaseUrl - Base URL for Confluent Cloud REST API
6886
* @param confluentCloudFlinkBaseUrl - Base URL for Flink REST API
87+
* @param confluentCloudSchemaRegistryBaseUrl - Base URl for Schema Registry REST API
6988
*/
7089
constructor(
7190
config: KafkaJS.CommonConstructorConfig,
7291
confluentCloudBaseUrl?: string,
7392
confluentCloudFlinkBaseUrl?: string,
93+
confluentCloudSchemaRegistryBaseUrl?: string,
94+
confluentCloudKafkaRestBaseUrl?: string,
7495
) {
7596
this.confluentCloudBaseUrl = confluentCloudBaseUrl || "";
7697
this.confluentCloudFlinkBaseUrl = confluentCloudFlinkBaseUrl || "";
98+
this.confluentCloudSchemaRegistryBaseUrl =
99+
confluentCloudSchemaRegistryBaseUrl || "";
100+
this.confluentCloudKafkaRestBaseUrl = confluentCloudKafkaRestBaseUrl || "";
77101
this.kafkaClient = new Lazy(() => new KafkaJS.Kafka(config));
78102
this.adminClient = new AsyncLazy(
79103
async () => {
@@ -120,6 +144,28 @@ export class DefaultClientManager implements ClientManager {
120144
client.use(confluentCloudFlinkAuthMiddleware);
121145
return client;
122146
});
147+
148+
this.confluentCloudSchemaRegistryRestClient = new Lazy(() => {
149+
console.error(
150+
`Initializing Confluent Cloud Schema Registry REST client for base URL ${this.confluentCloudSchemaRegistryBaseUrl}`,
151+
);
152+
const client = createClient<paths>({
153+
baseUrl: this.confluentCloudSchemaRegistryBaseUrl,
154+
});
155+
client.use(confluentCloudSchemaRegistryAuthMiddleware);
156+
return client;
157+
});
158+
159+
this.confluentCloudKafkaRestClient = new Lazy(() => {
160+
console.error(
161+
`Initializing Confluent Cloud Kafka REST client for base URL ${this.confluentCloudKafkaRestBaseUrl}`,
162+
);
163+
const client = createClient<paths>({
164+
baseUrl: this.confluentCloudKafkaRestBaseUrl,
165+
});
166+
client.use(confluentCloudKafkaAuthMiddleware);
167+
return client;
168+
});
123169
}
124170
/**
125171
* a function that sets a new confluent cloud rest endpoint.
@@ -134,6 +180,15 @@ export class DefaultClientManager implements ClientManager {
134180
this.confluentCloudFlinkRestClient.close();
135181
this.confluentCloudFlinkBaseUrl = endpoint;
136182
}
183+
setConfluentCloudSchemaRegistryEndpoint(endpoint: string): void {
184+
this.confluentCloudSchemaRegistryRestClient.close();
185+
this.confluentCloudSchemaRegistryBaseUrl = endpoint;
186+
}
187+
setConfluentCloudKafkaRestEndpoint(endpoint: string): void {
188+
this.confluentCloudKafkaRestClient.close();
189+
this.confluentCloudKafkaRestBaseUrl = endpoint;
190+
}
191+
137192
getConsumer(): Promise<KafkaJS.Consumer> {
138193
throw new Error("Method not implemented.");
139194
}
@@ -153,6 +208,19 @@ export class DefaultClientManager implements ClientManager {
153208
return this.confluentCloudRestClient.get();
154209
}
155210

211+
/** @inheritdoc */
212+
getConfluentCloudSchemaRegistryRestClient(): Client<
213+
paths,
214+
`${string}/${string}`
215+
> {
216+
return this.confluentCloudSchemaRegistryRestClient.get();
217+
}
218+
219+
/** @inheritdoc */
220+
getConfluentCloudKafkaRestClient(): Client<paths, `${string}/${string}`> {
221+
return this.confluentCloudKafkaRestClient.get();
222+
}
223+
156224
/** @inheritdoc */
157225
async getAdminClient(): Promise<KafkaJS.Admin> {
158226
return this.adminClient.get();

src/confluent/middleware.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,27 @@ export const confluentCloudAuthMiddleware: Middleware = {
3232
return request;
3333
},
3434
};
35+
36+
export const confluentCloudSchemaRegistryAuthMiddleware: Middleware = {
37+
async onRequest({ request }) {
38+
console.error(`${JSON.stringify(request)}`);
39+
// add Authorization header to every request
40+
request.headers.set(
41+
"Authorization",
42+
`Basic ${Buffer.from(`${env.SCHEMA_REGISTRY_API_KEY}:${env.SCHEMA_REGISTRY_API_SECRET}`).toString("base64")}`,
43+
);
44+
return request;
45+
},
46+
};
47+
48+
export const confluentCloudKafkaAuthMiddleware: Middleware = {
49+
async onRequest({ request }) {
50+
console.error(`${JSON.stringify(request)}`);
51+
// add Authorization header to every request
52+
request.headers.set(
53+
"Authorization",
54+
`Basic ${Buffer.from(`${env.KAFKA_API_KEY}:${env.KAFKA_API_SECRET}`).toString("base64")}`,
55+
);
56+
return request;
57+
},
58+
};
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import { ClientManager } from "@src/confluent/client-manager.js";
2+
import { CallToolResult } from "@src/confluent/schema.js";
3+
import {
4+
BaseToolHandler,
5+
ToolConfig,
6+
} from "@src/confluent/tools/base-tools.js";
7+
import { ToolName } from "@src/confluent/tools/tool-name.js";
8+
import env from "@src/env.js";
9+
import { wrapAsPathBasedClient } from "openapi-fetch";
10+
import { z } from "zod";
11+
12+
const addTagToTopicArguments = z.object({
13+
baseUrl: z
14+
.string()
15+
.describe("The base URL of the Schema Registry REST API.")
16+
.url()
17+
.default(env.SCHEMA_REGISTRY_ENDPOINT ?? "")
18+
.optional(),
19+
tagAssignments: z
20+
.array(
21+
z.object({
22+
entityType: z.string().default("kafka_topic"),
23+
entityName: z
24+
.string()
25+
.describe(
26+
`Qualified name of the entity. If not provided, you can obtain it from using the ${ToolName.SEARCH_TOPICS_BY_TAG} tool. example: "lsrc-g2p81:lkc-xq8k7g:my-flights"`,
27+
),
28+
typeName: z.string().describe("Name of the tag to assign"),
29+
}),
30+
)
31+
.nonempty()
32+
.describe("Array of tag assignments to create"),
33+
});
34+
35+
export class AddTagToTopicHandler extends BaseToolHandler {
36+
async handle(
37+
clientManager: ClientManager,
38+
toolArguments: Record<string, unknown>,
39+
): Promise<CallToolResult> {
40+
const { tagAssignments, baseUrl } =
41+
addTagToTopicArguments.parse(toolArguments);
42+
43+
if (baseUrl !== undefined && baseUrl !== "") {
44+
clientManager.setConfluentCloudSchemaRegistryEndpoint(baseUrl);
45+
}
46+
47+
const pathBasedClient = wrapAsPathBasedClient(
48+
clientManager.getConfluentCloudSchemaRegistryRestClient(),
49+
);
50+
51+
const { data: response, error } = await pathBasedClient[
52+
"/catalog/v1/entity/tags"
53+
].POST({
54+
body: tagAssignments,
55+
});
56+
57+
if (error) {
58+
return this.createResponse(
59+
`Failed to assign tag: ${JSON.stringify(error)}`,
60+
true,
61+
);
62+
}
63+
return this.createResponse(
64+
`Successfully assigned tag: ${JSON.stringify(response)}`,
65+
);
66+
}
67+
68+
getToolConfig(): ToolConfig {
69+
return {
70+
name: ToolName.ADD_TAGS_TO_TOPIC,
71+
description: "Assign existing tags to Kafka topics in Confluent Cloud.",
72+
inputSchema: addTagToTopicArguments.shape,
73+
};
74+
}
75+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import { ClientManager } from "@src/confluent/client-manager.js";
2+
import { CallToolResult } from "@src/confluent/schema.js";
3+
import {
4+
BaseToolHandler,
5+
ToolConfig,
6+
} from "@src/confluent/tools/base-tools.js";
7+
import { ToolName } from "@src/confluent/tools/tool-name.js";
8+
import env from "@src/env.js";
9+
import { wrapAsPathBasedClient } from "openapi-fetch";
10+
import { z } from "zod";
11+
12+
const createTagsArguments = z.object({
13+
baseUrl: z
14+
.string()
15+
.describe("The base URL of the Schema Registry REST API.")
16+
.url()
17+
.default(env.SCHEMA_REGISTRY_ENDPOINT ?? "")
18+
.optional(),
19+
tags: z
20+
.array(
21+
z.object({
22+
tagName: z.string().describe("Name of the tag to create").nonempty(),
23+
description: z
24+
.string()
25+
.describe("Description for the tag")
26+
.default("Tag created via API"),
27+
}),
28+
)
29+
.nonempty()
30+
.describe("Array of tag definitions to create"),
31+
});
32+
33+
export class CreateTopicTagsHandler extends BaseToolHandler {
34+
async handle(
35+
clientManager: ClientManager,
36+
toolArguments: Record<string, unknown>,
37+
): Promise<CallToolResult> {
38+
const { tags, baseUrl } = createTagsArguments.parse(toolArguments);
39+
40+
if (baseUrl !== undefined && baseUrl !== "") {
41+
clientManager.setConfluentCloudSchemaRegistryEndpoint(baseUrl);
42+
}
43+
44+
const pathBasedClient = wrapAsPathBasedClient(
45+
clientManager.getConfluentCloudSchemaRegistryRestClient(),
46+
);
47+
48+
const tagDefinitions = tags.map((tag) => ({
49+
entityTypes: ["kafka_topic"],
50+
name: tag.tagName,
51+
description: tag.description,
52+
}));
53+
54+
const { data: response, error } = await pathBasedClient[
55+
"/catalog/v1/types/tagdefs"
56+
].POST({
57+
body: tagDefinitions,
58+
});
59+
60+
if (error) {
61+
return this.createResponse(
62+
`Failed to create tag: ${JSON.stringify(error)}`,
63+
true,
64+
);
65+
}
66+
return this.createResponse(
67+
`Successfully created tag: ${JSON.stringify(response)}`,
68+
);
69+
}
70+
71+
getToolConfig(): ToolConfig {
72+
return {
73+
name: ToolName.CREATE_TOPIC_TAGS,
74+
description: "Create new tag definitions in Confluent Cloud.",
75+
inputSchema: createTagsArguments.shape,
76+
};
77+
}
78+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import { ClientManager } from "@src/confluent/client-manager.js";
2+
import { CallToolResult } from "@src/confluent/schema.js";
3+
import {
4+
BaseToolHandler,
5+
ToolConfig,
6+
} from "@src/confluent/tools/base-tools.js";
7+
import { ToolName } from "@src/confluent/tools/tool-name.js";
8+
import env from "@src/env.js";
9+
import { wrapAsPathBasedClient } from "openapi-fetch";
10+
import { z } from "zod";
11+
12+
const deleteTagArguments = z.object({
13+
baseUrl: z
14+
.string()
15+
.describe("The base URL of the Schema Registry REST API.")
16+
.url()
17+
.default(env.SCHEMA_REGISTRY_ENDPOINT ?? "")
18+
.optional(),
19+
tagName: z.string().describe("Name of the tag to delete").nonempty(),
20+
});
21+
22+
export class DeleteTagHandler extends BaseToolHandler {
23+
async handle(
24+
clientManager: ClientManager,
25+
toolArguments: Record<string, unknown>,
26+
): Promise<CallToolResult> {
27+
const { tagName, baseUrl } = deleteTagArguments.parse(toolArguments);
28+
29+
if (baseUrl !== undefined && baseUrl !== "") {
30+
clientManager.setConfluentCloudSchemaRegistryEndpoint(baseUrl);
31+
}
32+
33+
const pathBasedClient = wrapAsPathBasedClient(
34+
clientManager.getConfluentCloudSchemaRegistryRestClient(),
35+
);
36+
37+
const { response, error } = await pathBasedClient[
38+
"/catalog/v1/types/tagdefs/{tagName}"
39+
].DELETE({
40+
params: {
41+
path: {
42+
tagName: tagName,
43+
},
44+
},
45+
});
46+
47+
if (error) {
48+
return this.createResponse(
49+
`Failed to delete tag: ${JSON.stringify(error)}`,
50+
true,
51+
);
52+
}
53+
return this.createResponse(
54+
`Successfully deleted tag: ${tagName}. Status: ${response?.status}`,
55+
);
56+
}
57+
58+
getToolConfig(): ToolConfig {
59+
return {
60+
name: ToolName.DELETE_TAG,
61+
description: "Delete a tag definition from Confluent Cloud.",
62+
inputSchema: deleteTagArguments.shape,
63+
};
64+
}
65+
}

0 commit comments

Comments
 (0)