Skip to content

Commit 126965a

Browse files
committed
[netty#5639] Ensure fireChannelActive() is also called if Channel is closed in connect promise.
Motivation: We need to ensure we also call fireChannelActive() if the Channel is directly closed in a ChannelFutureListener that is belongs to the promise for the connect. Otherwise we will see missing active events. Modifications: Ensure we always call fireChannelActive() if the Channel was active. Result: No missing events.
1 parent f327eaa commit 126965a

File tree

4 files changed

+59
-3
lines changed

4 files changed

+59
-3
lines changed

testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectTest.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,16 @@
1818
import io.netty.bootstrap.Bootstrap;
1919
import io.netty.bootstrap.ServerBootstrap;
2020
import io.netty.channel.Channel;
21+
import io.netty.channel.ChannelFutureListener;
2122
import io.netty.channel.ChannelHandlerContext;
2223
import io.netty.channel.ChannelInboundHandlerAdapter;
2324
import io.netty.util.concurrent.ImmediateEventExecutor;
2425
import io.netty.util.concurrent.Promise;
2526
import org.junit.Test;
2627

2728
import java.net.InetSocketAddress;
29+
import java.util.concurrent.BlockingQueue;
30+
import java.util.concurrent.LinkedBlockingQueue;
2831

2932
import static org.junit.Assert.*;
3033

@@ -67,6 +70,46 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
6770
}
6871
}
6972

73+
@Test(timeout = 3000)
74+
public void testChannelEventsFiredWhenClosedDirectly() throws Throwable {
75+
run();
76+
}
77+
78+
public void testChannelEventsFiredWhenClosedDirectly(ServerBootstrap sb, Bootstrap cb) throws Throwable {
79+
final BlockingQueue<Integer> events = new LinkedBlockingQueue<Integer>();
80+
81+
Channel sc = null;
82+
Channel cc = null;
83+
try {
84+
sb.childHandler(new ChannelInboundHandlerAdapter());
85+
sc = sb.bind(0).syncUninterruptibly().channel();
86+
87+
cb.handler(new ChannelInboundHandlerAdapter() {
88+
@Override
89+
public void channelActive(ChannelHandlerContext ctx) throws Exception {
90+
events.add(0);
91+
}
92+
93+
@Override
94+
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
95+
events.add(1);
96+
}
97+
});
98+
// Connect and directly close again.
99+
cc = cb.connect(sc.localAddress()).addListener(ChannelFutureListener.CLOSE).
100+
syncUninterruptibly().channel();
101+
assertEquals(0, events.take().intValue());
102+
assertEquals(1, events.take().intValue());
103+
} finally {
104+
if (cc != null) {
105+
cc.close();
106+
}
107+
if (sc != null) {
108+
sc.close();
109+
}
110+
}
111+
}
112+
70113
private static void assertLocalAddress(InetSocketAddress address) {
71114
assertTrue(address.getPort() > 0);
72115
assertFalse(address.getAddress().isAnyLocalAddress());

transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -745,12 +745,16 @@ private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
745745
}
746746
active = true;
747747

748+
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
749+
// We still need to ensure we call fireChannelActive() in this case.
750+
boolean active = isActive();
751+
748752
// trySuccess() will return false if a user cancelled the connection attempt.
749753
boolean promiseSet = promise.trySuccess();
750754

751755
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
752756
// because what happened is what happened.
753-
if (!wasActive && isActive()) {
757+
if (!wasActive && active) {
754758
pipeline().fireChannelActive();
755759
}
756760

transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,12 +247,16 @@ private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
247247
return;
248248
}
249249

250+
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
251+
// We still need to ensure we call fireChannelActive() in this case.
252+
boolean active = isActive();
253+
250254
// trySuccess() will return false if a user cancelled the connection attempt.
251255
boolean promiseSet = promise.trySuccess();
252256

253257
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
254258
// because what happened is what happened.
255-
if (!wasActive && isActive()) {
259+
if (!wasActive && active) {
256260
pipeline().fireChannelActive();
257261
}
258262

transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,13 @@ public void connect(
6969
try {
7070
boolean wasActive = isActive();
7171
doConnect(remoteAddress, localAddress);
72+
73+
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
74+
// We still need to ensure we call fireChannelActive() in this case.
75+
boolean active = isActive();
76+
7277
safeSetSuccess(promise);
73-
if (!wasActive && isActive()) {
78+
if (!wasActive && active) {
7479
pipeline().fireChannelActive();
7580
}
7681
} catch (Throwable t) {

0 commit comments

Comments
 (0)