|
32 | 32 |
|
33 | 33 | import org.HdrHistogram.Histogram;
|
34 | 34 | import org.HdrHistogram.Recorder;
|
35 |
| -import org.apache.kafka.common.errors.TopicAuthorizationException; |
36 | 35 | import org.slf4j.Logger;
|
37 | 36 | import org.slf4j.LoggerFactory;
|
38 | 37 |
|
@@ -78,6 +77,8 @@ public TestResult run() {
|
78 | 77 | List<BenchmarkConsumer> consumers = createConsumers(topics);
|
79 | 78 | List<BenchmarkProducer> producers = createProducers(topics);
|
80 | 79 |
|
| 80 | + ensureTopicsAreReady(producers, consumers); |
| 81 | + |
81 | 82 | RateLimiter rateLimiter;
|
82 | 83 | AtomicBoolean runCompleted = new AtomicBoolean();
|
83 | 84 | if (workload.producerRate > 0) {
|
@@ -114,6 +115,27 @@ public TestResult run() {
|
114 | 115 | return result;
|
115 | 116 | }
|
116 | 117 |
|
| 118 | + private void ensureTopicsAreReady(List<BenchmarkProducer> producers, List<BenchmarkConsumer> consumers) { |
| 119 | + log.info("Waiting for consumers to be ready"); |
| 120 | + // This is work around the fact that there's no way to have a consumer ready in Kafka without first publishing |
| 121 | + // some message on the topic, which will then trigger the partitions assignement to the consumers |
| 122 | + |
| 123 | + // In this case we just publish 1 message and then wait for consumers to receive the data |
| 124 | + producers.forEach(producer -> producer.sendAsync(new byte[10]).thenRun(() -> totalMessagesSent.increment())); |
| 125 | + |
| 126 | + long expectedMessages = workload.subscriptionsPerTopic * producers.size(); |
| 127 | + |
| 128 | + while (totalMessagesReceived.sum() < expectedMessages) { |
| 129 | + try { |
| 130 | + Thread.sleep(100); |
| 131 | + } catch (InterruptedException e) { |
| 132 | + throw new RuntimeException(e); |
| 133 | + } |
| 134 | + } |
| 135 | + |
| 136 | + log.info("All consumers are ready"); |
| 137 | + } |
| 138 | + |
117 | 139 | /**
|
118 | 140 | * Adjust the publish rate to a level that is sustainable, meaning that we can consume all the messages that are
|
119 | 141 | * being produced
|
@@ -246,6 +268,7 @@ private List<BenchmarkConsumer> createConsumers(List<String> topics) {
|
246 | 268 | }
|
247 | 269 |
|
248 | 270 | List<BenchmarkConsumer> consumers = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
|
| 271 | + |
249 | 272 | log.info("Created {} consumers in {} ms", consumers.size(), timer.elapsedMillis());
|
250 | 273 | return consumers;
|
251 | 274 | }
|
|
0 commit comments