Skip to content

Commit 082f875

Browse files
朱晔朱晔
authored andcommitted
(补充)使用RMQ的DLX实现延迟重试:rabbitmqdlx
1 parent 2ced285 commit 082f875

File tree

7 files changed

+252
-48
lines changed

7 files changed

+252
-48
lines changed

docker-compose.yml

Lines changed: 62 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -27,54 +27,69 @@ services:
2727
- "25672:25672"
2828
- "61613:61613"
2929
- "1883:1883"
30-
es1:
31-
image: docker.elastic.co/elasticsearch/elasticsearch:7.5.2
32-
container_name: es01
33-
environment:
34-
- node.name=es01
35-
- cluster.name=es-docker-cluster
36-
- discovery.seed_hosts=es02,es03
37-
- cluster.initial_master_nodes=es01,es02,es03
38-
- bootstrap.memory_lock=true
39-
- "ES_JAVA_OPTS=-Xms1024m -Xmx1024m"
40-
ulimits:
41-
memlock:
42-
soft: -1
43-
hard: -1
30+
grafana:
31+
image: grafana/grafana
4432
ports:
45-
- "9201:9200"
46-
- "9301:9300"
47-
es2:
48-
image: docker.elastic.co/elasticsearch/elasticsearch:7.5.2
49-
container_name: es02
33+
- 3000:3000
5034
environment:
51-
- node.name=es02
52-
- cluster.name=es-docker-cluster
53-
- discovery.seed_hosts=es01,es03
54-
- cluster.initial_master_nodes=es01,es02,es03
55-
- bootstrap.memory_lock=true
56-
- "ES_JAVA_OPTS=-Xms1024m -Xmx1024m"
57-
ulimits:
58-
memlock:
59-
soft: -1
60-
hard: -1
35+
- GF_SERVER_ROOT_UR=http://127.0.0.1
36+
- GF_SECURITY_ADMIN_PASSWORD=admin
37+
influxdb:
38+
image: influxdb
6139
ports:
62-
- "9202:9200"
63-
- "9302:9300"
64-
es3:
65-
image: docker.elastic.co/elasticsearch/elasticsearch:7.5.2
66-
container_name: es03
40+
- 8086:8086
41+
- 8083:8083
42+
- 2003:2003
6743
environment:
68-
- node.name=es03
69-
- cluster.name=es-docker-cluster
70-
- discovery.seed_hosts=es01,es02
71-
- cluster.initial_master_nodes=es01,es02,es03
72-
- bootstrap.memory_lock=true
73-
- "ES_JAVA_OPTS=-Xms1024m -Xmx1024m"
74-
ulimits:
75-
memlock:
76-
soft: -1
77-
hard: -1
78-
ports:
79-
- "9203:9200"
80-
- "9303:9300"
44+
- INFLUXDB_GRAPHITE_ENABLED=true
45+
# es1:
46+
# image: docker.elastic.co/elasticsearch/elasticsearch:7.5.2
47+
# container_name: es01
48+
# environment:
49+
# - node.name=es01
50+
# - cluster.name=es-docker-cluster
51+
# - discovery.seed_hosts=es02,es03
52+
# - cluster.initial_master_nodes=es01,es02,es03
53+
# - bootstrap.memory_lock=true
54+
# - "ES_JAVA_OPTS=-Xms1024m -Xmx1024m"
55+
# ulimits:
56+
# memlock:
57+
# soft: -1
58+
# hard: -1
59+
# ports:
60+
# - "9201:9200"
61+
# - "9301:9300"
62+
# es2:
63+
# image: docker.elastic.co/elasticsearch/elasticsearch:7.5.2
64+
# container_name: es02
65+
# environment:
66+
# - node.name=es02
67+
# - cluster.name=es-docker-cluster
68+
# - discovery.seed_hosts=es01,es03
69+
# - cluster.initial_master_nodes=es01,es02,es03
70+
# - bootstrap.memory_lock=true
71+
# - "ES_JAVA_OPTS=-Xms1024m -Xmx1024m"
72+
# ulimits:
73+
# memlock:
74+
# soft: -1
75+
# hard: -1
76+
# ports:
77+
# - "9202:9200"
78+
# - "9302:9300"
79+
# es3:
80+
# image: docker.elastic.co/elasticsearch/elasticsearch:7.5.2
81+
# container_name: es03
82+
# environment:
83+
# - node.name=es03
84+
# - cluster.name=es-docker-cluster
85+
# - discovery.seed_hosts=es01,es02
86+
# - cluster.initial_master_nodes=es01,es02,es03
87+
# - bootstrap.memory_lock=true
88+
# - "ES_JAVA_OPTS=-Xms1024m -Xmx1024m"
89+
# ulimits:
90+
# memlock:
91+
# soft: -1
92+
# hard: -1
93+
# ports:
94+
# - "9203:9200"
95+
# - "9303:9300"
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.geekbang.time.commonmistakes.asyncprocess.rabbitmqdlx;
2+
3+
import org.springframework.boot.SpringApplication;
4+
import org.springframework.boot.autoconfigure.SpringBootApplication;
5+
6+
@SpringBootApplication
7+
public class CommonMistakesApplication {
8+
public static void main(String[] args) {
9+
SpringApplication.run(CommonMistakesApplication.class, args);
10+
}
11+
}
12+
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.geekbang.time.commonmistakes.asyncprocess.rabbitmqdlx;
2+
3+
public class Consts {
4+
public static final Integer RETRY_INTERNAL = 3000;
5+
public static final Integer RETRY_COUNT = 2;
6+
7+
8+
public static final String EXCHANGE = "worker";
9+
public static final String QUEUE = "worker";
10+
public static final String ROUTING_KEY = "worker";
11+
12+
public static final String BUFFER_QUEUE = "buffer";
13+
public static final String BUFFER_EXCHANGE = "buffer";
14+
public static final String BUFFER_ROUTING_KEY = "buffer";
15+
16+
public static final String DEAD_EXCHANGE = "dead";
17+
public static final String DEAD_QUEUE = "dead";
18+
public static final String DEAD_ROUTING_KEY = "dead";
19+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package org.geekbang.time.commonmistakes.asyncprocess.rabbitmqdlx;
2+
3+
import lombok.extern.slf4j.Slf4j;
4+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
5+
import org.springframework.beans.factory.annotation.Autowired;
6+
import org.springframework.web.bind.annotation.GetMapping;
7+
import org.springframework.web.bind.annotation.RequestMapping;
8+
import org.springframework.web.bind.annotation.RestController;
9+
10+
import java.util.concurrent.atomic.AtomicLong;
11+
12+
@RequestMapping("deadletter")
13+
@Slf4j
14+
@RestController
15+
public class DeadLetterController {
16+
17+
AtomicLong atomicLong = new AtomicLong();
18+
@Autowired
19+
private RabbitTemplate rabbitTemplate;
20+
21+
@GetMapping("send")
22+
public void send() {
23+
rabbitTemplate.convertAndSend(Consts.EXCHANGE, Consts.QUEUE, "msg" + atomicLong.incrementAndGet());
24+
}
25+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package org.geekbang.time.commonmistakes.asyncprocess.rabbitmqdlx;
2+
3+
import com.rabbitmq.client.AMQP;
4+
import com.rabbitmq.client.Channel;
5+
import lombok.extern.slf4j.Slf4j;
6+
import org.springframework.amqp.core.Message;
7+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
8+
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
9+
import org.springframework.beans.factory.annotation.Autowired;
10+
import org.springframework.messaging.handler.annotation.Payload;
11+
import org.springframework.stereotype.Component;
12+
13+
import java.io.IOException;
14+
import java.util.List;
15+
import java.util.Map;
16+
17+
@Component
18+
@Slf4j
19+
public class MQListener {
20+
@Autowired
21+
private MessagePropertiesConverter messagePropertiesConverter;
22+
23+
@RabbitListener(queues = Consts.QUEUE)
24+
public void handler(@Payload Message message, Channel channel) throws IOException {
25+
String m = new String(message.getBody());
26+
try {
27+
log.info("收到消息:{}", m);
28+
throw new RuntimeException("处理消息失败");
29+
} catch (Exception e) {
30+
Map<String, Object> headers = message.getMessageProperties().getHeaders();
31+
Long retryCount = getRetryCount(headers);
32+
//重试3次
33+
if (retryCount < Consts.RETRY_COUNT) {
34+
log.info("消费消息:{} 异常,准备重试第{}次", m, ++retryCount);
35+
36+
AMQP.BasicProperties rabbitMQProperties =
37+
messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(), "UTF-8");
38+
rabbitMQProperties.builder().headers(headers);
39+
channel.basicPublish(Consts.BUFFER_EXCHANGE, Consts.BUFFER_ROUTING_KEY, rabbitMQProperties, message.getBody());
40+
} else {
41+
log.info("消息:{} 已重试 {} 次,发送到死信队列处理!", m, Consts.RETRY_COUNT);
42+
channel.basicPublish(Consts.DEAD_EXCHANGE, Consts.DEAD_ROUTING_KEY, null, message.getBody());
43+
}
44+
}
45+
}
46+
47+
private long getRetryCount(Map<String, Object> headers) {
48+
long retryCount = 0;
49+
if (null != headers) {
50+
if (headers.containsKey("x-death")) {
51+
List<Map<String, Object>> deathList = (List<Map<String, Object>>) headers.get("x-death");
52+
if (!deathList.isEmpty()) {
53+
Map<String, Object> deathEntry = deathList.get(0);
54+
retryCount = (Long) deathEntry.get("count");
55+
}
56+
}
57+
}
58+
return retryCount;
59+
}
60+
61+
@RabbitListener(queues = Consts.DEAD_QUEUE)
62+
public void deadHandler(@Payload Message message) {
63+
log.error("收到死信消息: {}", new String(message.getBody()));
64+
}
65+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package org.geekbang.time.commonmistakes.asyncprocess.rabbitmqdlx;
2+
3+
import lombok.extern.slf4j.Slf4j;
4+
import org.springframework.amqp.core.*;
5+
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
6+
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
7+
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
8+
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
9+
import org.springframework.context.annotation.Bean;
10+
import org.springframework.context.annotation.Configuration;
11+
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
12+
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
13+
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
14+
15+
@Configuration
16+
@Slf4j
17+
public class RabbitConfiguration implements RabbitListenerConfigurer {
18+
@Override
19+
public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
20+
rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
21+
}
22+
23+
@Bean
24+
public MessageHandlerMethodFactory messageHandlerMethodFactory() {
25+
DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
26+
messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
27+
return messageHandlerMethodFactory;
28+
}
29+
30+
@Bean
31+
public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
32+
return new MappingJackson2MessageConverter();
33+
}
34+
35+
@Bean
36+
public MessagePropertiesConverter messagePropertiesConverter() {
37+
return new DefaultMessagePropertiesConverter();
38+
}
39+
40+
@Bean
41+
public Declarables declarablesForWorker() {
42+
Queue queue = new Queue(Consts.QUEUE);
43+
DirectExchange directExchange = new DirectExchange(Consts.EXCHANGE);
44+
return new Declarables(queue, directExchange,
45+
BindingBuilder.bind(queue).to(directExchange).with(Consts.ROUTING_KEY));
46+
}
47+
48+
@Bean
49+
public Declarables declarablesForBuffer() {
50+
Queue queue = QueueBuilder.durable(Consts.BUFFER_QUEUE)
51+
.withArgument("x-dead-letter-exchange", Consts.EXCHANGE)
52+
.withArgument("x-dead-letter-routing-key", Consts.ROUTING_KEY)
53+
.withArgument("x-message-ttl", Consts.RETRY_INTERNAL)
54+
.build();
55+
DirectExchange directExchange = new DirectExchange(Consts.BUFFER_EXCHANGE);
56+
return new Declarables(queue, directExchange,
57+
BindingBuilder.bind(queue).to(directExchange).with(Consts.BUFFER_ROUTING_KEY));
58+
}
59+
60+
@Bean
61+
public Declarables declarablesForDead() {
62+
Queue queue = new Queue(Consts.DEAD_QUEUE);
63+
DirectExchange directExchange = new DirectExchange(Consts.DEAD_EXCHANGE);
64+
return new Declarables(queue, directExchange,
65+
BindingBuilder.bind(queue).to(directExchange).with(Consts.DEAD_ROUTING_KEY));
66+
}
67+
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
## 异步处理好用,但非常容易用错
22
- 异步处理需要消息补偿闭环:compensation
33
- 注意消息模式是广播还是工作队列:fanoutvswork
4-
- 别让死信堵塞了消息队列:deadletter
4+
- 别让死信堵塞了消息队列:deadletter
5+
- (补充)使用RMQ的DLX实现延迟重试:rabbitmqdlx

0 commit comments

Comments
 (0)