Skip to content

Commit b0583a3

Browse files
iliaxiliaxHaarolean
authored
BE: Refactor SchemaRegistry serialization logic (#4116)
Co-authored-by: iliax <ikuramshin@provectus.com> Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
1 parent 4ec7975 commit b0583a3

File tree

7 files changed

+152
-238
lines changed

7 files changed

+152
-238
lines changed

kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/AvroSchemaRegistrySerializer.java

Lines changed: 0 additions & 46 deletions
This file was deleted.

kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/JsonSchemaSchemaRegistrySerializer.java

Lines changed: 0 additions & 79 deletions
This file was deleted.

kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/ProtobufSchemaRegistrySerializer.java

Lines changed: 0 additions & 50 deletions
This file was deleted.

kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java

Lines changed: 25 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package com.provectus.kafka.ui.serdes.builtin.sr;
22

3+
import static com.provectus.kafka.ui.serdes.builtin.sr.Serialize.serializeAvro;
4+
import static com.provectus.kafka.ui.serdes.builtin.sr.Serialize.serializeJson;
5+
import static com.provectus.kafka.ui.serdes.builtin.sr.Serialize.serializeProto;
36
import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE;
47
import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG;
58

69
import com.google.common.annotations.VisibleForTesting;
710
import com.provectus.kafka.ui.exception.ValidationException;
811
import com.provectus.kafka.ui.serde.api.DeserializeResult;
912
import com.provectus.kafka.ui.serde.api.PropertyResolver;
10-
import com.provectus.kafka.ui.serde.api.RecordHeaders;
1113
import com.provectus.kafka.ui.serde.api.SchemaDescription;
1214
import com.provectus.kafka.ui.serdes.BuiltInSerde;
1315
import com.provectus.kafka.ui.util.jsonschema.AvroJsonSchemaConverter;
@@ -32,13 +34,15 @@
3234
import java.util.Optional;
3335
import java.util.concurrent.Callable;
3436
import javax.annotation.Nullable;
35-
import lombok.RequiredArgsConstructor;
3637
import lombok.SneakyThrows;
3738
import org.apache.kafka.common.config.SslConfigs;
3839

3940

4041
public class SchemaRegistrySerde implements BuiltInSerde {
4142

43+
private static final byte SR_PAYLOAD_MAGIC_BYTE = 0x0;
44+
private static final int SR_PAYLOAD_PREFIX_LENGTH = 5;
45+
4246
public static String name() {
4347
return "SchemaRegistry";
4448
}
@@ -221,8 +225,8 @@ private String convertSchema(SchemaMetadata schema, ParsedSchema parsedSchema) {
221225
.convert(basePath, ((AvroSchema) parsedSchema).rawSchema())
222226
.toJson();
223227
case JSON ->
224-
//need to use confluent JsonSchema since it includes resolved references
225-
((JsonSchema) parsedSchema).rawSchema().toString();
228+
//need to use confluent JsonSchema since it includes resolved references
229+
((JsonSchema) parsedSchema).rawSchema().toString();
226230
};
227231
}
228232

@@ -254,35 +258,27 @@ private String schemaSubject(String topic, Target type) {
254258
@Override
255259
public Serializer serializer(String topic, Target type) {
256260
String subject = schemaSubject(topic, type);
257-
var schema = getSchemaBySubject(subject)
258-
.orElseThrow(() -> new ValidationException(String.format("No schema for subject '%s' found", subject)));
259-
boolean isKey = type == Target.KEY;
260-
SchemaType schemaType = SchemaType.fromString(schema.getSchemaType())
261-
.orElseThrow(() -> new IllegalStateException("Unknown schema type: " + schema.getSchemaType()));
261+
SchemaMetadata meta = getSchemaBySubject(subject)
262+
.orElseThrow(() -> new ValidationException(
263+
String.format("No schema for subject '%s' found", subject)));
264+
ParsedSchema schema = getSchemaById(meta.getId())
265+
.orElseThrow(() -> new IllegalStateException(
266+
String.format("Schema found for id %s, subject '%s'", meta.getId(), subject)));
267+
SchemaType schemaType = SchemaType.fromString(meta.getSchemaType())
268+
.orElseThrow(() -> new IllegalStateException("Unknown schema type: " + meta.getSchemaType()));
262269
return switch (schemaType) {
263-
case PROTOBUF -> new ProtobufSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
264-
case AVRO -> new AvroSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
265-
case JSON -> new JsonSchemaSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
270+
case PROTOBUF -> input ->
271+
serializeProto(schemaRegistryClient, topic, type, (ProtobufSchema) schema, meta.getId(), input);
272+
case AVRO -> input ->
273+
serializeAvro((AvroSchema) schema, meta.getId(), input);
274+
case JSON -> input ->
275+
serializeJson((JsonSchema) schema, meta.getId(), input);
266276
};
267277
}
268278

269279
@Override
270280
public Deserializer deserializer(String topic, Target type) {
271-
return new SrDeserializer(topic);
272-
}
273-
274-
///--------------------------------------------------------------
275-
276-
private static final byte SR_RECORD_MAGIC_BYTE = (byte) 0;
277-
private static final int SR_RECORD_PREFIX_LENGTH = 5;
278-
279-
@RequiredArgsConstructor
280-
private class SrDeserializer implements Deserializer {
281-
282-
private final String topic;
283-
284-
@Override
285-
public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
281+
return (headers, data) -> {
286282
var schemaId = extractSchemaIdFromMsg(data);
287283
SchemaType format = getMessageFormatBySchemaId(schemaId);
288284
MessageFormatter formatter = schemaRegistryFormatters.get(format);
@@ -294,7 +290,7 @@ public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
294290
"type", format.name()
295291
)
296292
);
297-
}
293+
};
298294
}
299295

300296
private SchemaType getMessageFormatBySchemaId(int schemaId) {
@@ -306,7 +302,7 @@ private SchemaType getMessageFormatBySchemaId(int schemaId) {
306302

307303
private int extractSchemaIdFromMsg(byte[] data) {
308304
ByteBuffer buffer = ByteBuffer.wrap(data);
309-
if (buffer.remaining() > SR_RECORD_PREFIX_LENGTH && buffer.get() == SR_RECORD_MAGIC_BYTE) {
305+
if (buffer.remaining() >= SR_PAYLOAD_PREFIX_LENGTH && buffer.get() == SR_PAYLOAD_MAGIC_BYTE) {
310306
return buffer.getInt();
311307
}
312308
throw new ValidationException(

kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerializer.java

Lines changed: 0 additions & 34 deletions
This file was deleted.

0 commit comments

Comments
 (0)