1
1
package com .provectus .kafka .ui .serdes .builtin .sr ;
2
2
3
+ import static com .provectus .kafka .ui .serdes .builtin .sr .Serialize .serializeAvro ;
4
+ import static com .provectus .kafka .ui .serdes .builtin .sr .Serialize .serializeJson ;
5
+ import static com .provectus .kafka .ui .serdes .builtin .sr .Serialize .serializeProto ;
3
6
import static io .confluent .kafka .serializers .AbstractKafkaSchemaSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE ;
4
7
import static io .confluent .kafka .serializers .AbstractKafkaSchemaSerDeConfig .USER_INFO_CONFIG ;
5
8
6
9
import com .google .common .annotations .VisibleForTesting ;
7
10
import com .provectus .kafka .ui .exception .ValidationException ;
8
11
import com .provectus .kafka .ui .serde .api .DeserializeResult ;
9
12
import com .provectus .kafka .ui .serde .api .PropertyResolver ;
10
- import com .provectus .kafka .ui .serde .api .RecordHeaders ;
11
13
import com .provectus .kafka .ui .serde .api .SchemaDescription ;
12
14
import com .provectus .kafka .ui .serdes .BuiltInSerde ;
13
15
import com .provectus .kafka .ui .util .jsonschema .AvroJsonSchemaConverter ;
32
34
import java .util .Optional ;
33
35
import java .util .concurrent .Callable ;
34
36
import javax .annotation .Nullable ;
35
- import lombok .RequiredArgsConstructor ;
36
37
import lombok .SneakyThrows ;
37
38
import org .apache .kafka .common .config .SslConfigs ;
38
39
39
40
40
41
public class SchemaRegistrySerde implements BuiltInSerde {
41
42
43
+ private static final byte SR_PAYLOAD_MAGIC_BYTE = 0x0 ;
44
+ private static final int SR_PAYLOAD_PREFIX_LENGTH = 5 ;
45
+
42
46
/**
 * Identifier under which this serde is registered and shown to users.
 * Referenced by configuration to select the Schema Registry serde.
 */
public static String name() {
  return "SchemaRegistry";
}
@@ -221,8 +225,8 @@ private String convertSchema(SchemaMetadata schema, ParsedSchema parsedSchema) {
221
225
.convert (basePath , ((AvroSchema ) parsedSchema ).rawSchema ())
222
226
.toJson ();
223
227
case JSON ->
224
- //need to use confluent JsonSchema since it includes resolved references
225
- ((JsonSchema ) parsedSchema ).rawSchema ().toString ();
228
+ //need to use confluent JsonSchema since it includes resolved references
229
+ ((JsonSchema ) parsedSchema ).rawSchema ().toString ();
226
230
};
227
231
}
228
232
@@ -254,35 +258,27 @@ private String schemaSubject(String topic, Target type) {
254
258
@ Override
255
259
public Serializer serializer (String topic , Target type ) {
256
260
String subject = schemaSubject (topic , type );
257
- var schema = getSchemaBySubject (subject )
258
- .orElseThrow (() -> new ValidationException (String .format ("No schema for subject '%s' found" , subject )));
259
- boolean isKey = type == Target .KEY ;
260
- SchemaType schemaType = SchemaType .fromString (schema .getSchemaType ())
261
- .orElseThrow (() -> new IllegalStateException ("Unknown schema type: " + schema .getSchemaType ()));
261
+ SchemaMetadata meta = getSchemaBySubject (subject )
262
+ .orElseThrow (() -> new ValidationException (
263
+ String .format ("No schema for subject '%s' found" , subject )));
264
+ ParsedSchema schema = getSchemaById (meta .getId ())
265
+ .orElseThrow (() -> new IllegalStateException (
266
+ String .format ("Schema found for id %s, subject '%s'" , meta .getId (), subject )));
267
+ SchemaType schemaType = SchemaType .fromString (meta .getSchemaType ())
268
+ .orElseThrow (() -> new IllegalStateException ("Unknown schema type: " + meta .getSchemaType ()));
262
269
return switch (schemaType ) {
263
- case PROTOBUF -> new ProtobufSchemaRegistrySerializer (topic , isKey , schemaRegistryClient , schema );
264
- case AVRO -> new AvroSchemaRegistrySerializer (topic , isKey , schemaRegistryClient , schema );
265
- case JSON -> new JsonSchemaSchemaRegistrySerializer (topic , isKey , schemaRegistryClient , schema );
270
+ case PROTOBUF -> input ->
271
+ serializeProto (schemaRegistryClient , topic , type , (ProtobufSchema ) schema , meta .getId (), input );
272
+ case AVRO -> input ->
273
+ serializeAvro ((AvroSchema ) schema , meta .getId (), input );
274
+ case JSON -> input ->
275
+ serializeJson ((JsonSchema ) schema , meta .getId (), input );
266
276
};
267
277
}
268
278
269
279
@ Override
270
280
public Deserializer deserializer (String topic , Target type ) {
271
- return new SrDeserializer (topic );
272
- }
273
-
274
- ///--------------------------------------------------------------
275
-
276
- private static final byte SR_RECORD_MAGIC_BYTE = (byte ) 0 ;
277
- private static final int SR_RECORD_PREFIX_LENGTH = 5 ;
278
-
279
- @ RequiredArgsConstructor
280
- private class SrDeserializer implements Deserializer {
281
-
282
- private final String topic ;
283
-
284
- @ Override
285
- public DeserializeResult deserialize (RecordHeaders headers , byte [] data ) {
281
+ return (headers , data ) -> {
286
282
var schemaId = extractSchemaIdFromMsg (data );
287
283
SchemaType format = getMessageFormatBySchemaId (schemaId );
288
284
MessageFormatter formatter = schemaRegistryFormatters .get (format );
@@ -294,7 +290,7 @@ public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
294
290
"type" , format .name ()
295
291
)
296
292
);
297
- }
293
+ };
298
294
}
299
295
300
296
private SchemaType getMessageFormatBySchemaId (int schemaId ) {
@@ -306,7 +302,7 @@ private SchemaType getMessageFormatBySchemaId(int schemaId) {
306
302
307
303
private int extractSchemaIdFromMsg (byte [] data ) {
308
304
ByteBuffer buffer = ByteBuffer .wrap (data );
309
- if (buffer .remaining () > SR_RECORD_PREFIX_LENGTH && buffer .get () == SR_RECORD_MAGIC_BYTE ) {
305
+ if (buffer .remaining () >= SR_PAYLOAD_PREFIX_LENGTH && buffer .get () == SR_PAYLOAD_MAGIC_BYTE ) {
310
306
return buffer .getInt ();
311
307
}
312
308
throw new ValidationException (
0 commit comments