From e0371b6b4568a847013c0b212f094f4d6585bfd2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 4 Dec 2023 17:13:01 +0100 Subject: [PATCH] Start producers concurrently --- .../java/com/rabbitmq/perf/MulticastSet.java | 88 ++++++++++--------- 1 file changed, 48 insertions(+), 40 deletions(-) diff --git a/src/main/java/com/rabbitmq/perf/MulticastSet.java b/src/main/java/com/rabbitmq/perf/MulticastSet.java index d8d5a3a2..615907a1 100644 --- a/src/main/java/com/rabbitmq/perf/MulticastSet.java +++ b/src/main/java/com/rabbitmq/perf/MulticastSet.java @@ -35,23 +35,8 @@ import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Supplier; @@ -502,30 +487,53 @@ private void createConsumers( private void createProducers( boolean announceStartup, AgentState[] producerStates, Connection[] producerConnections) - throws IOException, TimeoutException { - int producerIndex = 0; + throws InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(producerConnections.length); + CountDownLatch allCreated = new CountDownLatch(producerConnections.length); + List states = new CopyOnWriteArrayList<>(); + IntStream.range(0, producerConnections.length * params.getProducerChannelCount()) + .forEach(ignored -> states.add(null)); for (int i = 0; i < producerConnections.length; i++) { - if (announceStartup) { - System.out.println("id: " + testID + ", starting producer #" + i); - } - Connection producerConnection = createConnection(PRODUCER_THREAD_PREFIX + i); - producerConnections[i] = producerConnection; - for (int j = 0; j < params.getProducerChannelCount(); j++) { - if (announceStartup) { - System.out.println("id: " + testID + ", starting producer #" + i + ", channel #" + j); - } - AgentState agentState = new AgentState(); - agentState.runnable = - params.createProducer( - producerIndex, - producerConnection, - performanceMetrics, - this.completionHandler, - this.rateIndicator, - this.messageSizeIndicator); - producerIndex++; - producerStates[(i * params.getProducerChannelCount()) + j] = agentState; - } + int producerIndex = i; + executorService.submit( + (Callable) + () -> { + Connection producerConnection = + createConnection(PRODUCER_THREAD_PREFIX + producerIndex); + producerConnections[producerIndex] = producerConnection; + for (int j = 0; j < params.getProducerChannelCount(); j++) { + if (announceStartup) { + System.out.println( + "id: " + + testID + + ", starting producer #" + + producerIndex + + ", channel #" + + j + + " (thread " + + Thread.currentThread().getName() + + ")"); + } + AgentState agentState = new AgentState(); + agentState.runnable = + params.createProducer( + producerIndex, + producerConnection, + performanceMetrics, + this.completionHandler, + this.rateIndicator, + this.messageSizeIndicator); + states.set((producerIndex * params.getProducerChannelCount()) + j, agentState); + } + allCreated.countDown(); + return null; + }); + } + + allCreated.await(); + executorService.shutdown(); + for (int i = 0; i < states.size(); i++) { + producerStates[i] = states.get(i); } }