Skip to content

Commit 0b80f80

Browse files
zhouxinyudongeforever
authored andcommitted
[ROCKETMQ-294] Do flow control on the number and size dimensions when pull message
Author: yukon <yukon@apache.org> Closes apache#171 from zhouxinyu/ROCKETMQ-294.
1 parent bbd27c1 commit 0b80f80

File tree

8 files changed

+476
-14
lines changed

8 files changed

+476
-14
lines changed

client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,10 +166,42 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
166166
private int consumeConcurrentlyMaxSpan = 2000;
167167

168168
/**
169-
* Flow control threshold
169+
* Flow control threshold on queue level, each message queue will cache at most 1000 messages by default,
170+
* Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
170171
*/
171172
private int pullThresholdForQueue = 1000;
172173

174+
/**
175+
* Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
176+
* Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
177+
*
178+
* <p>
179+
* The size of a message only measured by message body, so it's not accurate
180+
*/
181+
private int pullThresholdSizeForQueue = 100;
182+
183+
/**
184+
* Flow control threshold on topic level, default value is -1(Unlimited)
185+
* <p>
186+
* The value of {@code pullThresholdForQueue} will be overwrote and calculated based on
187+
* {@code pullThresholdForTopic} if it is't unlimited
188+
* <p>
189+
* For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer,
190+
* then pullThresholdForQueue will be set to 100
191+
*/
192+
private int pullThresholdForTopic = -1;
193+
194+
/**
195+
* Limit the cached message size on topic level, default value is -1 MiB(Unlimited)
196+
* <p>
197+
* The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on
198+
* {@code pullThresholdSizeForTopic} if it is't unlimited
199+
* <p>
200+
* For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are
201+
* assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB
202+
*/
203+
private int pullThresholdSizeForTopic = -1;
204+
173205
/**
174206
* Message pull Interval
175207
*/
@@ -407,6 +439,30 @@ public void setPullThresholdForQueue(int pullThresholdForQueue) {
407439
this.pullThresholdForQueue = pullThresholdForQueue;
408440
}
409441

442+
public int getPullThresholdForTopic() {
443+
return pullThresholdForTopic;
444+
}
445+
446+
public void setPullThresholdForTopic(final int pullThresholdForTopic) {
447+
this.pullThresholdForTopic = pullThresholdForTopic;
448+
}
449+
450+
public int getPullThresholdSizeForQueue() {
451+
return pullThresholdSizeForQueue;
452+
}
453+
454+
public void setPullThresholdSizeForQueue(final int pullThresholdSizeForQueue) {
455+
this.pullThresholdSizeForQueue = pullThresholdSizeForQueue;
456+
}
457+
458+
public int getPullThresholdSizeForTopic() {
459+
return pullThresholdSizeForTopic;
460+
}
461+
462+
public void setPullThresholdSizeForTopic(final int pullThresholdSizeForTopic) {
463+
this.pullThresholdSizeForTopic = pullThresholdSizeForTopic;
464+
}
465+
410466
public Map<String, String> getSubscription() {
411467
return subscription;
412468
}

client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
106106
private MessageListener messageListenerInner;
107107
private OffsetStore offsetStore;
108108
private ConsumeMessageService consumeMessageService;
109-
private long flowControlTimes1 = 0;
110-
private long flowControlTimes2 = 0;
109+
private long queueFlowControlTimes = 0;
110+
private long queueMaxSpanFlowControlTimes = 0;
111111

112112
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
113113
this.defaultMQPushConsumer = defaultMQPushConsumer;
@@ -219,25 +219,37 @@ public void pullMessage(final PullRequest pullRequest) {
219219
return;
220220
}
221221

222-
long size = processQueue.getMsgCount().get();
223-
if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
222+
long cachedMessageCount = processQueue.getMsgCount().get();
223+
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
224+
225+
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
226+
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
227+
if ((queueFlowControlTimes++ % 1000) == 0) {
228+
log.warn(
229+
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
230+
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
231+
}
232+
return;
233+
}
234+
235+
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
224236
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
225-
if ((flowControlTimes1++ % 1000) == 0) {
237+
if ((queueFlowControlTimes++ % 1000) == 0) {
226238
log.warn(
227-
"the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
228-
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
239+
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
240+
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
229241
}
230242
return;
231243
}
232244

233245
if (!this.consumeOrderly) {
234246
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
235247
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
236-
if ((flowControlTimes2++ % 1000) == 0) {
248+
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
237249
log.warn(
238250
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
239251
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
240-
pullRequest, flowControlTimes2);
252+
pullRequest, queueMaxSpanFlowControlTimes);
241253
}
242254
return;
243255
}
@@ -732,6 +744,34 @@ private void checkConfig() throws MQClientException {
732744
null);
733745
}
734746

747+
// pullThresholdForTopic
748+
if (this.defaultMQPushConsumer.getPullThresholdForTopic() != -1) {
749+
if (this.defaultMQPushConsumer.getPullThresholdForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdForTopic() > 6553500) {
750+
throw new MQClientException(
751+
"pullThresholdForTopic Out of range [1, 6553500]"
752+
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
753+
null);
754+
}
755+
}
756+
757+
// pullThresholdSizeForQueue
758+
if (this.defaultMQPushConsumer.getPullThresholdSizeForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForQueue() > 1024) {
759+
throw new MQClientException(
760+
"pullThresholdSizeForQueue Out of range [1, 1024]"
761+
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
762+
null);
763+
}
764+
765+
if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() != -1) {
766+
// pullThresholdSizeForTopic
767+
if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForTopic() > 102400) {
768+
throw new MQClientException(
769+
"pullThresholdSizeForTopic Out of range [1, 102400]"
770+
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
771+
null);
772+
}
773+
}
774+
735775
// pullInterval
736776
if (this.defaultMQPushConsumer.getPullInterval() < 0 || this.defaultMQPushConsumer.getPullInterval() > 65535) {
737777
throw new MQClientException(

client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class ProcessQueue {
4545
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
4646
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
4747
private final AtomicLong msgCount = new AtomicLong();
48+
private final AtomicLong msgSize = new AtomicLong();
4849
private final Lock lockConsume = new ReentrantLock();
4950
private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>();
5051
private final AtomicLong tryUnlockTimes = new AtomicLong(0);
@@ -129,6 +130,7 @@ public boolean putMessage(final List<MessageExt> msgs) {
129130
if (null == old) {
130131
validMsgCnt++;
131132
this.queueOffsetMax = msg.getQueueOffset();
133+
msgSize.addAndGet(msg.getBody().length);
132134
}
133135
}
134136
msgCount.addAndGet(validMsgCnt);
@@ -189,6 +191,7 @@ public long removeMessage(final List<MessageExt> msgs) {
189191
MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
190192
if (prev != null) {
191193
removedCnt--;
194+
msgSize.addAndGet(0 - msg.getBody().length);
192195
}
193196
}
194197
msgCount.addAndGet(removedCnt);
@@ -215,6 +218,10 @@ public AtomicLong getMsgCount() {
215218
return msgCount;
216219
}
217220

221+
public AtomicLong getMsgSize() {
222+
return msgSize;
223+
}
224+
218225
public boolean isDropped() {
219226
return dropped;
220227
}
@@ -250,7 +257,10 @@ public long commit() {
250257
this.lockTreeMap.writeLock().lockInterruptibly();
251258
try {
252259
Long offset = this.msgTreeMapTemp.lastKey();
253-
msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
260+
msgCount.addAndGet(0 - this.msgTreeMapTemp.size());
261+
for (MessageExt msg : this.msgTreeMapTemp.values()) {
262+
msgSize.addAndGet(0 - msg.getBody().length);
263+
}
254264
this.msgTreeMapTemp.clear();
255265
if (offset != null) {
256266
return offset + 1;
@@ -334,6 +344,7 @@ public void clear() {
334344
this.msgTreeMap.clear();
335345
this.msgTreeMapTemp.clear();
336346
this.msgCount.set(0);
347+
this.msgSize.set(0);
337348
this.queueOffsetMax = 0L;
338349
} finally {
339350
this.lockTreeMap.writeLock().unlock();
@@ -387,6 +398,7 @@ public void fillProcessQueueInfo(final ProcessQueueInfo info) {
387398
info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
388399
info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey());
389400
info.setCachedMsgCount(this.msgTreeMap.size());
401+
info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 * 1024)));
390402
}
391403

392404
if (!this.msgTreeMapTemp.isEmpty()) {

client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,26 @@ public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<Messa
5757
long newVersion = System.currentTimeMillis();
5858
log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion);
5959
subscriptionData.setSubVersion(newVersion);
60+
61+
int currentQueueCount = this.processQueueTable.size();
62+
if (currentQueueCount != 0) {
63+
int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();
64+
if (pullThresholdForTopic != -1) {
65+
int newVal = Math.max(1, pullThresholdForTopic / currentQueueCount);
66+
log.info("The pullThresholdForQueue is changed from {} to {}",
67+
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForQueue(), newVal);
68+
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdForQueue(newVal);
69+
}
70+
71+
int pullThresholdSizeForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForTopic();
72+
if (pullThresholdSizeForTopic != -1) {
73+
int newVal = Math.max(1, pullThresholdSizeForTopic / currentQueueCount);
74+
log.info("The pullThresholdSizeForQueue is changed from {} to {}",
75+
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForQueue(), newVal);
76+
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(newVal);
77+
}
78+
}
79+
6080
// notify broker
6181
this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
6282
}

client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
3333
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
3434
import org.apache.rocketmq.client.exception.MQBrokerException;
35+
import org.apache.rocketmq.client.exception.MQClientException;
3536
import org.apache.rocketmq.client.impl.CommunicationMode;
3637
import org.apache.rocketmq.client.impl.FindBrokerResult;
3738
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
@@ -62,6 +63,7 @@
6263
import org.mockito.stubbing.Answer;
6364

6465
import static org.assertj.core.api.Assertions.assertThat;
66+
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
6567
import static org.mockito.ArgumentMatchers.any;
6668
import static org.mockito.ArgumentMatchers.anyBoolean;
6769
import static org.mockito.ArgumentMatchers.anyLong;
@@ -207,6 +209,59 @@ public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderly
207209
assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
208210
}
209211

212+
@Test
213+
public void testCheckConfig() {
214+
DefaultMQPushConsumer pushConsumer = createPushConsumer();
215+
216+
pushConsumer.setPullThresholdForQueue(65535 + 1);
217+
try {
218+
pushConsumer.start();
219+
failBecauseExceptionWasNotThrown(MQClientException.class);
220+
} catch (MQClientException e) {
221+
assertThat(e).hasMessageContaining("pullThresholdForQueue Out of range [1, 65535]");
222+
}
223+
224+
pushConsumer = createPushConsumer();
225+
pushConsumer.setPullThresholdForTopic(65535 * 100 + 1);
226+
227+
try {
228+
pushConsumer.start();
229+
failBecauseExceptionWasNotThrown(MQClientException.class);
230+
} catch (MQClientException e) {
231+
assertThat(e).hasMessageContaining("pullThresholdForTopic Out of range [1, 6553500]");
232+
}
233+
234+
pushConsumer = createPushConsumer();
235+
pushConsumer.setPullThresholdSizeForQueue(1024 + 1);
236+
try {
237+
pushConsumer.start();
238+
failBecauseExceptionWasNotThrown(MQClientException.class);
239+
} catch (MQClientException e) {
240+
assertThat(e).hasMessageContaining("pullThresholdSizeForQueue Out of range [1, 1024]");
241+
}
242+
243+
pushConsumer = createPushConsumer();
244+
pushConsumer.setPullThresholdSizeForTopic(1024 * 100 + 1);
245+
try {
246+
pushConsumer.start();
247+
failBecauseExceptionWasNotThrown(MQClientException.class);
248+
} catch (MQClientException e) {
249+
assertThat(e).hasMessageContaining("pullThresholdSizeForTopic Out of range [1, 102400]");
250+
}
251+
}
252+
253+
private DefaultMQPushConsumer createPushConsumer() {
254+
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup);
255+
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
256+
@Override
257+
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
258+
ConsumeConcurrentlyContext context) {
259+
return null;
260+
}
261+
});
262+
return pushConsumer;
263+
}
264+
210265
private PullRequest createPullRequest() {
211266
PullRequest pullRequest = new PullRequest();
212267
pullRequest.setConsumerGroup(consumerGroup);

0 commit comments

Comments
 (0)