Skip to content

Commit 93ac64c

Browse files
committed
Fixed kafka key serializer class
1 parent 6cd40ad commit 93ac64c

File tree

2 files changed

+9
-7
lines changed

2 files changed

+9
-7
lines changed

driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkConsumer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,22 +35,22 @@
3535

3636
public class KafkaBenchmarkConsumer implements BenchmarkConsumer {
3737

38-
private final KafkaConsumer<byte[], byte[]> consumer;
38+
private final KafkaConsumer<String, byte[]> consumer;
3939

4040
private final ExecutorService executor;
4141
private final Future<?> consumerTask;
4242
private volatile boolean closing = false;
4343

44-
public KafkaBenchmarkConsumer(KafkaConsumer<byte[], byte[]> consumer, ConsumerCallback callback) {
44+
public KafkaBenchmarkConsumer(KafkaConsumer<String, byte[]> consumer, ConsumerCallback callback) {
4545
this.consumer = consumer;
4646
this.executor = Executors.newSingleThreadExecutor();
4747

4848
this.consumerTask = this.executor.submit(() -> {
4949
while (!closing) {
50-
ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
50+
ConsumerRecords<String, byte[]> records = consumer.poll(100);
5151

5252
Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
53-
for (ConsumerRecord<byte[], byte[]> record : records) {
53+
for (ConsumerRecord<String, byte[]> record : records) {
5454
callback.messageReceived(record.value(), record.timestamp());
5555

5656
offsetMap.put(new TopicPartition(record.topic(), record.partition()),

driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import org.apache.kafka.clients.producer.ProducerConfig;
3838
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
3939
import org.apache.kafka.common.serialization.ByteArraySerializer;
40+
import org.apache.kafka.common.serialization.StringDeserializer;
41+
import org.apache.kafka.common.serialization.StringSerializer;
4042

4143
import com.fasterxml.jackson.databind.DeserializationFeature;
4244
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -69,13 +71,13 @@ public void initialize(File configurationFile) throws IOException {
6971
producerProperties = new Properties();
7072
commonProperties.forEach((key, value) -> producerProperties.put(key, value));
7173
producerProperties.load(new StringReader(config.producerConfig));
72-
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
74+
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
7375
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
7476

7577
consumerProperties = new Properties();
7678
commonProperties.forEach((key, value) -> consumerProperties.put(key, value));
7779
consumerProperties.load(new StringReader(config.consumerConfig));
78-
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
80+
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
7981
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
8082

8183
admin = AdminClient.create(commonProperties);
@@ -111,7 +113,7 @@ public CompletableFuture<BenchmarkConsumer> createConsumer(String topic, String
111113
Properties properties = new Properties();
112114
consumerProperties.forEach((key, value) -> properties.put(key, value));
113115
properties.put(ConsumerConfig.GROUP_ID_CONFIG, subscriptionName);
114-
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);
116+
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(properties);
115117
try {
116118
consumer.subscribe(Arrays.asList(topic));
117119
return CompletableFuture.completedFuture(new KafkaBenchmarkConsumer(consumer, consumerCallback));

0 commit comments

Comments
 (0)