|
20 | 20 |
|
21 | 21 | import java.text.DecimalFormat;
|
22 | 22 | import java.util.ArrayList;
|
| 23 | +import java.util.Collections; |
23 | 24 | import java.util.List;
|
24 | 25 | import java.util.Random;
|
25 | 26 | import java.util.concurrent.CompletableFuture;
|
|
35 | 36 | import org.slf4j.Logger;
|
36 | 37 | import org.slf4j.LoggerFactory;
|
37 | 38 |
|
| 39 | +import com.google.common.collect.Lists; |
38 | 40 | import com.google.common.io.BaseEncoding;
|
39 | 41 | import com.google.common.util.concurrent.RateLimiter;
|
40 | 42 |
|
@@ -355,38 +357,45 @@ private TestResult generateLoad(List<BenchmarkProducer> producers, long testDura
|
355 | 357 | long startTime = System.nanoTime();
|
356 | 358 | this.testCompleted = false;
|
357 | 359 |
|
358 |
| - executor.submit(() -> { |
359 |
| - try { |
360 |
| - byte[] payloadData = new byte[workload.messageSize]; |
361 |
| - |
362 |
| - // Send messages on all topics/producers |
363 |
| - while (!testCompleted) { |
364 |
| - for (int i = 0; i < producers.size(); i++) { |
365 |
| - BenchmarkProducer producer = producers.get(i); |
366 |
| - rateLimiter.acquire(); |
367 |
| - |
368 |
| - final long sendTime = System.nanoTime(); |
369 |
| - |
370 |
| - producer.sendAsync(payloadData).thenRun(() -> { |
371 |
| - messagesSent.increment(); |
372 |
| - totalMessagesSent.increment(); |
373 |
| - bytesSent.add(payloadData.length); |
374 |
| - |
375 |
| - long latencyMicros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - sendTime); |
376 |
| - recorder.recordValue(latencyMicros); |
377 |
| - cumulativeRecorder.recordValue(latencyMicros); |
378 |
| - |
379 |
| - }).exceptionally(ex -> { |
380 |
| - log.warn("Write error on message", ex); |
381 |
| - System.exit(-1); |
382 |
| - return null; |
383 |
| - }); |
| 360 | + int processors = Runtime.getRuntime().availableProcessors(); |
| 361 | + Collections.shuffle(producers); |
| 362 | + |
| 363 | + final byte[] payloadData = new byte[workload.messageSize]; |
| 364 | + |
| 365 | + // Divide the producers across multiple different threads |
| 366 | + for (List<BenchmarkProducer> producersPerThread : Lists.partition(producers, processors)) { |
| 367 | + executor.submit(() -> { |
| 368 | + try { |
| 369 | + |
| 370 | + // Send messages on all topics/producers assigned to this thread |
| 371 | + while (!testCompleted) { |
| 372 | + for (int i = 0; i < producersPerThread.size(); i++) { |
| 373 | + BenchmarkProducer producer = producersPerThread.get(i); |
| 374 | + rateLimiter.acquire(); |
| 375 | + |
| 376 | + final long sendTime = System.nanoTime(); |
| 377 | + |
| 378 | + producer.sendAsync(payloadData).thenRun(() -> { |
| 379 | + messagesSent.increment(); |
| 380 | + totalMessagesSent.increment(); |
| 381 | + bytesSent.add(payloadData.length); |
| 382 | + |
| 383 | + long latencyMicros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - sendTime); |
| 384 | + recorder.recordValue(latencyMicros); |
| 385 | + cumulativeRecorder.recordValue(latencyMicros); |
| 386 | + |
| 387 | + }).exceptionally(ex -> { |
| 388 | + log.warn("Write error on message", ex); |
| 389 | + System.exit(-1); |
| 390 | + return null; |
| 391 | + }); |
| 392 | + } |
384 | 393 | }
|
| 394 | + } catch (Throwable t) { |
| 395 | + log.error("Got error", t); |
385 | 396 | }
|
386 |
| - } catch (Throwable t) { |
387 |
| - log.error("Got error", t); |
388 |
| - } |
389 |
| - }); |
| 397 | + }); |
| 398 | + } |
390 | 399 |
|
391 | 400 | // Print report stats
|
392 | 401 | long oldTime = System.nanoTime();
|
|
0 commit comments