|
37 | 37 | import org.apache.kafka.clients.producer.ProducerConfig;
|
38 | 38 | import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
39 | 39 | import org.apache.kafka.common.serialization.ByteArraySerializer;
|
| 40 | +import org.apache.kafka.common.serialization.StringDeserializer; |
| 41 | +import org.apache.kafka.common.serialization.StringSerializer; |
40 | 42 |
|
41 | 43 | import com.fasterxml.jackson.databind.DeserializationFeature;
|
42 | 44 | import com.fasterxml.jackson.databind.ObjectMapper;
|
@@ -69,13 +71,13 @@ public void initialize(File configurationFile) throws IOException {
|
69 | 71 | producerProperties = new Properties();
|
70 | 72 | commonProperties.forEach((key, value) -> producerProperties.put(key, value));
|
71 | 73 | producerProperties.load(new StringReader(config.producerConfig));
|
72 |
| - producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); |
| 74 | + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); |
73 | 75 | producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
|
74 | 76 |
|
75 | 77 | consumerProperties = new Properties();
|
76 | 78 | commonProperties.forEach((key, value) -> consumerProperties.put(key, value));
|
77 | 79 | consumerProperties.load(new StringReader(config.consumerConfig));
|
78 |
| - consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); |
| 80 | + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); |
79 | 81 | consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
|
80 | 82 |
|
81 | 83 | admin = AdminClient.create(commonProperties);
|
@@ -111,7 +113,7 @@ public CompletableFuture<BenchmarkConsumer> createConsumer(String topic, String
|
111 | 113 | Properties properties = new Properties();
|
112 | 114 | consumerProperties.forEach((key, value) -> properties.put(key, value));
|
113 | 115 | properties.put(ConsumerConfig.GROUP_ID_CONFIG, subscriptionName);
|
114 |
| - KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties); |
| 116 | + KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(properties); |
115 | 117 | try {
|
116 | 118 | consumer.subscribe(Arrays.asList(topic));
|
117 | 119 | return CompletableFuture.completedFuture(new KafkaBenchmarkConsumer(consumer, consumerCallback));
|
|
0 commit comments