Skip to content

Commit 15ef0f4

Browse files
committed
添加RocketMQ 定时任务
1 parent 6f17e5c commit 15ef0f4

File tree

4 files changed

+98
-6
lines changed

4 files changed

+98
-6
lines changed

rocketmqdemo/README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,6 @@
44

55
- demo02 普通生产者消费者(异步发送消息)
66

7-
- demo03 顺序消费
7+
- demo03 顺序消费
8+
9+
- demo04 延时消息

rocketmqdemo/src/main/java/com/bruis/rocketmqdemo/demo03/Producer.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package com.bruis.rocketmqdemo.demo03;
22

33
import org.apache.rocketmq.client.producer.DefaultMQProducer;
4+
import org.apache.rocketmq.client.producer.MessageQueueSelector;
45
import org.apache.rocketmq.client.producer.SendResult;
56
import org.apache.rocketmq.common.message.Message;
7+
import org.apache.rocketmq.common.message.MessageQueue;
68

79
import java.text.SimpleDateFormat;
810
import java.util.ArrayList;
@@ -42,11 +44,20 @@ public static void main(String[] args) throws Exception {
4244
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
4345
Message msg = new Message(TOPIC_NAME, tags[i % tags.length], "KEY" + i, body.getBytes());
4446

45-
SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> {
46-
Long id = (Long) arg; //根据订单id选择发送queue
47-
long index = id % mqs.size();
48-
return mqs.get((int) index);
49-
}, orderList.get(i).getOrderId());//订单id
47+
// SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> {
48+
// Long id = (Long) arg; //根据订单id选择发送queue
49+
// long index = id % mqs.size();
50+
// return mqs.get((int) index);
51+
// }, orderList.get(i).getOrderId());//订单id
52+
53+
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
54+
@Override
55+
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
56+
Long id = (Long) arg;
57+
long index = id % mqs.size();
58+
return mqs.get((int) index);
59+
}
60+
}, orderList.get(i).getOrderId());
5061

5162
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
5263
sendResult.getSendStatus(),
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.bruis.rocketmqdemo.demo04;
2+
3+
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
4+
import org.apache.rocketmq.client.consumer.listener.*;
5+
import org.apache.rocketmq.common.message.MessageExt;
6+
import java.util.List;
7+
8+
/**
9+
*
10+
* 延时消息消费者
11+
*
12+
* @author lhy
13+
* @date 2021/7/24
14+
*/
15+
public class ScheduledMessageConsumer {
16+
17+
public static void main(String[] args) throws Exception {
18+
19+
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(ScheduledMessageProducer.GROUP_NAME);
20+
// 订阅指定的topic
21+
consumer.subscribe(ScheduledMessageProducer.TOPIC_NAME, "*");
22+
consumer.setNamesrvAddr(ScheduledMessageProducer.NAMESRV_ADDRESS);
23+
// 注册消息监听者
24+
consumer.registerMessageListener(new MessageListenerConcurrently() {
25+
26+
// 消费消息
27+
@Override
28+
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
29+
for (MessageExt message : messages) {
30+
// Print approximate delay time period
31+
System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later");
32+
}
33+
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
34+
}
35+
});
36+
consumer.start();
37+
}
38+
39+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package com.bruis.rocketmqdemo.demo04;
2+
3+
import org.apache.rocketmq.client.producer.DefaultMQProducer;
4+
import org.apache.rocketmq.common.message.Message;
5+
6+
/**
7+
*
8+
* 延时消息生产
9+
*
10+
* @author lhy
11+
* @date 2021/7/24
12+
*/
13+
public class ScheduledMessageProducer {
14+
15+
public static final String GROUP_NAME = "scheduled_group";
16+
17+
public static final String TOPIC_NAME = "scheduled_test_topic";
18+
19+
public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
20+
21+
public static void main(String[] args) throws Exception {
22+
23+
DefaultMQProducer producer = new DefaultMQProducer(GROUP_NAME);
24+
producer.setNamesrvAddr(NAMESRV_ADDRESS);
25+
producer.start();
26+
27+
int totalMessagesToSend = 100;
28+
29+
for (int i = 0; i < totalMessagesToSend; i++) {
30+
Message message = new Message(TOPIC_NAME, ("Hello Scheduled Message " + i).getBytes());
31+
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
32+
message.setDelayTimeLevel(3);
33+
// 发送消息
34+
producer.send(message);
35+
}
36+
37+
producer.shutdown();
38+
}
39+
40+
}

0 commit comments

Comments
 (0)