Skip to content

Commit 6924150

Browse files
committed
Added option to build & drain backlog in workload
1 parent 02e40c4 commit 6924150

File tree

4 files changed

+81
-6
lines changed

4 files changed

+81
-6
lines changed

benchmark-framework/src/main/java/io/openmessaging/benchmark/Workload.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@ public class Workload {
3737
public int consumerPerSubscription;
3838

3939
public int producerRate;
40-
40+
41+
/**
42+
* If the consumer backlog is > 0, the generator will accumulate messages until the requested amount of storage is
43+
* retained and then it will start the consumers to drain it.
44+
*
45+
* The testDurationMinutes will be overruled to allow the test to complete when the consumer has drained all the
46+
* backlog and it's on par with the producer
47+
*/
48+
public long consumerBacklogSizeGB = 0;
49+
4150
public int testDurationMinutes;
4251
}

benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,19 @@ public class WorkloadGenerator implements ConsumerCallback, AutoCloseable {
6565
private final LongAdder totalMessagesReceived = new LongAdder();
6666

6767
private boolean testCompleted = false;
68+
private volatile boolean needToWaitForBacklogDraining = false;
6869

6970
public WorkloadGenerator(String driverName, BenchmarkDriver benchmarkDriver, Workload workload) {
7071
this.driverName = driverName;
7172
this.benchmarkDriver = benchmarkDriver;
7273
this.workload = workload;
74+
75+
if (workload.consumerBacklogSizeGB > 0 && workload.producerRate == 0) {
76+
throw new IllegalArgumentException("Cannot probe producer sustainable rate when building backlog");
77+
}
7378
}
7479

75-
public TestResult run() {
80+
public TestResult run() throws Exception {
7681
List<String> topics = createTopics();
7782
List<BenchmarkConsumer> consumers = createConsumers(topics);
7883
List<BenchmarkProducer> producers = createProducers(topics);
@@ -97,6 +102,17 @@ public TestResult run() {
97102
generateLoad(producers, TimeUnit.MINUTES.toSeconds(1), rateLimiter);
98103
runCompleted.set(true);
99104

105+
if (workload.consumerBacklogSizeGB > 0) {
106+
log.info("Stopping all consumers to build backlog");
107+
for (BenchmarkConsumer consumer : consumers) {
108+
consumer.close();
109+
}
110+
111+
executor.execute(() -> {
112+
buildAndDrainBacklog(topics);
113+
});
114+
}
115+
100116
log.info("----- Starting benchmark traffic ------");
101117
runCompleted.set(false);
102118

@@ -288,6 +304,49 @@ private List<BenchmarkProducer> createProducers(List<String> topics) {
288304
return producers;
289305
}
290306

307+
private void buildAndDrainBacklog(List<String> topics) {
308+
this.needToWaitForBacklogDraining = true;
309+
310+
long requestedBacklogSize = workload.consumerBacklogSizeGB * 1024 * 1024 * 1024;
311+
312+
while (true) {
313+
long currentBacklogSize = (workload.subscriptionsPerTopic * totalMessagesSent.sum()
314+
- totalMessagesReceived.sum()) * workload.messageSize;
315+
316+
if (currentBacklogSize >= requestedBacklogSize) {
317+
break;
318+
}
319+
320+
try {
321+
Thread.sleep(100);
322+
} catch (InterruptedException e) {
323+
throw new RuntimeException(e);
324+
}
325+
}
326+
327+
log.info("--- Start draining backlog ---");
328+
329+
createConsumers(topics);
330+
331+
final long minBacklog = 1000;
332+
333+
while (true) {
334+
long currentBacklog = workload.subscriptionsPerTopic * totalMessagesSent.sum()
335+
- totalMessagesReceived.sum();
336+
if (currentBacklog <= minBacklog) {
337+
log.info("--- Completed backlog draining ---");
338+
needToWaitForBacklogDraining = false;
339+
return;
340+
}
341+
342+
try {
343+
Thread.sleep(100);
344+
} catch (InterruptedException e) {
345+
throw new RuntimeException(e);
346+
}
347+
}
348+
}
349+
291350
private TestResult generateLoad(List<BenchmarkProducer> producers, long testDurationsSeconds,
292351
RateLimiter rateLimiter) {
293352
Recorder recorder = new Recorder(TimeUnit.SECONDS.toMicros(30), 5);
@@ -386,7 +445,7 @@ private TestResult generateLoad(List<BenchmarkProducer> producers, long testDura
386445

387446
reportHistogram.reset();
388447

389-
if (now >= testEndTime) {
448+
if (now >= testEndTime && !needToWaitForBacklogDraining) {
390449
testCompleted = true;
391450
reportHistogram = cumulativeRecorder.getIntervalHistogram();
392451
log.info(

driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkConsumer.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Map;
2323
import java.util.concurrent.ExecutorService;
2424
import java.util.concurrent.Executors;
25+
import java.util.concurrent.Future;
2526

2627
import org.apache.kafka.clients.consumer.ConsumerRecord;
2728
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -37,13 +38,15 @@ public class KafkaBenchmarkConsumer implements BenchmarkConsumer {
3738
private final KafkaConsumer<byte[], byte[]> consumer;
3839

3940
private final ExecutorService executor;
41+
private final Future<?> consumerTask;
42+
private volatile boolean closing = false;
4043

4144
public KafkaBenchmarkConsumer(KafkaConsumer<byte[], byte[]> consumer, ConsumerCallback callback) {
4245
this.consumer = consumer;
4346
this.executor = Executors.newSingleThreadExecutor();
4447

45-
this.executor.execute(() -> {
46-
while (true) {
48+
this.consumerTask = this.executor.submit(() -> {
49+
while (!closing) {
4750
ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
4851

4952
Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
@@ -63,8 +66,10 @@ public KafkaBenchmarkConsumer(KafkaConsumer<byte[], byte[]> consumer, ConsumerCa
6366

6467
@Override
6568
public void close() throws Exception {
69+
closing = true;
70+
executor.shutdown();
71+
consumerTask.get();
6672
consumer.close();
67-
executor.shutdownNow();
6873
}
6974

7075
}

workloads/simple-workload.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ consumerPerSubscription : 1
3333

3434
producerRate : 10000
3535

36+
consumerBacklogSizeGB : 0
37+
3638
testDurationMinutes : 5
3739

3840

0 commit comments

Comments
 (0)