Skip to content

Commit b10b227

Browse files
Support set client.id with Zone ID for Kafka driver (openmessaging#424)
* Support set client.id with Zone ID for Kafka driver * Apply spotless fix
1 parent fe3c5a0 commit b10b227

File tree

1 file changed

+14
-0
lines changed

1 file changed

+14
-0
lines changed

driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@
4545

4646
public class KafkaBenchmarkDriver implements BenchmarkDriver {
4747

48+
private static final String ZONE_ID_CONFIG = "zone.id";
49+
private static final String ZONE_ID_TEMPLATE = "{zone.id}";
50+
private static final String KAFKA_CLIENT_ID = "client.id";
4851
private Config config;
4952

5053
private List<BenchmarkProducer> producers = Collections.synchronizedList(new ArrayList<>());
@@ -63,6 +66,13 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I
6366
Properties commonProperties = new Properties();
6467
commonProperties.load(new StringReader(config.commonConfig));
6568

69+
if (commonProperties.containsKey(KAFKA_CLIENT_ID)) {
70+
commonProperties.put(
71+
KAFKA_CLIENT_ID,
72+
applyZoneId(
73+
commonProperties.getProperty(KAFKA_CLIENT_ID), System.getProperty(ZONE_ID_CONFIG)));
74+
}
75+
6676
producerProperties = new Properties();
6777
commonProperties.forEach((key, value) -> producerProperties.put(key, value));
6878
producerProperties.load(new StringReader(config.producerConfig));
@@ -151,6 +161,10 @@ public void close() throws Exception {
151161
admin.close();
152162
}
153163

164+
private static String applyZoneId(String clientId, String zoneId) {
165+
return clientId.replace(ZONE_ID_TEMPLATE, zoneId);
166+
}
167+
154168
private static final ObjectMapper mapper =
155169
new ObjectMapper(new YAMLFactory())
156170
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

0 commit comments

Comments
 (0)