Skip to content

Commit 6c5a950

Browse files
committed
[ROCKETMQ-323] Release semaphore after callback being finished for async invoke
1 parent 8c30310 commit 6c5a950

File tree

2 files changed

+98
-2
lines changed

2 files changed

+98
-2
lines changed

remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,14 +257,13 @@ public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cm
257257
if (responseFuture != null) {
258258
responseFuture.setResponseCommand(cmd);
259259

260-
responseFuture.release();
261-
262260
responseTable.remove(opaque);
263261

264262
if (responseFuture.getInvokeCallback() != null) {
265263
executeInvokeCallback(responseFuture);
266264
} else {
267265
responseFuture.putResponse(cmd);
266+
responseFuture.release();
268267
}
269268
} else {
270269
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
@@ -287,6 +286,8 @@ public void run() {
287286
responseFuture.executeInvokeCallback();
288287
} catch (Throwable e) {
289288
log.warn("execute callback in executor exception, and callback throw", e);
289+
} finally {
290+
responseFuture.release();
290291
}
291292
}
292293
});
@@ -303,6 +304,8 @@ public void run() {
303304
responseFuture.executeInvokeCallback();
304305
} catch (Throwable e) {
305306
log.warn("executeInvokeCallback Exception", e);
307+
} finally {
308+
responseFuture.release();
306309
}
307310
}
308311
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.remoting.netty;
18+
19+
import java.util.concurrent.Semaphore;
20+
import org.apache.rocketmq.remoting.InvokeCallback;
21+
import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
22+
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
23+
import org.junit.Test;
24+
import org.junit.runner.RunWith;
25+
import org.mockito.Spy;
26+
import org.mockito.junit.MockitoJUnitRunner;
27+
28+
import static org.assertj.core.api.Assertions.assertThat;
29+
import static org.mockito.Mockito.when;
30+
31+
@RunWith(MockitoJUnitRunner.class)
32+
public class NettyRemotingAbstractTest {
33+
@Spy
34+
private NettyRemotingAbstract remotingAbstract = new NettyRemotingClient(new NettyClientConfig());
35+
36+
@Test
37+
public void testProcessResponseCommand() throws InterruptedException {
38+
final Semaphore semaphore = new Semaphore(0);
39+
ResponseFuture responseFuture = new ResponseFuture(1, 3000, new InvokeCallback() {
40+
@Override
41+
public void operationComplete(final ResponseFuture responseFuture) {
42+
assertThat(semaphore.availablePermits()).isEqualTo(0);
43+
}
44+
}, new SemaphoreReleaseOnlyOnce(semaphore));
45+
46+
remotingAbstract.responseTable.putIfAbsent(1, responseFuture);
47+
48+
RemotingCommand response = RemotingCommand.createResponseCommand(0, "Foo");
49+
response.setOpaque(1);
50+
remotingAbstract.processResponseCommand(null, response);
51+
52+
// Acquire the release permit after call back
53+
semaphore.acquire(1);
54+
assertThat(semaphore.availablePermits()).isEqualTo(0);
55+
}
56+
57+
@Test
58+
public void testProcessResponseCommand_NullCallBack() throws InterruptedException {
59+
final Semaphore semaphore = new Semaphore(0);
60+
ResponseFuture responseFuture = new ResponseFuture(1, 3000, null,
61+
new SemaphoreReleaseOnlyOnce(semaphore));
62+
63+
remotingAbstract.responseTable.putIfAbsent(1, responseFuture);
64+
65+
RemotingCommand response = RemotingCommand.createResponseCommand(0, "Foo");
66+
response.setOpaque(1);
67+
remotingAbstract.processResponseCommand(null, response);
68+
69+
assertThat(semaphore.availablePermits()).isEqualTo(1);
70+
}
71+
72+
@Test
73+
public void testProcessResponseCommand_RunCallBackInCurrentThread() throws InterruptedException {
74+
final Semaphore semaphore = new Semaphore(0);
75+
ResponseFuture responseFuture = new ResponseFuture(1, 3000, new InvokeCallback() {
76+
@Override
77+
public void operationComplete(final ResponseFuture responseFuture) {
78+
assertThat(semaphore.availablePermits()).isEqualTo(0);
79+
}
80+
}, new SemaphoreReleaseOnlyOnce(semaphore));
81+
82+
remotingAbstract.responseTable.putIfAbsent(1, responseFuture);
83+
when(remotingAbstract.getCallbackExecutor()).thenReturn(null);
84+
85+
RemotingCommand response = RemotingCommand.createResponseCommand(0, "Foo");
86+
response.setOpaque(1);
87+
remotingAbstract.processResponseCommand(null, response);
88+
89+
// Acquire the release permit after call back finished in current thread
90+
semaphore.acquire(1);
91+
assertThat(semaphore.availablePermits()).isEqualTo(0);
92+
}
93+
}

0 commit comments

Comments
 (0)