Skip to content

Commit 45a64fd

Browse files
committed
Include client IP per message queue of consumer progress command output
1 parent ab01386 commit 45a64fd

File tree

1 file changed

+34
-7
lines changed

1 file changed

+34
-7
lines changed

tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,6 @@
1616
*/
1717
package org.apache.rocketmq.tools.command.consumer;
1818

19-
import java.util.Collections;
20-
import java.util.Date;
21-
import java.util.LinkedList;
22-
import java.util.List;
2319
import org.apache.commons.cli.CommandLine;
2420
import org.apache.commons.cli.Option;
2521
import org.apache.commons.cli.Options;
@@ -30,7 +26,9 @@
3026
import org.apache.rocketmq.common.admin.ConsumeStats;
3127
import org.apache.rocketmq.common.admin.OffsetWrapper;
3228
import org.apache.rocketmq.common.message.MessageQueue;
29+
import org.apache.rocketmq.common.protocol.body.Connection;
3330
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
31+
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
3432
import org.apache.rocketmq.common.protocol.body.TopicList;
3533
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
3634
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
@@ -40,6 +38,13 @@
4038
import org.apache.rocketmq.tools.command.SubCommandException;
4139
import org.slf4j.Logger;
4240

41+
import java.util.Collections;
42+
import java.util.Date;
43+
import java.util.HashMap;
44+
import java.util.LinkedList;
45+
import java.util.List;
46+
import java.util.Map;
47+
4348
public class ConsumerProgressSubCommand implements SubCommand {
4449
private final Logger log = ClientLogger.getLog();
4550

@@ -62,6 +67,24 @@ public Options buildCommandlineOptions(Options options) {
6267
return options;
6368
}
6469

70+
private Map<MessageQueue, String> getMessageQueueAllocationResult(DefaultMQAdminExt defaultMQAdminExt,
71+
String groupName) {
72+
Map<MessageQueue, String> results = new HashMap<>();
73+
try {
74+
ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo(groupName);
75+
for (Connection connection : consumerConnection.getConnectionSet()) {
76+
String clientId = connection.getClientId();
77+
ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(groupName, clientId,
78+
false);
79+
for (MessageQueue messageQueue : consumerRunningInfo.getMqTable().keySet()) {
80+
results.put(messageQueue, clientId.split("@")[0]);
81+
}
82+
}
83+
} catch (Exception ignore) {
84+
}
85+
return results;
86+
}
87+
6588
@Override
6689
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
6790
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
@@ -75,13 +98,14 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t
7598
List<MessageQueue> mqList = new LinkedList<MessageQueue>();
7699
mqList.addAll(consumeStats.getOffsetTable().keySet());
77100
Collections.sort(mqList);
78-
79-
System.out.printf("%-32s %-32s %-4s %-20s %-20s %-20s %s%n",
101+
Map<MessageQueue, String> messageQueueAllocationResult = getMessageQueueAllocationResult(defaultMQAdminExt, consumerGroup);
102+
System.out.printf("%-32s %-32s %-4s %-20s %-20s %-20s %-20s %s%n",
80103
"#Topic",
81104
"#Broker Name",
82105
"#QID",
83106
"#Broker Offset",
84107
"#Consumer Offset",
108+
"#Client IP",
85109
"#Diff",
86110
"#LastTime");
87111

@@ -95,12 +119,15 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t
95119
lastTime = UtilAll.formatDate(new Date(offsetWrapper.getLastTimestamp()), UtilAll.YYYY_MM_DD_HH_MM_SS);
96120
} catch (Exception e) {
97121
}
98-
System.out.printf("%-32s %-32s %-4d %-20d %-20d %-20d %s%n",
122+
123+
String clientIP = messageQueueAllocationResult.get(mq);
124+
System.out.printf("%-32s %-32s %-4d %-20d %-20d %-20s %-20d %s%n",
99125
UtilAll.frontStringAtLeast(mq.getTopic(), 32),
100126
UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),
101127
mq.getQueueId(),
102128
offsetWrapper.getBrokerOffset(),
103129
offsetWrapper.getConsumerOffset(),
130+
null != clientIP ? clientIP : "NA",
104131
diff,
105132
lastTime
106133
);

0 commit comments

Comments
 (0)