Skip to content

Commit 1bedba8

Browse files
committed
Merge branch 'develop'
2 parents d434a3a + 0f153b9 commit 1bedba8

File tree

16 files changed

+225
-61
lines changed

16 files changed

+225
-61
lines changed

broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.rocketmq.store.PutMessageResult;
3131
import org.apache.rocketmq.store.QueryMessageResult;
3232
import org.apache.rocketmq.store.SelectMappedBufferResult;
33+
import org.apache.rocketmq.store.stats.BrokerStatsManager;
3334

3435
public abstract class AbstractPluginMessageStore implements MessageStore {
3536
protected MessageStore next = null;
@@ -246,4 +247,9 @@ public LinkedList<CommitLogDispatcher> getDispatcherList() {
246247
public ConsumeQueue getConsumeQueue(String topic, int queueId) {
247248
return next.getConsumeQueue(topic, queueId);
248249
}
250+
251+
@Override
252+
public BrokerStatsManager getBrokerStatsManager() {
253+
return next.getBrokerStatsManager();
254+
};
249255
}

broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@
114114
import org.apache.rocketmq.store.ConsumeQueueExt;
115115
import org.apache.rocketmq.store.DefaultMessageStore;
116116
import org.apache.rocketmq.store.MessageFilter;
117+
import org.apache.rocketmq.store.MessageStore;
117118
import org.apache.rocketmq.store.SelectMappedBufferResult;
118119

119120
public class AdminBrokerProcessor implements NettyRequestProcessor {
@@ -760,12 +761,19 @@ private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, Remoting
760761
private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand request) {
761762
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
762763

764+
if (!(this.brokerController.getMessageStore() instanceof DefaultMessageStore)) {
765+
log.error("Delay offset not supported in this messagetore, client: {} ", ctx.channel().remoteAddress());
766+
response.setCode(ResponseCode.SYSTEM_ERROR);
767+
response.setRemark("Delay offset not supported in this messagetore");
768+
return response;
769+
}
770+
763771
String content = ((DefaultMessageStore) this.brokerController.getMessageStore()).getScheduleMessageService().encode();
764772
if (content != null && content.length() > 0) {
765773
try {
766774
response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
767775
} catch (UnsupportedEncodingException e) {
768-
log.error("get all delay offset from master error.", e);
776+
log.error("Get all delay offset from master error.", e);
769777

770778
response.setCode(ResponseCode.SYSTEM_ERROR);
771779
response.setRemark("UnsupportedEncodingException " + e);
@@ -1051,7 +1059,7 @@ private RemotingCommand ViewBrokerStatsData(ChannelHandlerContext ctx,
10511059
final ViewBrokerStatsDataRequestHeader requestHeader =
10521060
(ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class);
10531061
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
1054-
DefaultMessageStore messageStore = (DefaultMessageStore) this.brokerController.getMessageStore();
1062+
MessageStore messageStore = this.brokerController.getMessageStore();
10551063

10561064
StatsItem statsItem = messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(), requestHeader.getStatsKey());
10571065
if (null == statsItem) {

client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,15 @@ public interface MQPushConsumer extends MQConsumer {
5555
void subscribe(final String topic, final String subExpression) throws MQClientException;
5656

5757
/**
58+
* This method will be removed in the version 5.0.0,because filterServer was removed,and method <code>subscribe(final String topic, final MessageSelector messageSelector)</code>
59+
* is recommended.
60+
*
5861
* Subscribe some topic
5962
*
6063
* @param fullClassName full class name,must extend org.apache.rocketmq.common.filter. MessageFilter
6164
* @param filterClassSource class source code,used UTF-8 file encoding,must be responsible for your code safety
6265
*/
66+
@Deprecated
6367
void subscribe(final String topic, final String fullClassName,
6468
final String filterClassSource) throws MQClientException;
6569

client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -720,7 +720,11 @@ private boolean isBrokerInNameServer(final String brokerAddr) {
720720

721721
return false;
722722
}
723-
723+
/**
724+
* This method will be removed in the version 5.0.0,because filterServer was removed,and method <code>subscribe(final String topic, final MessageSelector messageSelector)</code>
725+
* is recommended.
726+
*/
727+
@Deprecated
724728
private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName,
725729
final String topic,
726730
final String filterClassSource) throws UnsupportedEncodingException {

common/src/main/java/org/apache/rocketmq/common/CountDownLatch2.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
2222

2323
/**
24-
* Add reset feature for @see java.util.concurrent.CountDownLatch2
24+
* Add reset feature for @see java.util.concurrent.CountDownLatch
2525
*/
2626
public class CountDownLatch2 {
2727
private final Sync sync;

common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
public class LoggerName {
2020
public static final String FILTERSRV_LOGGER_NAME = "RocketmqFiltersrv";
2121
public static final String NAMESRV_LOGGER_NAME = "RocketmqNamesrv";
22+
public static final String NAMESRV_CONSOLE_NAME = "RocketmqNamesrvConsole";
2223
public static final String BROKER_LOGGER_NAME = "RocketmqBroker";
2324
public static final String BROKER_CONSOLE_NAME = "RocketmqConsole";
2425
public static final String CLIENT_LOGGER_NAME = "RocketmqClient";

common/src/test/java/org/apache/rocketmq/common/MixAllTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public void testGetLocalInetAddress() throws Exception {
3434
List<String> localInetAddress = MixAll.getLocalInetAddress();
3535
String local = InetAddress.getLocalHost().getHostAddress();
3636
assertThat(localInetAddress).contains("127.0.0.1");
37-
assertThat(localInetAddress).contains(local);
37+
assertThat(local).isNotNull();
3838
}
3939

4040
@Test

distribution/conf/logback_namesrv.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,11 @@
8282
<appender-ref ref="RocketmqNamesrvAppender"/>
8383
</logger>
8484

85+
<logger name="RocketmqNamesrvConsole" additivity="false">
86+
<level value="INFO"/>
87+
<appender-ref ref="STDOUT"/>
88+
</logger>
89+
8590
<root>
8691
<level value="INFO"/>
8792
<appender-ref ref="DefaultAppender"/>

example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.apache.rocketmq.example.simple;
1818

1919
import java.io.UnsupportedEncodingException;
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.TimeUnit;
2022
import org.apache.rocketmq.client.exception.MQClientException;
2123
import org.apache.rocketmq.client.producer.DefaultMQProducer;
2224
import org.apache.rocketmq.client.producer.SendCallback;
@@ -32,7 +34,9 @@ public static void main(
3234
producer.start();
3335
producer.setRetryTimesWhenSendAsyncFailed(0);
3436

35-
for (int i = 0; i < 10000000; i++) {
37+
int messageCount = 100;
38+
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
39+
for (int i = 0; i < messageCount; i++) {
3640
try {
3741
final int index = i;
3842
Message msg = new Message("Jodie_topic_1023",
@@ -42,11 +46,13 @@ public static void main(
4246
producer.send(msg, new SendCallback() {
4347
@Override
4448
public void onSuccess(SendResult sendResult) {
49+
countDownLatch.countDown();
4550
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
4651
}
4752

4853
@Override
4954
public void onException(Throwable e) {
55+
countDownLatch.countDown();
5056
System.out.printf("%-10d Exception %s %n", index, e);
5157
e.printStackTrace();
5258
}
@@ -55,6 +61,7 @@ public void onException(Throwable e) {
5561
e.printStackTrace();
5662
}
5763
}
64+
countDownLatch.await(5, TimeUnit.SECONDS);
5865
producer.shutdown();
5966
}
6067
}

namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,9 @@ public static NamesrvController createNamesrvController(String[] args) throws IO
9999
}
100100

101101
if (commandLine.hasOption('p')) {
102-
MixAll.printObjectProperties(null, namesrvConfig);
103-
MixAll.printObjectProperties(null, nettyServerConfig);
102+
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
103+
MixAll.printObjectProperties(console, namesrvConfig);
104+
MixAll.printObjectProperties(console, nettyServerConfig);
104105
System.exit(0);
105106
}
106107

0 commit comments

Comments
 (0)