16
16
*/
17
17
package org .apache .rocketmq .tools .command .consumer ;
18
18
19
- import java .util .Collections ;
20
- import java .util .Date ;
21
- import java .util .LinkedList ;
22
- import java .util .List ;
23
19
import org .apache .commons .cli .CommandLine ;
24
20
import org .apache .commons .cli .Option ;
25
21
import org .apache .commons .cli .Options ;
30
26
import org .apache .rocketmq .common .admin .ConsumeStats ;
31
27
import org .apache .rocketmq .common .admin .OffsetWrapper ;
32
28
import org .apache .rocketmq .common .message .MessageQueue ;
29
+ import org .apache .rocketmq .common .protocol .body .Connection ;
33
30
import org .apache .rocketmq .common .protocol .body .ConsumerConnection ;
31
+ import org .apache .rocketmq .common .protocol .body .ConsumerRunningInfo ;
34
32
import org .apache .rocketmq .common .protocol .body .TopicList ;
35
33
import org .apache .rocketmq .common .protocol .heartbeat .ConsumeType ;
36
34
import org .apache .rocketmq .common .protocol .heartbeat .MessageModel ;
40
38
import org .apache .rocketmq .tools .command .SubCommandException ;
41
39
import org .slf4j .Logger ;
42
40
41
+ import java .util .Collections ;
42
+ import java .util .Date ;
43
+ import java .util .HashMap ;
44
+ import java .util .LinkedList ;
45
+ import java .util .List ;
46
+ import java .util .Map ;
47
+
43
48
public class ConsumerProgressSubCommand implements SubCommand {
44
49
private final Logger log = ClientLogger .getLog ();
45
50
@@ -62,6 +67,24 @@ public Options buildCommandlineOptions(Options options) {
62
67
return options ;
63
68
}
64
69
70
+ private Map <MessageQueue , String > getMessageQueueAllocationResult (DefaultMQAdminExt defaultMQAdminExt ,
71
+ String groupName ) {
72
+ Map <MessageQueue , String > results = new HashMap <>();
73
+ try {
74
+ ConsumerConnection consumerConnection = defaultMQAdminExt .examineConsumerConnectionInfo (groupName );
75
+ for (Connection connection : consumerConnection .getConnectionSet ()) {
76
+ String clientId = connection .getClientId ();
77
+ ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt .getConsumerRunningInfo (groupName , clientId ,
78
+ false );
79
+ for (MessageQueue messageQueue : consumerRunningInfo .getMqTable ().keySet ()) {
80
+ results .put (messageQueue , clientId .split ("@" )[0 ]);
81
+ }
82
+ }
83
+ } catch (Exception ignore ) {
84
+ }
85
+ return results ;
86
+ }
87
+
65
88
@ Override
66
89
public void execute (CommandLine commandLine , Options options , RPCHook rpcHook ) throws SubCommandException {
67
90
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt (rpcHook );
@@ -75,13 +98,14 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t
75
98
List <MessageQueue > mqList = new LinkedList <MessageQueue >();
76
99
mqList .addAll (consumeStats .getOffsetTable ().keySet ());
77
100
Collections .sort (mqList );
78
-
79
- System .out .printf ("%-32s %-32s %-4s %-20s %-20s %-20s %s%n" ,
101
+ Map < MessageQueue , String > messageQueueAllocationResult = getMessageQueueAllocationResult ( defaultMQAdminExt , consumerGroup );
102
+ System .out .printf ("%-32s %-32s %-4s %-20s %-20s %-20s %-20s %s%n" ,
80
103
"#Topic" ,
81
104
"#Broker Name" ,
82
105
"#QID" ,
83
106
"#Broker Offset" ,
84
107
"#Consumer Offset" ,
108
+ "#Client IP" ,
85
109
"#Diff" ,
86
110
"#LastTime" );
87
111
@@ -95,12 +119,15 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t
95
119
lastTime = UtilAll .formatDate (new Date (offsetWrapper .getLastTimestamp ()), UtilAll .YYYY_MM_DD_HH_MM_SS );
96
120
} catch (Exception e ) {
97
121
}
98
- System .out .printf ("%-32s %-32s %-4d %-20d %-20d %-20d %s%n" ,
122
+
123
+ String clientIP = messageQueueAllocationResult .get (mq );
124
+ System .out .printf ("%-32s %-32s %-4d %-20d %-20d %-20s %-20d %s%n" ,
99
125
UtilAll .frontStringAtLeast (mq .getTopic (), 32 ),
100
126
UtilAll .frontStringAtLeast (mq .getBrokerName (), 32 ),
101
127
mq .getQueueId (),
102
128
offsetWrapper .getBrokerOffset (),
103
129
offsetWrapper .getConsumerOffset (),
130
+ null != clientIP ? clientIP : "NA" ,
104
131
diff ,
105
132
lastTime
106
133
);
0 commit comments