Skip to content

Commit 02e40c4

Browse files
committed
Ensure Kafka consumers are ready before starting the warmup traffic
1 parent e7bd864 commit 02e40c4

File tree

2 files changed

+28
-4
lines changed

2 files changed

+28
-4
lines changed

benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232

3333
import org.HdrHistogram.Histogram;
3434
import org.HdrHistogram.Recorder;
35-
import org.apache.kafka.common.errors.TopicAuthorizationException;
3635
import org.slf4j.Logger;
3736
import org.slf4j.LoggerFactory;
3837

@@ -78,6 +77,8 @@ public TestResult run() {
7877
List<BenchmarkConsumer> consumers = createConsumers(topics);
7978
List<BenchmarkProducer> producers = createProducers(topics);
8079

80+
ensureTopicsAreReady(producers, consumers);
81+
8182
RateLimiter rateLimiter;
8283
AtomicBoolean runCompleted = new AtomicBoolean();
8384
if (workload.producerRate > 0) {
@@ -114,6 +115,27 @@ public TestResult run() {
114115
return result;
115116
}
116117

118+
private void ensureTopicsAreReady(List<BenchmarkProducer> producers, List<BenchmarkConsumer> consumers) {
119+
log.info("Waiting for consumers to be ready");
120+
// This is work around the fact that there's no way to have a consumer ready in Kafka without first publishing
121+
// some message on the topic, which will then trigger the partitions assignement to the consumers
122+
123+
// In this case we just publish 1 message and then wait for consumers to receive the data
124+
producers.forEach(producer -> producer.sendAsync(new byte[10]).thenRun(() -> totalMessagesSent.increment()));
125+
126+
long expectedMessages = workload.subscriptionsPerTopic * producers.size();
127+
128+
while (totalMessagesReceived.sum() < expectedMessages) {
129+
try {
130+
Thread.sleep(100);
131+
} catch (InterruptedException e) {
132+
throw new RuntimeException(e);
133+
}
134+
}
135+
136+
log.info("All consumers are ready");
137+
}
138+
117139
/**
118140
* Adjust the publish rate to a level that is sustainable, meaning that we can consume all the messages that are
119141
* being produced
@@ -246,6 +268,7 @@ private List<BenchmarkConsumer> createConsumers(List<String> topics) {
246268
}
247269

248270
List<BenchmarkConsumer> consumers = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
271+
249272
log.info("Created {} consumers in {} ms", consumers.size(), timer.elapsedMillis());
250273
return consumers;
251274
}

driver-kafka/kafka.yaml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@ driverClass: io.openmessaging.benchmark.driver.kafka.KafkaBenchmarkDriver
2525

2626
replicationFactor: 1
2727

28-
commonConfig: >
28+
commonConfig: |
2929
bootstrap.servers=localhost:9092
3030
31-
producerConfig: >
31+
producerConfig: |
3232
acks=all
3333
34-
consumerConfig: >
34+
consumerConfig: |
35+
auto.offset.reset=earliest
3536
enable.auto.commit=false

0 commit comments

Comments
 (0)