Skip to content

Commit a5a3ded

Browse files
committed
Add enqueuing timeout option in Netty configuration
Part of the back-pressure mechanism. References #1663 (cherry picked from commit f3c9e94) Conflicts: src/main/java/com/rabbitmq/client/ConnectionFactory.java
1 parent 01550b0 commit a5a3ded

File tree

2 files changed

+25
-1
lines changed

2 files changed

+25
-1
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.net.URLDecoder;
3636
import java.security.KeyManagementException;
3737
import java.security.NoSuchAlgorithmException;
38+
import java.time.Duration;
3839
import java.util.*;
3940
import java.util.Map.Entry;
4041
import java.util.concurrent.*;
@@ -1075,6 +1076,7 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IO
10751076
this.nettyConf.channelCustomizer,
10761077
this.nettyConf.bootstrapCustomizer,
10771078
this.nettyConf.sslContextFactory,
1079+
this.nettyConf.enqueuingTimeout,
10781080
connectionTimeout,
10791081
socketConf,
10801082
maxInboundMessageBodySize);
@@ -1474,6 +1476,7 @@ public static final class NettyConfiguration {
14741476
private Consumer<io.netty.channel.Channel> channelCustomizer = ch -> {};
14751477
private Consumer<Bootstrap> bootstrapCustomizer = b -> {};
14761478
private Function<String, SslContext> sslContextFactory;
1479+
private Duration enqueuingTimeout = Duration.ofSeconds(10);
14771480

14781481
private NettyConfiguration(ConnectionFactory cf) {
14791482
this.cf = cf;
@@ -1547,6 +1550,19 @@ public NettyConfiguration sslContextFactory(Function<String, SslContext> sslCont
15471550
return this;
15481551
}
15491552

1553+
/**
1554+
* Set the timeout to enqueue outbound frames.
1555+
*
1556+
* <p>Default is 10 seconds.
1557+
*
1558+
* @param enqueuingTimeout the enqueuing timeout
1559+
* @return this configuration instance
1560+
*/
1561+
public NettyConfiguration enqueuingTimeout(Duration enqueuingTimeout) {
1562+
this.enqueuingTimeout = enqueuingTimeout;
1563+
return this;
1564+
}
1565+
15501566
/**
15511567
* Go back to the connection factory.
15521568
*

src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,14 @@ public final class NettyFrameHandlerFactory extends AbstractFrameHandlerFactory
7070
private final Function<String, SslContext> sslContextFactory;
7171
private final Consumer<Channel> channelCustomizer;
7272
private final Consumer<Bootstrap> bootstrapCustomizer;
73+
private final Duration enqueuingTimeout;
7374

7475
public NettyFrameHandlerFactory(
7576
EventLoopGroup eventLoopGroup,
7677
Consumer<Channel> channelCustomizer,
7778
Consumer<Bootstrap> bootstrapCustomizer,
7879
Function<String, SslContext> sslContextFactory,
80+
Duration enqueuingTimeout,
7981
int connectionTimeout,
8082
SocketConfigurator configurator,
8183
int maxInboundMessageBodySize) {
@@ -85,6 +87,7 @@ public NettyFrameHandlerFactory(
8587
this.channelCustomizer = channelCustomizer == null ? Utils.noOpConsumer() : channelCustomizer;
8688
this.bootstrapCustomizer =
8789
bootstrapCustomizer == null ? Utils.noOpConsumer() : bootstrapCustomizer;
90+
this.enqueuingTimeout = enqueuingTimeout;
8891
}
8992

9093
private static void closeNettyState(Channel channel, EventLoopGroup eventLoopGroup) {
@@ -127,6 +130,7 @@ public FrameHandler create(Address addr, String connectionName) throws IOExcepti
127130
addr,
128131
sslContext,
129132
this.eventLoopGroup,
133+
this.enqueuingTimeout,
130134
this.channelCustomizer,
131135
this.bootstrapCustomizer);
132136
}
@@ -146,6 +150,7 @@ private static final class NettyFrameHandler implements FrameHandler {
146150
'A', 'M', 'Q', 'P', 0, AMQP.PROTOCOL.MAJOR, AMQP.PROTOCOL.MINOR, AMQP.PROTOCOL.REVISION
147151
};
148152
private final EventLoopGroup eventLoopGroup;
153+
private final Duration enqueuingTimeout;
149154
private final Channel channel;
150155
private final AmqpHandler handler;
151156
private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -155,9 +160,11 @@ private NettyFrameHandler(
155160
Address addr,
156161
SslContext sslContext,
157162
EventLoopGroup elg,
163+
Duration enqueuingTimeout,
158164
Consumer<Channel> channelCustomizer,
159165
Consumer<Bootstrap> bootstrapCustomizer)
160166
throws IOException {
167+
this.enqueuingTimeout = enqueuingTimeout;
161168
Bootstrap b = new Bootstrap();
162169
bootstrapCustomizer.accept(b);
163170
if (b.config().group() == null) {
@@ -310,7 +317,8 @@ public void writeFrame(Frame frame) throws IOException {
310317
this.doWriteFrame(frame);
311318
} else {
312319
try {
313-
boolean canWriteNow = this.handler.writableLatch().await(10, SECONDS);
320+
boolean canWriteNow =
321+
this.handler.writableLatch().await(enqueuingTimeout.toMillis(), MILLISECONDS);
314322
if (canWriteNow) {
315323
this.doWriteFrame(frame);
316324
} else {

0 commit comments

Comments
 (0)