Skip to content

Commit a5de856

Browse files
committed
Added configurable topic options for Kafka driver
1 parent cf64633 commit a5de856

File tree

5 files changed

+63
-2
lines changed

5 files changed

+63
-2
lines changed

driver-kafka/kafka-exactly-once.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ driverClass: io.openmessaging.benchmark.driver.kafka.KafkaBenchmarkDriver
2525

2626
replicationFactor: 3
2727

28+
topicConfig: |
29+
min.insync.replicas=2
30+
2831
commonConfig: |
2932
bootstrap.servers=localhost:9092
3033

driver-kafka/kafka-sync.yaml

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
#
19+
20+
# Kafka driver profile that is configuring to sync all
21+
# published messages to disk to achieve durability
22+
23+
name : Kafka
24+
driverClass: io.openmessaging.benchmark.driver.kafka.KafkaBenchmarkDriver
25+
26+
# Kafka client specific configurations
27+
28+
replicationFactor: 3
29+
30+
topicConfig: |
31+
min.insync.replicas=2
32+
flush.messages=1
33+
flush.ms=0
34+
35+
commonConfig: |
36+
bootstrap.servers=localhost:9092
37+
38+
producerConfig: |
39+
acks=all
40+
linger.ms=1
41+
batch.size=131072
42+
43+
consumerConfig: |
44+
auto.offset.reset=earliest
45+
enable.auto.commit=false

driver-kafka/kafka.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ driverClass: io.openmessaging.benchmark.driver.kafka.KafkaBenchmarkDriver
2525

2626
replicationFactor: 3
2727

28+
topicConfig: |
29+
min.insync.replicas=2
30+
2831
commonConfig: |
2932
bootstrap.servers=localhost:9092
3033

driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/Config.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
public class Config {
2222
public short replicationFactor;
2323

24+
public String topicConfig;
25+
2426
public String commonConfig;
2527

2628
public String producerConfig;

driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import java.util.ArrayList;
2525
import java.util.Arrays;
2626
import java.util.Collections;
27+
import java.util.HashMap;
2728
import java.util.List;
29+
import java.util.Map;
2830
import java.util.Properties;
2931
import java.util.concurrent.CompletableFuture;
3032
import java.util.concurrent.ExecutionException;
@@ -56,6 +58,7 @@ public class KafkaBenchmarkDriver implements BenchmarkDriver {
5658
private KafkaProducer<String, byte[]> producer;
5759
private List<BenchmarkConsumer> consumers = Collections.synchronizedList(new ArrayList<>());
5860

61+
private Properties topicProperties;
5962
private Properties producerProperties;
6063
private Properties consumerProperties;
6164

@@ -80,6 +83,9 @@ public void initialize(File configurationFile) throws IOException {
8083
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
8184
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
8285

86+
topicProperties = new Properties();
87+
topicProperties.load(new StringReader(config.topicConfig));
88+
8389
admin = AdminClient.create(commonProperties);
8490

8591
producer = new KafkaProducer<>(producerProperties);
@@ -90,12 +96,14 @@ public String getTopicNamePrefix() {
9096
return "test-topic";
9197
}
9298

99+
@SuppressWarnings({ "rawtypes", "unchecked" })
93100
@Override
94101
public CompletableFuture<Void> createTopic(String topic, int partitions) {
95102
return CompletableFuture.runAsync(() -> {
96103
try {
97-
admin.createTopics(Arrays.asList(new NewTopic(topic, partitions, config.replicationFactor))).all()
98-
.get();
104+
NewTopic newTopic = new NewTopic(topic, partitions, config.replicationFactor);
105+
newTopic.configs(new HashMap<>((Map) topicProperties));
106+
admin.createTopics(Arrays.asList(newTopic)).all().get();
99107
} catch (InterruptedException | ExecutionException e) {
100108
throw new RuntimeException(e);
101109
}

0 commit comments

Comments
 (0)