Skip to content

Commit 09fb8b5

Browse files
authored
Merge pull request apache#201 from zhouxinyu/ROCKETMQ-324
[ROCKETMQ-324] Expose an interface for client to specify the async call back executor
2 parents 8c30310 + 37cf2a7 commit 09fb8b5

File tree

6 files changed

+91
-1
lines changed

6 files changed

+91
-1
lines changed

client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1060,6 +1060,10 @@ public void endTransaction(
10601060
this.defaultMQProducer.getSendMsgTimeout());
10611061
}
10621062

1063+
public void setCallbackExecutor(final ExecutorService callbackExecutor) {
1064+
this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor);
1065+
}
1066+
10631067
public SendResult send(Message msg,
10641068
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
10651069
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);

client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.Collection;
2020
import java.util.List;
21+
import java.util.concurrent.ExecutorService;
2122
import org.apache.rocketmq.client.ClientConfig;
2223
import org.apache.rocketmq.client.QueryResult;
2324
import org.apache.rocketmq.client.Validators;
@@ -34,6 +35,7 @@
3435
import org.apache.rocketmq.common.message.MessageQueue;
3536
import org.apache.rocketmq.remoting.RPCHook;
3637
import org.apache.rocketmq.remoting.exception.RemotingException;
38+
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
3739

3840
/**
3941
* This class is the entry point for applications intending to send messages.
@@ -630,6 +632,16 @@ public SendResult send(Collection<Message> msgs, MessageQueue messageQueue,
630632
return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout);
631633
}
632634

635+
/**
636+
* Sets an Executor to be used for executing callback methods.
637+
* If the Executor is not set, {@link NettyRemotingClient#publicExecutor} will be used.
638+
*
639+
* @param callbackExecutor the instance of Executor
640+
*/
641+
public void setCallbackExecutor(final ExecutorService callbackExecutor) {
642+
this.defaultMQProducerImpl.setCallbackExecutor(callbackExecutor);
643+
}
644+
633645
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
634646
MessageBatch msgBatch;
635647
try {

client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.util.HashMap;
2323
import java.util.List;
2424
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Executors;
2527
import org.apache.rocketmq.client.ClientConfig;
2628
import org.apache.rocketmq.client.exception.MQBrokerException;
2729
import org.apache.rocketmq.client.exception.MQClientException;
@@ -39,6 +41,7 @@
3941
import org.apache.rocketmq.common.protocol.route.QueueData;
4042
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
4143
import org.apache.rocketmq.remoting.exception.RemotingException;
44+
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
4245
import org.junit.After;
4346
import org.junit.Before;
4447
import org.junit.Test;
@@ -195,6 +198,22 @@ public void run() {
195198
}
196199
}
197200

201+
@Test
202+
public void testSetCallbackExecutor() throws MQClientException {
203+
String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
204+
producer = new DefaultMQProducer(producerGroupTemp);
205+
producer.setNamesrvAddr("127.0.0.1:9876");
206+
producer.start();
207+
208+
ExecutorService customized = Executors.newCachedThreadPool();
209+
producer.setCallbackExecutor(customized);
210+
211+
NettyRemotingClient remotingClient = (NettyRemotingClient) producer.getDefaultMQProducerImpl()
212+
.getmQClientFactory().getMQClientAPIImpl().getRemotingClient();
213+
214+
assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized);
215+
}
216+
198217
public static TopicRouteData createTopicRoute() {
199218
TopicRouteData topicRouteData = new TopicRouteData();
200219

remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,7 @@ void invokeOneway(final String addr, final RemotingCommand request, final long t
4646
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
4747
final ExecutorService executor);
4848

49+
void setCallbackExecutor(final ExecutorService callbackExecutor);
50+
4951
boolean isChannelWritable(final String addr);
5052
}

remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,11 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
8787
private final Lock lockNamesrvChannel = new ReentrantLock();
8888

8989
private final ExecutorService publicExecutor;
90+
91+
/**
92+
* Invoke the callback methods in this executor when process response.
93+
*/
94+
private ExecutorService callbackExecutor;
9095
private final ChannelEventListener channelEventListener;
9196
private DefaultEventExecutorGroup defaultEventExecutorGroup;
9297
private RPCHook rpcHook;
@@ -582,7 +587,12 @@ public RPCHook getRPCHook() {
582587

583588
@Override
584589
public ExecutorService getCallbackExecutor() {
585-
return this.publicExecutor;
590+
return callbackExecutor != null ? callbackExecutor : publicExecutor;
591+
}
592+
593+
@Override
594+
public void setCallbackExecutor(final ExecutorService callbackExecutor) {
595+
this.callbackExecutor = callbackExecutor;
586596
}
587597

588598
static class ChannelWrapper {
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.remoting.netty;
18+
19+
import java.lang.reflect.Field;
20+
import java.util.concurrent.ExecutorService;
21+
import java.util.concurrent.Executors;
22+
import org.junit.Test;
23+
import org.junit.runner.RunWith;
24+
import org.mockito.junit.MockitoJUnitRunner;
25+
26+
import static org.assertj.core.api.Assertions.assertThat;
27+
28+
@RunWith(MockitoJUnitRunner.class)
29+
public class NettyRemotingClientTest {
30+
private NettyRemotingClient remotingClient = new NettyRemotingClient(new NettyClientConfig());
31+
32+
@Test
33+
public void testSetCallbackExecutor() throws NoSuchFieldException, IllegalAccessException {
34+
Field field = NettyRemotingClient.class.getDeclaredField("publicExecutor");
35+
field.setAccessible(true);
36+
assertThat(remotingClient.getCallbackExecutor()).isEqualTo(field.get(remotingClient));
37+
38+
ExecutorService customized = Executors.newCachedThreadPool();
39+
remotingClient.setCallbackExecutor(customized);
40+
41+
assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized);
42+
}
43+
}

0 commit comments

Comments
 (0)