Skip to content

Commit b1ae7c7

Browse files
authored
Merge pull request #855 from rabbitmq/netty
Add support for Netty
2 parents 86c2a83 + 1e8f8d0 commit b1ae7c7

File tree

17 files changed

+746
-129
lines changed

17 files changed

+746
-129
lines changed

.github/workflows/test-alphas.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ jobs:
1919
name: Test against ${{ matrix.rabbitmq-image }}
2020
steps:
2121
- uses: actions/checkout@v4
22+
- name: Checkout tls-gen
23+
uses: actions/checkout@v4
24+
with:
25+
repository: rabbitmq/tls-gen
26+
path: './tls-gen'
2227
- name: Set up JDK
2328
uses: actions/setup-java@v4
2429
with:

.github/workflows/test-pr.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@ jobs:
1212

1313
steps:
1414
- uses: actions/checkout@v4
15+
- name: Checkout tls-gen
16+
uses: actions/checkout@v4
17+
with:
18+
repository: rabbitmq/tls-gen
19+
path: './tls-gen'
1520
- name: Set up JDK
1621
uses: actions/setup-java@v4
1722
with:

.github/workflows/test-supported-java-versions.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ jobs:
1818
name: Test against Java ${{ matrix.distribution }} ${{ matrix.version }}
1919
steps:
2020
- uses: actions/checkout@v4
21+
- name: Checkout tls-gen
22+
uses: actions/checkout@v4
23+
with:
24+
repository: rabbitmq/tls-gen
25+
path: './tls-gen'
2126
- name: Set up JDK
2227
uses: actions/setup-java@v4
2328
with:

.github/workflows/test.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@ jobs:
1212

1313
steps:
1414
- uses: actions/checkout@v4
15+
- name: Checkout tls-gen
16+
uses: actions/checkout@v4
17+
with:
18+
repository: rabbitmq/tls-gen
19+
path: './tls-gen'
1520
- name: Set up JDK
1621
uses: actions/setup-java@v4
1722
with:

ci/start-broker.sh

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,32 @@ wait_for_message() {
1010
done
1111
}
1212

13+
make -C "${PWD}"/tls-gen/basic
14+
15+
mkdir -p rabbitmq-configuration/tls
16+
cp -R "${PWD}"/tls-gen/basic/result/* rabbitmq-configuration/tls
17+
chmod o+r rabbitmq-configuration/tls/*
18+
chmod g+r rabbitmq-configuration/tls/*
19+
20+
echo "loopback_users = none
21+
22+
listeners.ssl.default = 5671
23+
24+
ssl_options.cacertfile = /etc/rabbitmq/tls/ca_certificate.pem
25+
ssl_options.certfile = /etc/rabbitmq/tls/server_$(hostname)_certificate.pem
26+
ssl_options.keyfile = /etc/rabbitmq/tls/server_$(hostname)_key.pem
27+
ssl_options.verify = verify_peer
28+
ssl_options.fail_if_no_peer_cert = false
29+
ssl_options.depth = 1
30+
31+
auth_mechanisms.1 = PLAIN" >> rabbitmq-configuration/rabbitmq.conf
32+
1333
echo "Running RabbitMQ ${RABBITMQ_IMAGE}"
1434

1535
docker rm -f rabbitmq 2>/dev/null || echo "rabbitmq was not running"
1636
docker run -d --name rabbitmq \
1737
--network host \
38+
-v "${PWD}"/rabbitmq-configuration:/etc/rabbitmq \
1839
"${RABBITMQ_IMAGE}"
1940

2041
wait_for_message rabbitmq "completed with"

pom.xml

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,14 @@
5555
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
5656

5757
<spotless.check.skip>true</spotless.check.skip>
58-
<rabbitmq.version>5.26.0</rabbitmq.version>
58+
<rabbitmq.version>5.27.0-SNAPSHOT</rabbitmq.version>
5959
<slf4j.version>2.0.17</slf4j.version>
6060
<commons-cli.version>1.10.0</commons-cli.version>
6161
<metrics.version>4.2.33</metrics.version>
6262
<micrometer.version>1.15.2</micrometer.version>
6363
<jgroups.version>5.4.8.Final</jgroups.version>
6464
<jgroups-kubernetes.version>2.0.2.Final</jgroups-kubernetes.version>
65+
<netty.version>4.2.3.Final</netty.version>
6566
<gson.version>2.13.1</gson.version>
6667
<resilience4j.version>2.1.0</resilience4j.version>
6768
<logback.version>1.3.15</logback.version>
@@ -169,6 +170,18 @@
169170
<artifactId>jgroups-kubernetes</artifactId>
170171
<version>${jgroups-kubernetes.version}</version>
171172
</dependency>
173+
<dependency>
174+
<groupId>io.netty</groupId>
175+
<artifactId>netty-transport-native-epoll</artifactId>
176+
<version>${netty.version}</version>
177+
<classifier>linux-x86_64</classifier>
178+
</dependency>
179+
<dependency>
180+
<groupId>io.netty</groupId>
181+
<artifactId>netty-transport-native-kqueue</artifactId>
182+
<version>${netty.version}</version>
183+
<classifier>osx-aarch_64</classifier>
184+
</dependency>
172185

173186
<dependency>
174187
<groupId>org.junit.jupiter</groupId>

src/docs/asciidoc/usage-advanced.adoc

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ Another way to avoid `java.lang.OutOfMemoryError: unable to create new native th
276276
exceptions is to tune the number of file descriptors allowed per process
277277
at the OS level, as some distributions use very low limits.
278278
Here the recommendations are the same as for the broker, so you
279-
can refer to our https://www.rabbitmq.com/networking.html#os-tuning[networking guide].
279+
can refer to our https://www.rabbitmq.com/docs/networking#os-tuning[networking guide].
280280

281281
[[workloads-with-a-large-number-of-clients]]
282282
== Workloads With a Large Number of Clients
@@ -386,37 +386,29 @@ These are 1-thread thread pools in this case, so this is 10 threads overall inst
386386
huge resource saving to simulate more clients with a single PerfTest instance for large IoT workloads.
387387

388388
By default, PerfTest uses blocking network socket I/O to communicate with
389-
the broker. This mode works fine for clients in many cases but the RabbitMQ Java client
390-
also supports an https://www.rabbitmq.com/api-guide.html#java-nio[asynchronous I/O mode],
391-
where resources like threads can be easily tuned. The goal here is to use as few
392-
resources as possible to simulate as much load as possible with a single PerfTest instance.
389+
the broker.
390+
This mode works fine for clients in many cases but the RabbitMQ Java client can also use Netty for its network layer.
391+
Netty uses a multithreaded event loop to handle I/O operation and the number of threads can be easily tuned.
392+
The goal here is to use as few resources as possible to simulate as much load as possible with a single PerfTest instance.
393393
In the slow publisher example above, a handful of threads should be enough
394-
to handle the I/O. That's what the
395-
`--nio-threads` flag is for:
394+
to handle the I/O.
395+
That's what the `--netty-threads` flag is for:
396396

397-
.Reducing the number of IO threads by enabling the NIO mode with `--nio-threads`
397+
.Reducing the number of I/O threads by using Netty with `--netty-threads`
398398
[source,bash,indent=0]
399399
--------
400400
java -jar perf-test.jar --queue-pattern 'perf-test-%d' \
401401
--queue-pattern-from 1 --queue-pattern-to 1000 \
402402
--producers 1000 --consumers 1000 \
403-
--heartbeat-sender-threads 10 \
404403
--publishing-interval 60 --producer-random-start-delay 1800 \
405404
--producer-scheduler-threads 10 \
406-
--nio-threads 10
405+
--consumers-thread-pools 10 \
406+
--netty-threads 10
407407
--------
408408

409-
This way PerfTest will use 12 threads for I/O over all the connections.
410-
With the default blocking I/O mode, each producer (or consumer)
411-
uses a thread for the I/O loop, that is 2000 threads to simulate 1000 producers and
412-
1000 consumers. Using NIO in PerfTest can dramatically reduce the resources used
413-
to simulate workloads with a large number of connections with appropriate tuning.
414-
415-
Note that in NIO mode the number of threads used can increase temporarily when connections close
416-
unexpectedly and connection recovery kicks in. This is due to the NIO mode dispatching
417-
connection closing to non-I/O threads to avoid deadlocks. Connection recovery can be disabled
418-
with the `--disable-connection-recovery` flag.
419-
409+
This way PerfTest will use 10 threads for I/O over all the connections.
410+
With the default blocking I/O mode, each producer (or consumer) uses a thread for the I/O loop, that is 2000 threads to simulate 1000 producers and 1000 consumers.
411+
Using Netty in PerfTest can dramatically reduce the resources used to simulate workloads with a large number of connections with appropriate tuning.
420412

421413
== Running Producers and Consumers on Different Machines
422414

src/main/java/com/rabbitmq/perf/Consumer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.io.ByteArrayInputStream;
2626
import java.io.DataInputStream;
2727
import java.io.IOException;
28+
import java.io.PrintStream;
2829
import java.time.Duration;
2930
import java.util.*;
3031
import java.util.concurrent.Callable;
@@ -98,6 +99,7 @@ public class Consumer extends AgentBase implements Runnable {
9899

99100
private final Runnable rateLimiterCallback;
100101
private final boolean rateLimitation;
102+
private final PrintStream out;
101103

102104
public Consumer(ConsumerParameters parameters) {
103105
super(
@@ -124,6 +126,7 @@ public Consumer(ConsumerParameters parameters) {
124126

125127
this.queueNames.set(new ArrayList<>(parameters.getQueueNames()));
126128
this.initialQueueNames = new ArrayList<>(parameters.getQueueNames());
129+
this.out = parameters.getOut();
127130

128131
if (parameters.getConsumerLatenciesIndicator().isVariable()) {
129132
this.consumerLatency =
@@ -372,7 +375,7 @@ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig
372375

373376
@Override
374377
public void handleCancel(String consumerTag) {
375-
System.out.printf("Consumer cancelled by broker for tag: %s\n", consumerTag);
378+
out.printf("Consumer cancelled by broker for tag: %s\n", consumerTag);
376379
epochMessageCount.set(0);
377380
if (consumerTagBranchMap.containsKey(consumerTag)) {
378381
String qName = consumerTagBranchMap.get(consumerTag);
@@ -393,7 +396,7 @@ public void handleCancel(String consumerTag) {
393396
delay.toMillis(),
394397
TimeUnit.MILLISECONDS);
395398
} else {
396-
System.out.printf("Could not find queue for consumer tag: %s\n", consumerTag);
399+
out.printf("Could not find queue for consumer tag: %s\n", consumerTag);
397400
}
398401
}
399402
}

src/main/java/com/rabbitmq/perf/ConsumerParameters.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.rabbitmq.client.Channel;
1919
import com.rabbitmq.perf.PerfTest.EXIT_WHEN;
2020
import com.rabbitmq.perf.metrics.PerformanceMetrics;
21+
import java.io.PrintStream;
2122
import java.util.List;
2223
import java.util.Map;
2324
import java.util.concurrent.ExecutorService;
@@ -60,6 +61,16 @@ public class ConsumerParameters {
6061

6162
private int id;
6263
private FunctionalLogger functionalLogger = FunctionalLogger.NO_OP;
64+
private PrintStream out = System.out;
65+
66+
ConsumerParameters setOut(PrintStream out) {
67+
this.out = out;
68+
return this;
69+
}
70+
71+
PrintStream getOut() {
72+
return out;
73+
}
6374

6475
public Channel getChannel() {
6576
return channel;

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.rabbitmq.perf.PerfTest.EXIT_WHEN;
2525
import com.rabbitmq.perf.metrics.PerformanceMetrics;
2626
import java.io.IOException;
27+
import java.io.PrintStream;
2728
import java.time.Duration;
2829
import java.util.ArrayList;
2930
import java.util.Collections;
@@ -137,6 +138,9 @@ public class MulticastParams {
137138

138139
private FunctionalLogger functionalLogger = FunctionalLogger.NO_OP;
139140

141+
private PrintStream out = System.out;
142+
private boolean netty = false;
143+
140144
public void setExchangeType(String exchangeType) {
141145
this.exchangeType = exchangeType;
142146
}
@@ -319,6 +323,10 @@ void setConsumerStartDelay(Duration csd) {
319323
this.consumerStartDelay = csd;
320324
}
321325

326+
void setOut(PrintStream out) {
327+
this.out = out;
328+
}
329+
322330
public int getConsumerCount() {
323331
return consumerCount;
324332
}
@@ -485,6 +493,10 @@ public Duration getConsumerStartDelay() {
485493
return consumerStartDelay;
486494
}
487495

496+
PrintStream getOut() {
497+
return out;
498+
}
499+
488500
public void setPolling(boolean polling) {
489501
this.polling = polling;
490502
}
@@ -655,7 +667,8 @@ public Consumer createConsumer(
655667
topologyRecordingScheduledExecutorService)
656668
.setStartListener(this.startListener)
657669
.setRateLimiterFactory(this.rateLimiterFactory)
658-
.setFunctionalLogger(this.functionalLogger));
670+
.setFunctionalLogger(this.functionalLogger)
671+
.setOut(this.out));
659672
this.topologyHandler.next();
660673
return consumer;
661674
}
@@ -761,6 +774,14 @@ public void setProducerSchedulerThreadCount(int producerSchedulerThreadCount) {
761774
this.producerSchedulerThreadCount = producerSchedulerThreadCount;
762775
}
763776

777+
void setNetty(boolean netty) {
778+
this.netty = netty;
779+
}
780+
781+
boolean netty() {
782+
return this.netty;
783+
}
784+
764785
/**
765786
* Contract to handle the creation and configuration of resources. E.g. creation of queues,
766787
* binding exchange to queues.

0 commit comments

Comments
 (0)