Skip to content

Commit 1517d3e

Browse files
committed
Add --netty and --netty-threads options
References rabbitmq/rabbitmq-java-client#1663
1 parent 86c2a83 commit 1517d3e

File tree

14 files changed

+559
-78
lines changed

14 files changed

+559
-78
lines changed

.github/workflows/test-alphas.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ jobs:
1919
name: Test against ${{ matrix.rabbitmq-image }}
2020
steps:
2121
- uses: actions/checkout@v4
22+
- name: Checkout tls-gen
23+
uses: actions/checkout@v4
24+
with:
25+
repository: rabbitmq/tls-gen
26+
path: './tls-gen'
2227
- name: Set up JDK
2328
uses: actions/setup-java@v4
2429
with:

.github/workflows/test-pr.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@ jobs:
1212

1313
steps:
1414
- uses: actions/checkout@v4
15+
- name: Checkout tls-gen
16+
uses: actions/checkout@v4
17+
with:
18+
repository: rabbitmq/tls-gen
19+
path: './tls-gen'
1520
- name: Set up JDK
1621
uses: actions/setup-java@v4
1722
with:

.github/workflows/test-supported-java-versions.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ jobs:
1818
name: Test against Java ${{ matrix.distribution }} ${{ matrix.version }}
1919
steps:
2020
- uses: actions/checkout@v4
21+
- name: Checkout tls-gen
22+
uses: actions/checkout@v4
23+
with:
24+
repository: rabbitmq/tls-gen
25+
path: './tls-gen'
2126
- name: Set up JDK
2227
uses: actions/setup-java@v4
2328
with:

.github/workflows/test.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@ jobs:
1212

1313
steps:
1414
- uses: actions/checkout@v4
15+
- name: Checkout tls-gen
16+
uses: actions/checkout@v4
17+
with:
18+
repository: rabbitmq/tls-gen
19+
path: './tls-gen'
1520
- name: Set up JDK
1621
uses: actions/setup-java@v4
1722
with:

ci/start-broker.sh

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,32 @@ wait_for_message() {
1010
done
1111
}
1212

13+
make -C "${PWD}"/tls-gen/basic
14+
15+
mkdir -p rabbitmq-configuration/tls
16+
cp -R "${PWD}"/tls-gen/basic/result/* rabbitmq-configuration/tls
17+
chmod o+r rabbitmq-configuration/tls/*
18+
chmod g+r rabbitmq-configuration/tls/*
19+
20+
echo "loopback_users = none
21+
22+
listeners.ssl.default = 5671
23+
24+
ssl_options.cacertfile = /etc/rabbitmq/tls/ca_certificate.pem
25+
ssl_options.certfile = /etc/rabbitmq/tls/server_$(hostname)_certificate.pem
26+
ssl_options.keyfile = /etc/rabbitmq/tls/server_$(hostname)_key.pem
27+
ssl_options.verify = verify_peer
28+
ssl_options.fail_if_no_peer_cert = false
29+
ssl_options.depth = 1
30+
31+
auth_mechanisms.1 = PLAIN" >> rabbitmq-configuration/rabbitmq.conf
32+
1333
echo "Running RabbitMQ ${RABBITMQ_IMAGE}"
1434

1535
docker rm -f rabbitmq 2>/dev/null || echo "rabbitmq was not running"
1636
docker run -d --name rabbitmq \
1737
--network host \
38+
-v "${PWD}"/rabbitmq-configuration:/etc/rabbitmq \
1839
"${RABBITMQ_IMAGE}"
1940

2041
wait_for_message rabbitmq "completed with"

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
5656

5757
<spotless.check.skip>true</spotless.check.skip>
58-
<rabbitmq.version>5.26.0</rabbitmq.version>
58+
<rabbitmq.version>5.27.0-SNAPSHOT</rabbitmq.version>
5959
<slf4j.version>2.0.17</slf4j.version>
6060
<commons-cli.version>1.10.0</commons-cli.version>
6161
<metrics.version>4.2.33</metrics.version>

src/main/java/com/rabbitmq/perf/Consumer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.io.ByteArrayInputStream;
2626
import java.io.DataInputStream;
2727
import java.io.IOException;
28+
import java.io.PrintStream;
2829
import java.time.Duration;
2930
import java.util.*;
3031
import java.util.concurrent.Callable;
@@ -98,6 +99,7 @@ public class Consumer extends AgentBase implements Runnable {
9899

99100
private final Runnable rateLimiterCallback;
100101
private final boolean rateLimitation;
102+
private final PrintStream out;
101103

102104
public Consumer(ConsumerParameters parameters) {
103105
super(
@@ -124,6 +126,7 @@ public Consumer(ConsumerParameters parameters) {
124126

125127
this.queueNames.set(new ArrayList<>(parameters.getQueueNames()));
126128
this.initialQueueNames = new ArrayList<>(parameters.getQueueNames());
129+
this.out = parameters.getOut();
127130

128131
if (parameters.getConsumerLatenciesIndicator().isVariable()) {
129132
this.consumerLatency =
@@ -372,7 +375,7 @@ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig
372375

373376
@Override
374377
public void handleCancel(String consumerTag) {
375-
System.out.printf("Consumer cancelled by broker for tag: %s\n", consumerTag);
378+
out.printf("Consumer cancelled by broker for tag: %s\n", consumerTag);
376379
epochMessageCount.set(0);
377380
if (consumerTagBranchMap.containsKey(consumerTag)) {
378381
String qName = consumerTagBranchMap.get(consumerTag);
@@ -393,7 +396,7 @@ public void handleCancel(String consumerTag) {
393396
delay.toMillis(),
394397
TimeUnit.MILLISECONDS);
395398
} else {
396-
System.out.printf("Could not find queue for consumer tag: %s\n", consumerTag);
399+
out.printf("Could not find queue for consumer tag: %s\n", consumerTag);
397400
}
398401
}
399402
}

src/main/java/com/rabbitmq/perf/ConsumerParameters.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.rabbitmq.client.Channel;
1919
import com.rabbitmq.perf.PerfTest.EXIT_WHEN;
2020
import com.rabbitmq.perf.metrics.PerformanceMetrics;
21+
import java.io.PrintStream;
2122
import java.util.List;
2223
import java.util.Map;
2324
import java.util.concurrent.ExecutorService;
@@ -60,6 +61,16 @@ public class ConsumerParameters {
6061

6162
private int id;
6263
private FunctionalLogger functionalLogger = FunctionalLogger.NO_OP;
64+
private PrintStream out = System.out;
65+
66+
ConsumerParameters setOut(PrintStream out) {
67+
this.out = out;
68+
return this;
69+
}
70+
71+
PrintStream getOut() {
72+
return out;
73+
}
6374

6475
public Channel getChannel() {
6576
return channel;

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.rabbitmq.perf.PerfTest.EXIT_WHEN;
2525
import com.rabbitmq.perf.metrics.PerformanceMetrics;
2626
import java.io.IOException;
27+
import java.io.PrintStream;
2728
import java.time.Duration;
2829
import java.util.ArrayList;
2930
import java.util.Collections;
@@ -137,6 +138,8 @@ public class MulticastParams {
137138

138139
private FunctionalLogger functionalLogger = FunctionalLogger.NO_OP;
139140

141+
private PrintStream out = System.out;
142+
140143
public void setExchangeType(String exchangeType) {
141144
this.exchangeType = exchangeType;
142145
}
@@ -319,6 +322,10 @@ void setConsumerStartDelay(Duration csd) {
319322
this.consumerStartDelay = csd;
320323
}
321324

325+
void setOut(PrintStream out) {
326+
this.out = out;
327+
}
328+
322329
public int getConsumerCount() {
323330
return consumerCount;
324331
}
@@ -485,6 +492,10 @@ public Duration getConsumerStartDelay() {
485492
return consumerStartDelay;
486493
}
487494

495+
PrintStream getOut() {
496+
return out;
497+
}
498+
488499
public void setPolling(boolean polling) {
489500
this.polling = polling;
490501
}
@@ -655,7 +666,8 @@ public Consumer createConsumer(
655666
topologyRecordingScheduledExecutorService)
656667
.setStartListener(this.startListener)
657668
.setRateLimiterFactory(this.rateLimiterFactory)
658-
.setFunctionalLogger(this.functionalLogger));
669+
.setFunctionalLogger(this.functionalLogger)
670+
.setOut(this.out));
659671
this.topologyHandler.next();
660672
return consumer;
661673
}

src/main/java/com/rabbitmq/perf/MulticastSet.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.rabbitmq.perf.metrics.PerformanceMetrics;
2929
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
3030
import java.io.IOException;
31+
import java.io.PrintStream;
3132
import java.net.URISyntaxException;
3233
import java.security.KeyManagementException;
3334
import java.security.NoSuchAlgorithmException;
@@ -80,6 +81,7 @@ public class MulticastSet {
8081
private final ConnectionCreator connectionCreator;
8182
private final ExpectedMetrics expectedMetrics;
8283
private final InstanceSynchronization instanceSynchronization;
84+
private final PrintStream out;
8385

8486
public MulticastSet(
8587
PerformanceMetrics performanceMetrics,
@@ -167,6 +169,7 @@ public MulticastSet(
167169
this.connectionCreator = new ConnectionCreator(this.factory, this.uris, connectionAllocation);
168170
this.expectedMetrics = expectedMetrics;
169171
this.instanceSynchronization = instanceSynchronization;
172+
this.out = params.getOut();
170173
}
171174

172175
protected static int nbThreadsForConsumer(MulticastParams params) {
@@ -233,6 +236,7 @@ public void run(boolean announceStartup)
233236
: params.getServersUpLimit(),
234237
uris,
235238
factory)) {
239+
// TODO do not set a heartbeat executor if Netty is used
236240
ScheduledExecutorService heartbeatSenderExecutorService =
237241
this.threadingHandler.scheduledExecutorService(
238242
"perf-test-heartbeat-sender-", this.params.getHeartbeatSenderThreads());
@@ -382,7 +386,7 @@ public void run(boolean announceStartup)
382386

383387
executeShutdownSequence.run();
384388
} else {
385-
System.out.println(
389+
out.println(
386390
"Could not connect to broker(s) in "
387391
+ params.getServersStartUpTimeout()
388392
+ " second(s), exiting.");
@@ -475,7 +479,7 @@ private void createConsumers(
475479
int consumerIndex = 0;
476480
for (int i = 0; i < consumerConnections.length; i++) {
477481
if (announceStartup) {
478-
System.out.println("id: " + testID + ", starting consumer #" + i);
482+
out.println("id: " + testID + ", starting consumer #" + i);
479483
}
480484
ExecutorService executorService = consumersExecutorsFactory.apply(i);
481485
factory.setSharedExecutor(executorService);
@@ -484,7 +488,7 @@ private void createConsumers(
484488
consumerConnections[i] = consumerConnection;
485489
for (int j = 0; j < params.getConsumerChannelCount(); j++) {
486490
if (announceStartup) {
487-
System.out.println("id: " + testID + ", starting consumer #" + i + ", channel #" + j);
491+
out.println("id: " + testID + ", starting consumer #" + i + ", channel #" + j);
488492
}
489493
Consumer consumer =
490494
params.createConsumer(
@@ -507,13 +511,13 @@ private void createProducers(
507511
int producerIndex = 0;
508512
for (int i = 0; i < producerConnections.length; i++) {
509513
if (announceStartup) {
510-
System.out.println("id: " + testID + ", starting producer #" + i);
514+
out.println("id: " + testID + ", starting producer #" + i);
511515
}
512516
Connection producerConnection = createConnection(PRODUCER_THREAD_PREFIX + i);
513517
producerConnections[i] = producerConnection;
514518
for (int j = 0; j < params.getProducerChannelCount(); j++) {
515519
if (announceStartup) {
516-
System.out.println("id: " + testID + ", starting producer #" + i + ", channel #" + j);
520+
out.println("id: " + testID + ", starting producer #" + i + ", channel #" + j);
517521
}
518522
AgentState agentState = new AgentState();
519523
agentState.runnable =
@@ -538,7 +542,7 @@ private void startConsumers(Runnable[] consumerRunnables) throws InterruptedExce
538542
runnable.run();
539543
LOGGER.debug("Consumer runnable started");
540544
if (params.getConsumerSlowStart()) {
541-
System.out.println("Delaying start by 1 second because -S/--slow-start was requested");
545+
out.println("Delaying start by 1 second because -S/--slow-start was requested");
542546
Thread.sleep(1000);
543547
}
544548
}
@@ -551,8 +555,7 @@ private void startConsumers(Runnable[] consumerRunnables) throws InterruptedExce
551555
for (Runnable runnable : consumerRunnables) {
552556
runnable.run();
553557
if (params.getConsumerSlowStart()) {
554-
System.out.println(
555-
"Delaying start by 1 second because -S/--slow-start was requested");
558+
out.println("Delaying start by 1 second because -S/--slow-start was requested");
556559
try {
557560
Thread.sleep(1000);
558561
} catch (InterruptedException e) {

0 commit comments

Comments
 (0)