Skip to content

Commit f3c9e94

Browse files
committed
Add enqueuing timeout option in Netty configuration
Part of the back-pressure mechanism. References #1663
1 parent d8d0c37 commit f3c9e94

File tree

2 files changed

+25
-1
lines changed

2 files changed

+25
-1
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.net.URLDecoder;
3636
import java.security.KeyManagementException;
3737
import java.security.NoSuchAlgorithmException;
38+
import java.time.Duration;
3839
import java.util.*;
3940
import java.util.Map.Entry;
4041
import java.util.concurrent.*;
@@ -1097,6 +1098,7 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IO
10971098
this.nettyConf.channelCustomizer,
10981099
this.nettyConf.bootstrapCustomizer,
10991100
this.nettyConf.sslContextFactory,
1101+
this.nettyConf.enqueuingTimeout,
11001102
connectionTimeout,
11011103
socketConf,
11021104
maxInboundMessageBodySize);
@@ -1917,6 +1919,7 @@ public static final class NettyConfiguration {
19171919
private Consumer<io.netty.channel.Channel> channelCustomizer = ch -> {};
19181920
private Consumer<Bootstrap> bootstrapCustomizer = b -> {};
19191921
private Function<String, SslContext> sslContextFactory;
1922+
private Duration enqueuingTimeout = Duration.ofSeconds(10);
19201923

19211924
private NettyConfiguration(ConnectionFactory cf) {
19221925
this.cf = cf;
@@ -1990,6 +1993,19 @@ public NettyConfiguration sslContextFactory(Function<String, SslContext> sslCont
19901993
return this;
19911994
}
19921995

1996+
/**
1997+
* Set the timeout to enqueue outbound frames.
1998+
*
1999+
* <p>Default is 10 seconds.
2000+
*
2001+
* @param enqueuingTimeout the enqueuing timeout
2002+
* @return this configuration instance
2003+
*/
2004+
public NettyConfiguration enqueuingTimeout(Duration enqueuingTimeout) {
2005+
this.enqueuingTimeout = enqueuingTimeout;
2006+
return this;
2007+
}
2008+
19932009
/**
19942010
* Go back to the connection factory.
19952011
*

src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,14 @@ public final class NettyFrameHandlerFactory extends AbstractFrameHandlerFactory
7070
private final Function<String, SslContext> sslContextFactory;
7171
private final Consumer<Channel> channelCustomizer;
7272
private final Consumer<Bootstrap> bootstrapCustomizer;
73+
private final Duration enqueuingTimeout;
7374

7475
public NettyFrameHandlerFactory(
7576
EventLoopGroup eventLoopGroup,
7677
Consumer<Channel> channelCustomizer,
7778
Consumer<Bootstrap> bootstrapCustomizer,
7879
Function<String, SslContext> sslContextFactory,
80+
Duration enqueuingTimeout,
7981
int connectionTimeout,
8082
SocketConfigurator configurator,
8183
int maxInboundMessageBodySize) {
@@ -85,6 +87,7 @@ public NettyFrameHandlerFactory(
8587
this.channelCustomizer = channelCustomizer == null ? Utils.noOpConsumer() : channelCustomizer;
8688
this.bootstrapCustomizer =
8789
bootstrapCustomizer == null ? Utils.noOpConsumer() : bootstrapCustomizer;
90+
this.enqueuingTimeout = enqueuingTimeout;
8891
}
8992

9093
private static void closeNettyState(Channel channel, EventLoopGroup eventLoopGroup) {
@@ -127,6 +130,7 @@ public FrameHandler create(Address addr, String connectionName) throws IOExcepti
127130
addr,
128131
sslContext,
129132
this.eventLoopGroup,
133+
this.enqueuingTimeout,
130134
this.channelCustomizer,
131135
this.bootstrapCustomizer);
132136
}
@@ -146,6 +150,7 @@ private static final class NettyFrameHandler implements FrameHandler {
146150
'A', 'M', 'Q', 'P', 0, AMQP.PROTOCOL.MAJOR, AMQP.PROTOCOL.MINOR, AMQP.PROTOCOL.REVISION
147151
};
148152
private final EventLoopGroup eventLoopGroup;
153+
private final Duration enqueuingTimeout;
149154
private final Channel channel;
150155
private final AmqpHandler handler;
151156
private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -155,9 +160,11 @@ private NettyFrameHandler(
155160
Address addr,
156161
SslContext sslContext,
157162
EventLoopGroup elg,
163+
Duration enqueuingTimeout,
158164
Consumer<Channel> channelCustomizer,
159165
Consumer<Bootstrap> bootstrapCustomizer)
160166
throws IOException {
167+
this.enqueuingTimeout = enqueuingTimeout;
161168
Bootstrap b = new Bootstrap();
162169
bootstrapCustomizer.accept(b);
163170
if (b.config().group() == null) {
@@ -310,7 +317,8 @@ public void writeFrame(Frame frame) throws IOException {
310317
this.doWriteFrame(frame);
311318
} else {
312319
try {
313-
boolean canWriteNow = this.handler.writableLatch().await(10, SECONDS);
320+
boolean canWriteNow =
321+
this.handler.writableLatch().await(enqueuingTimeout.toMillis(), MILLISECONDS);
314322
if (canWriteNow) {
315323
this.doWriteFrame(frame);
316324
} else {

0 commit comments

Comments
 (0)