3
3
*/
4
4
5
5
import { KafkaJS } from "@confluentinc/kafka-javascript" ;
6
+ import { SchemaRegistryClient } from "@confluentinc/schemaregistry" ;
6
7
import {
7
- confluentCloudAuthMiddleware ,
8
- confluentCloudFlinkAuthMiddleware ,
9
- confluentCloudKafkaAuthMiddleware ,
10
- confluentCloudSchemaRegistryAuthMiddleware ,
8
+ ConfluentAuth ,
9
+ ConfluentEndpoints ,
10
+ createAuthMiddleware ,
11
11
} from "@src/confluent/middleware.js" ;
12
12
import { paths } from "@src/confluent/openapi-schema.js" ;
13
13
import { AsyncLazy , Lazy } from "@src/lazy.js" ;
@@ -24,7 +24,7 @@ export interface KafkaClientManager {
24
24
/** Gets a connected producer client for publishing messages */
25
25
getProducer ( ) : Promise < KafkaJS . Producer > ;
26
26
/** Gets a connected consumer client for subscribing to topics */
27
- getConsumer ( ) : Promise < KafkaJS . Consumer > ;
27
+ getConsumer ( sessionId ?: string ) : Promise < KafkaJS . Consumer > ;
28
28
/** Disconnects and cleans up all client connections */
29
29
disconnect ( ) : Promise < void > ;
30
30
}
@@ -51,15 +51,38 @@ export interface ConfluentCloudRestClientManager {
51
51
setConfluentCloudKafkaRestEndpoint ( endpoint : string ) : void ;
52
52
}
53
53
54
+ /**
55
+ * Interface for managing Schema Registry client connections.
56
+ */
57
+ export interface SchemaRegistryClientHandler {
58
+ getSchemaRegistryClient ( ) : SchemaRegistryClient ;
59
+ }
60
+
54
61
export interface ClientManager
55
62
extends KafkaClientManager ,
56
- ConfluentCloudRestClientManager { }
63
+ ConfluentCloudRestClientManager ,
64
+ SchemaRegistryClientHandler {
65
+ getSchemaRegistryClient ( ) : SchemaRegistryClient ;
66
+ }
67
+
68
+ export interface ClientManagerConfig {
69
+ kafka : KafkaJS . CommonConstructorConfig ;
70
+ endpoints : ConfluentEndpoints ;
71
+ auth : {
72
+ cloud : ConfluentAuth ;
73
+ flink : ConfluentAuth ;
74
+ schemaRegistry : ConfluentAuth ;
75
+ kafka : ConfluentAuth ;
76
+ } ;
77
+ }
57
78
58
79
/**
59
80
* Default implementation of client management for Kafka and Confluent Cloud services.
60
81
* Manages lifecycle and lazy initialization of various client connections.
61
82
*/
62
- export class DefaultClientManager implements ClientManager {
83
+ export class DefaultClientManager
84
+ implements ClientManager , SchemaRegistryClientHandler
85
+ {
63
86
private confluentCloudBaseUrl : string ;
64
87
private confluentCloudFlinkBaseUrl : string ;
65
88
private confluentCloudSchemaRegistryBaseUrl : string ;
@@ -79,27 +102,19 @@ export class DefaultClientManager implements ClientManager {
79
102
private readonly confluentCloudKafkaRestClient : Lazy <
80
103
Client < paths , `${string } /${string } `>
81
104
> ;
105
+ private readonly schemaRegistryClient : Lazy < SchemaRegistryClient > ;
106
+
82
107
/**
83
108
* Creates a new DefaultClientManager instance.
84
- * @param config - Configuration options for KafkaJS client
85
- * @param confluentCloudBaseUrl - Base URL for Confluent Cloud REST API
86
- * @param confluentCloudFlinkBaseUrl - Base URL for Flink REST API
87
- * @param confluentCloudSchemaRegistryBaseUrl - Base URL for Schema Registry REST API
88
- * @param confluentCloudKafkaRestBaseUrl - Base URL for Kafka REST API
109
+ * @param config - Configuration for all clients
89
110
*/
90
- constructor (
91
- config : KafkaJS . CommonConstructorConfig ,
92
- confluentCloudBaseUrl ?: string ,
93
- confluentCloudFlinkBaseUrl ?: string ,
94
- confluentCloudSchemaRegistryBaseUrl ?: string ,
95
- confluentCloudKafkaRestBaseUrl ?: string ,
96
- ) {
97
- this . confluentCloudBaseUrl = confluentCloudBaseUrl || "" ;
98
- this . confluentCloudFlinkBaseUrl = confluentCloudFlinkBaseUrl || "" ;
99
- this . confluentCloudSchemaRegistryBaseUrl =
100
- confluentCloudSchemaRegistryBaseUrl || "" ;
101
- this . confluentCloudKafkaRestBaseUrl = confluentCloudKafkaRestBaseUrl || "" ;
102
- this . kafkaClient = new Lazy ( ( ) => new KafkaJS . Kafka ( config ) ) ;
111
+ constructor ( config : ClientManagerConfig ) {
112
+ this . confluentCloudBaseUrl = config . endpoints . cloud ;
113
+ this . confluentCloudFlinkBaseUrl = config . endpoints . flink ;
114
+ this . confluentCloudSchemaRegistryBaseUrl = config . endpoints . schemaRegistry ;
115
+ this . confluentCloudKafkaRestBaseUrl = config . endpoints . kafka ;
116
+
117
+ this . kafkaClient = new Lazy ( ( ) => new KafkaJS . Kafka ( config . kafka ) ) ;
103
118
this . adminClient = new AsyncLazy (
104
119
async ( ) => {
105
120
console . error ( "Connecting Kafka Admin" ) ;
@@ -113,10 +128,8 @@ export class DefaultClientManager implements ClientManager {
113
128
async ( ) => {
114
129
console . error ( "Connecting Kafka Producer" ) ;
115
130
const producer = this . kafkaClient . get ( ) . producer ( {
116
- kafkaJS : {
117
- acks : 1 ,
118
- compression : KafkaJS . CompressionTypes . GZIP ,
119
- } ,
131
+ "compression.type" : "gzip" ,
132
+ "linger.ms" : 5 ,
120
133
} ) ;
121
134
await producer . connect ( ) ;
122
135
return producer ;
@@ -131,7 +144,7 @@ export class DefaultClientManager implements ClientManager {
131
144
const client = createClient < paths > ( {
132
145
baseUrl : this . confluentCloudBaseUrl ,
133
146
} ) ;
134
- client . use ( confluentCloudAuthMiddleware ) ;
147
+ client . use ( createAuthMiddleware ( config . auth . cloud ) ) ;
135
148
return client ;
136
149
} ) ;
137
150
@@ -142,7 +155,7 @@ export class DefaultClientManager implements ClientManager {
142
155
const client = createClient < paths > ( {
143
156
baseUrl : this . confluentCloudFlinkBaseUrl ,
144
157
} ) ;
145
- client . use ( confluentCloudFlinkAuthMiddleware ) ;
158
+ client . use ( createAuthMiddleware ( config . auth . flink ) ) ;
146
159
return client ;
147
160
} ) ;
148
161
@@ -153,7 +166,7 @@ export class DefaultClientManager implements ClientManager {
153
166
const client = createClient < paths > ( {
154
167
baseUrl : this . confluentCloudSchemaRegistryBaseUrl ,
155
168
} ) ;
156
- client . use ( confluentCloudSchemaRegistryAuthMiddleware ) ;
169
+ client . use ( createAuthMiddleware ( config . auth . schemaRegistry ) ) ;
157
170
return client ;
158
171
} ) ;
159
172
@@ -164,9 +177,35 @@ export class DefaultClientManager implements ClientManager {
164
177
const client = createClient < paths > ( {
165
178
baseUrl : this . confluentCloudKafkaRestBaseUrl ,
166
179
} ) ;
167
- client . use ( confluentCloudKafkaAuthMiddleware ) ;
180
+ client . use ( createAuthMiddleware ( config . auth . kafka ) ) ;
168
181
return client ;
169
182
} ) ;
183
+
184
+ this . schemaRegistryClient = new Lazy ( ( ) => {
185
+ const { apiKey, apiSecret } = config . auth . schemaRegistry ;
186
+ return new SchemaRegistryClient ( {
187
+ baseURLs : [ config . endpoints . schemaRegistry ] ,
188
+ basicAuthCredentials : {
189
+ credentialsSource : "USER_INFO" ,
190
+ userInfo : `${ apiKey } :${ apiSecret } ` ,
191
+ } ,
192
+ } ) ;
193
+ } ) ;
194
+ }
195
+
196
+ /** @inheritdoc */
197
+ async getConsumer ( sessionId ?: string ) : Promise < KafkaJS . Consumer > {
198
+ const baseGroupId = "mcp-confluent" ; // should be configurable?
199
+ const groupId = sessionId ? `${ baseGroupId } -${ sessionId } ` : baseGroupId ;
200
+ console . error ( `Creating new Kafka Consumer with groupId: ${ groupId } ` ) ;
201
+ return this . kafkaClient . get ( ) . consumer ( {
202
+ kafkaJS : {
203
+ fromBeginning : true ,
204
+ groupId,
205
+ allowAutoTopicCreation : false ,
206
+ autoCommit : false ,
207
+ } ,
208
+ } ) ;
170
209
}
171
210
/**
172
211
* a function that sets a new confluent cloud rest endpoint.
@@ -190,10 +229,6 @@ export class DefaultClientManager implements ClientManager {
190
229
this . confluentCloudKafkaRestBaseUrl = endpoint ;
191
230
}
192
231
193
- getConsumer ( ) : Promise < KafkaJS . Consumer > {
194
- throw new Error ( "Method not implemented." ) ;
195
- }
196
-
197
232
/** @inheritdoc */
198
233
getKafkaClient ( ) : KafkaJS . Kafka {
199
234
return this . kafkaClient . get ( ) ;
@@ -238,4 +273,9 @@ export class DefaultClientManager implements ClientManager {
238
273
await this . producer . close ( ) ;
239
274
this . kafkaClient . close ( ) ;
240
275
}
276
+
277
+ /** @inheritdoc */
278
+ getSchemaRegistryClient ( ) : SchemaRegistryClient {
279
+ return this . schemaRegistryClient . get ( ) ;
280
+ }
241
281
}
0 commit comments