1
+ /*
2
+ * Licensed to the Apache Software Foundation (ASF) under one or more
3
+ * contributor license agreements. See the NOTICE file distributed with
4
+ * this work for additional information regarding copyright ownership.
5
+ * The ASF licenses this file to You under the Apache License, Version 2.0
6
+ * (the "License"); you may not use this file except in compliance with
7
+ * the License. You may obtain a copy of the License at
8
+ *
9
+ * http://www.apache.org/licenses/LICENSE-2.0
10
+ *
11
+ * Unless required by applicable law or agreed to in writing, software
12
+ * distributed under the License is distributed on an "AS IS" BASIS,
13
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
+ * See the License for the specific language governing permissions and
15
+ * limitations under the License.
16
+ */
17
+ package org .apache .rocketmq .remoting .netty ;
18
+
19
+ import java .util .concurrent .Semaphore ;
20
+ import org .apache .rocketmq .remoting .InvokeCallback ;
21
+ import org .apache .rocketmq .remoting .common .SemaphoreReleaseOnlyOnce ;
22
+ import org .apache .rocketmq .remoting .protocol .RemotingCommand ;
23
+ import org .junit .Test ;
24
+ import org .junit .runner .RunWith ;
25
+ import org .mockito .Spy ;
26
+ import org .mockito .junit .MockitoJUnitRunner ;
27
+
28
+ import static org .assertj .core .api .Assertions .assertThat ;
29
+ import static org .mockito .Mockito .when ;
30
+
31
+ @ RunWith (MockitoJUnitRunner .class )
32
+ public class NettyRemotingAbstractTest {
33
+ @ Spy
34
+ private NettyRemotingAbstract remotingAbstract = new NettyRemotingClient (new NettyClientConfig ());
35
+
36
+ @ Test
37
+ public void testProcessResponseCommand () throws InterruptedException {
38
+ final Semaphore semaphore = new Semaphore (0 );
39
+ ResponseFuture responseFuture = new ResponseFuture (1 , 3000 , new InvokeCallback () {
40
+ @ Override
41
+ public void operationComplete (final ResponseFuture responseFuture ) {
42
+ assertThat (semaphore .availablePermits ()).isEqualTo (0 );
43
+ }
44
+ }, new SemaphoreReleaseOnlyOnce (semaphore ));
45
+
46
+ remotingAbstract .responseTable .putIfAbsent (1 , responseFuture );
47
+
48
+ RemotingCommand response = RemotingCommand .createResponseCommand (0 , "Foo" );
49
+ response .setOpaque (1 );
50
+ remotingAbstract .processResponseCommand (null , response );
51
+
52
+ // Acquire the release permit after call back
53
+ semaphore .acquire (1 );
54
+ assertThat (semaphore .availablePermits ()).isEqualTo (0 );
55
+ }
56
+
57
+ @ Test
58
+ public void testProcessResponseCommand_NullCallBack () throws InterruptedException {
59
+ final Semaphore semaphore = new Semaphore (0 );
60
+ ResponseFuture responseFuture = new ResponseFuture (1 , 3000 , null ,
61
+ new SemaphoreReleaseOnlyOnce (semaphore ));
62
+
63
+ remotingAbstract .responseTable .putIfAbsent (1 , responseFuture );
64
+
65
+ RemotingCommand response = RemotingCommand .createResponseCommand (0 , "Foo" );
66
+ response .setOpaque (1 );
67
+ remotingAbstract .processResponseCommand (null , response );
68
+
69
+ assertThat (semaphore .availablePermits ()).isEqualTo (1 );
70
+ }
71
+
72
+ @ Test
73
+ public void testProcessResponseCommand_RunCallBackInCurrentThread () throws InterruptedException {
74
+ final Semaphore semaphore = new Semaphore (0 );
75
+ ResponseFuture responseFuture = new ResponseFuture (1 , 3000 , new InvokeCallback () {
76
+ @ Override
77
+ public void operationComplete (final ResponseFuture responseFuture ) {
78
+ assertThat (semaphore .availablePermits ()).isEqualTo (0 );
79
+ }
80
+ }, new SemaphoreReleaseOnlyOnce (semaphore ));
81
+
82
+ remotingAbstract .responseTable .putIfAbsent (1 , responseFuture );
83
+ when (remotingAbstract .getCallbackExecutor ()).thenReturn (null );
84
+
85
+ RemotingCommand response = RemotingCommand .createResponseCommand (0 , "Foo" );
86
+ response .setOpaque (1 );
87
+ remotingAbstract .processResponseCommand (null , response );
88
+
89
+ // Acquire the release permit after call back finished in current thread
90
+ semaphore .acquire (1 );
91
+ assertThat (semaphore .availablePermits ()).isEqualTo (0 );
92
+ }
93
+ }
0 commit comments