Skip to content

Commit c5ad4f7

Browse files
authored
Use multiple producer threads to drive the load generation (openmessaging#2)
1 parent 3351a33 commit c5ad4f7

File tree

1 file changed

+39
-30
lines changed

1 file changed

+39
-30
lines changed

benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java

Lines changed: 39 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.text.DecimalFormat;
2222
import java.util.ArrayList;
23+
import java.util.Collections;
2324
import java.util.List;
2425
import java.util.Random;
2526
import java.util.concurrent.CompletableFuture;
@@ -35,6 +36,7 @@
3536
import org.slf4j.Logger;
3637
import org.slf4j.LoggerFactory;
3738

39+
import com.google.common.collect.Lists;
3840
import com.google.common.io.BaseEncoding;
3941
import com.google.common.util.concurrent.RateLimiter;
4042

@@ -355,38 +357,45 @@ private TestResult generateLoad(List<BenchmarkProducer> producers, long testDura
355357
long startTime = System.nanoTime();
356358
this.testCompleted = false;
357359

358-
executor.submit(() -> {
359-
try {
360-
byte[] payloadData = new byte[workload.messageSize];
361-
362-
// Send messages on all topics/producers
363-
while (!testCompleted) {
364-
for (int i = 0; i < producers.size(); i++) {
365-
BenchmarkProducer producer = producers.get(i);
366-
rateLimiter.acquire();
367-
368-
final long sendTime = System.nanoTime();
369-
370-
producer.sendAsync(payloadData).thenRun(() -> {
371-
messagesSent.increment();
372-
totalMessagesSent.increment();
373-
bytesSent.add(payloadData.length);
374-
375-
long latencyMicros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - sendTime);
376-
recorder.recordValue(latencyMicros);
377-
cumulativeRecorder.recordValue(latencyMicros);
378-
379-
}).exceptionally(ex -> {
380-
log.warn("Write error on message", ex);
381-
System.exit(-1);
382-
return null;
383-
});
360+
int processors = Runtime.getRuntime().availableProcessors();
361+
Collections.shuffle(producers);
362+
363+
final byte[] payloadData = new byte[workload.messageSize];
364+
365+
// Divide the producers across multiple different threads
366+
for (List<BenchmarkProducer> producersPerThread : Lists.partition(producers, processors)) {
367+
executor.submit(() -> {
368+
try {
369+
370+
// Send messages on all topics/producers assigned to this thread
371+
while (!testCompleted) {
372+
for (int i = 0; i < producersPerThread.size(); i++) {
373+
BenchmarkProducer producer = producersPerThread.get(i);
374+
rateLimiter.acquire();
375+
376+
final long sendTime = System.nanoTime();
377+
378+
producer.sendAsync(payloadData).thenRun(() -> {
379+
messagesSent.increment();
380+
totalMessagesSent.increment();
381+
bytesSent.add(payloadData.length);
382+
383+
long latencyMicros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - sendTime);
384+
recorder.recordValue(latencyMicros);
385+
cumulativeRecorder.recordValue(latencyMicros);
386+
387+
}).exceptionally(ex -> {
388+
log.warn("Write error on message", ex);
389+
System.exit(-1);
390+
return null;
391+
});
392+
}
384393
}
394+
} catch (Throwable t) {
395+
log.error("Got error", t);
385396
}
386-
} catch (Throwable t) {
387-
log.error("Got error", t);
388-
}
389-
});
397+
});
398+
}
390399

391400
// Print report stats
392401
long oldTime = System.nanoTime();

0 commit comments

Comments
 (0)