@@ -65,14 +65,19 @@ public class WorkloadGenerator implements ConsumerCallback, AutoCloseable {
65
65
private final LongAdder totalMessagesReceived = new LongAdder ();
66
66
67
67
private boolean testCompleted = false ;
68
+ private volatile boolean needToWaitForBacklogDraining = false ;
68
69
69
70
public WorkloadGenerator (String driverName , BenchmarkDriver benchmarkDriver , Workload workload ) {
70
71
this .driverName = driverName ;
71
72
this .benchmarkDriver = benchmarkDriver ;
72
73
this .workload = workload ;
74
+
75
+ if (workload .consumerBacklogSizeGB > 0 && workload .producerRate == 0 ) {
76
+ throw new IllegalArgumentException ("Cannot probe producer sustainable rate when building backlog" );
77
+ }
73
78
}
74
79
75
- public TestResult run () {
80
+ public TestResult run () throws Exception {
76
81
List <String > topics = createTopics ();
77
82
List <BenchmarkConsumer > consumers = createConsumers (topics );
78
83
List <BenchmarkProducer > producers = createProducers (topics );
@@ -97,6 +102,17 @@ public TestResult run() {
97
102
generateLoad (producers , TimeUnit .MINUTES .toSeconds (1 ), rateLimiter );
98
103
runCompleted .set (true );
99
104
105
+ if (workload .consumerBacklogSizeGB > 0 ) {
106
+ log .info ("Stopping all consumers to build backlog" );
107
+ for (BenchmarkConsumer consumer : consumers ) {
108
+ consumer .close ();
109
+ }
110
+
111
+ executor .execute (() -> {
112
+ buildAndDrainBacklog (topics );
113
+ });
114
+ }
115
+
100
116
log .info ("----- Starting benchmark traffic ------" );
101
117
runCompleted .set (false );
102
118
@@ -288,6 +304,49 @@ private List<BenchmarkProducer> createProducers(List<String> topics) {
288
304
return producers ;
289
305
}
290
306
307
+ private void buildAndDrainBacklog (List <String > topics ) {
308
+ this .needToWaitForBacklogDraining = true ;
309
+
310
+ long requestedBacklogSize = workload .consumerBacklogSizeGB * 1024 * 1024 * 1024 ;
311
+
312
+ while (true ) {
313
+ long currentBacklogSize = (workload .subscriptionsPerTopic * totalMessagesSent .sum ()
314
+ - totalMessagesReceived .sum ()) * workload .messageSize ;
315
+
316
+ if (currentBacklogSize >= requestedBacklogSize ) {
317
+ break ;
318
+ }
319
+
320
+ try {
321
+ Thread .sleep (100 );
322
+ } catch (InterruptedException e ) {
323
+ throw new RuntimeException (e );
324
+ }
325
+ }
326
+
327
+ log .info ("--- Start draining backlog ---" );
328
+
329
+ createConsumers (topics );
330
+
331
+ final long minBacklog = 1000 ;
332
+
333
+ while (true ) {
334
+ long currentBacklog = workload .subscriptionsPerTopic * totalMessagesSent .sum ()
335
+ - totalMessagesReceived .sum ();
336
+ if (currentBacklog <= minBacklog ) {
337
+ log .info ("--- Completed backlog draining ---" );
338
+ needToWaitForBacklogDraining = false ;
339
+ return ;
340
+ }
341
+
342
+ try {
343
+ Thread .sleep (100 );
344
+ } catch (InterruptedException e ) {
345
+ throw new RuntimeException (e );
346
+ }
347
+ }
348
+ }
349
+
291
350
private TestResult generateLoad (List <BenchmarkProducer > producers , long testDurationsSeconds ,
292
351
RateLimiter rateLimiter ) {
293
352
Recorder recorder = new Recorder (TimeUnit .SECONDS .toMicros (30 ), 5 );
@@ -386,7 +445,7 @@ private TestResult generateLoad(List<BenchmarkProducer> producers, long testDura
386
445
387
446
reportHistogram .reset ();
388
447
389
- if (now >= testEndTime ) {
448
+ if (now >= testEndTime && ! needToWaitForBacklogDraining ) {
390
449
testCompleted = true ;
391
450
reportHistogram = cumulativeRecorder .getIntervalHistogram ();
392
451
log .info (
0 commit comments