@@ -70,12 +70,14 @@ public final class NettyFrameHandlerFactory extends AbstractFrameHandlerFactory
70
70
private final Function <String , SslContext > sslContextFactory ;
71
71
private final Consumer <Channel > channelCustomizer ;
72
72
private final Consumer <Bootstrap > bootstrapCustomizer ;
73
+ private final Duration enqueuingTimeout ;
73
74
74
75
public NettyFrameHandlerFactory (
75
76
EventLoopGroup eventLoopGroup ,
76
77
Consumer <Channel > channelCustomizer ,
77
78
Consumer <Bootstrap > bootstrapCustomizer ,
78
79
Function <String , SslContext > sslContextFactory ,
80
+ Duration enqueuingTimeout ,
79
81
int connectionTimeout ,
80
82
SocketConfigurator configurator ,
81
83
int maxInboundMessageBodySize ) {
@@ -85,6 +87,7 @@ public NettyFrameHandlerFactory(
85
87
this .channelCustomizer = channelCustomizer == null ? Utils .noOpConsumer () : channelCustomizer ;
86
88
this .bootstrapCustomizer =
87
89
bootstrapCustomizer == null ? Utils .noOpConsumer () : bootstrapCustomizer ;
90
+ this .enqueuingTimeout = enqueuingTimeout ;
88
91
}
89
92
90
93
private static void closeNettyState (Channel channel , EventLoopGroup eventLoopGroup ) {
@@ -127,6 +130,7 @@ public FrameHandler create(Address addr, String connectionName) throws IOExcepti
127
130
addr ,
128
131
sslContext ,
129
132
this .eventLoopGroup ,
133
+ this .enqueuingTimeout ,
130
134
this .channelCustomizer ,
131
135
this .bootstrapCustomizer );
132
136
}
@@ -146,6 +150,7 @@ private static final class NettyFrameHandler implements FrameHandler {
146
150
'A' , 'M' , 'Q' , 'P' , 0 , AMQP .PROTOCOL .MAJOR , AMQP .PROTOCOL .MINOR , AMQP .PROTOCOL .REVISION
147
151
};
148
152
private final EventLoopGroup eventLoopGroup ;
153
+ private final Duration enqueuingTimeout ;
149
154
private final Channel channel ;
150
155
private final AmqpHandler handler ;
151
156
private final AtomicBoolean closed = new AtomicBoolean (false );
@@ -155,9 +160,11 @@ private NettyFrameHandler(
155
160
Address addr ,
156
161
SslContext sslContext ,
157
162
EventLoopGroup elg ,
163
+ Duration enqueuingTimeout ,
158
164
Consumer <Channel > channelCustomizer ,
159
165
Consumer <Bootstrap > bootstrapCustomizer )
160
166
throws IOException {
167
+ this .enqueuingTimeout = enqueuingTimeout ;
161
168
Bootstrap b = new Bootstrap ();
162
169
bootstrapCustomizer .accept (b );
163
170
if (b .config ().group () == null ) {
@@ -310,7 +317,8 @@ public void writeFrame(Frame frame) throws IOException {
310
317
this .doWriteFrame (frame );
311
318
} else {
312
319
try {
313
- boolean canWriteNow = this .handler .writableLatch ().await (10 , SECONDS );
320
+ boolean canWriteNow =
321
+ this .handler .writableLatch ().await (enqueuingTimeout .toMillis (), MILLISECONDS );
314
322
if (canWriteNow ) {
315
323
this .doWriteFrame (frame );
316
324
} else {
0 commit comments