Skip to content

Commit d7fd27a

Browse files
committed
docs: update mq article
1 parent f13c592 commit d7fd27a

File tree

1 file changed

+79
-15
lines changed

1 file changed

+79
-15
lines changed

docs/high-concurrency/how-to-ensure-the-reliable-transmission-of-messages.md

Lines changed: 79 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,29 @@
2020

2121
生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。
2222

23-
此时可以选择用 RabbitMQ 提供的事务功能,就是生产者**发送数据之前**开启 RabbitMQ 事务 `channel.txSelect` ,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务 `channel.txRollback` ,然后重试发送消息;如果收到了消息,那么可以提交事务 `channel.txCommit`
23+
此时可以选择用 RabbitMQ 提供的事务功能,就是生产者**发送数据之前**开启 RabbitMQ 事务 `channel.txSelect()` ,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务 `channel.txRollback()` ,然后重试发送消息;如果收到了消息,那么可以提交事务 `channel.txCommit()`
2424

2525
```java
26-
// 开启事务
27-
channel.txSelect
2826
try {
27+
// 通过工厂创建连接
28+
connection = factory.newConnection();
29+
// 获取通道
30+
channel = connection.createChannel();
31+
// 开启事务
32+
channel.txSelect();
33+
2934
// 这里发送消息
30-
} catch (Exception e) {
31-
channel.txRollback
35+
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
3236

33-
// 这里再次重发这条消息
34-
}
37+
// 模拟出现异常
38+
int result = 1 / 0;
3539

36-
// 提交事务
37-
channel.txCommit
40+
// 提交事务
41+
channel.txCommit();
42+
} catch (IOException | TimeoutException e) {
43+
// 捕捉异常,回滚事务
44+
channel.txRollback();
45+
}
3846
```
3947

4048
但是问题是,RabbitMQ 事务机制(同步)一搞,基本上**吞吐量会下来,因为太耗性能**
@@ -45,19 +53,73 @@ channel.txCommit
4553

4654
所以一般在生产者这块**避免数据丢失**,都是用 `confirm` 机制的。
4755

56+
> 已经在 transaction 事务模式的 channel 是不能再设置成 confirm 模式的,即这两种模式是不能共存的。
57+
58+
客户端实现生产者 `confirm` 有 3 种方式:
59+
60+
1.**普通 confirm 模式**:每发送一条消息后,调用 `waitForConfirms()` 方法,等待服务器端 confirm,如果服务端返回 false 或者在一段时间内都没返回,客户端可以进行消息重发。
61+
62+
```java
63+
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
64+
if (!channel.waitForConfirms()) {
65+
// 消息发送失败
66+
// ...
67+
}
68+
```
69+
70+
2.**批量 confirm 模式**:每发送一批消息后,调用 `waitForConfirms()` 方法,等待服务端 confirm。
71+
72+
```java
73+
channel.confirmSelect();
74+
for (int i = 0; i < batchCount; ++i) {
75+
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
76+
}
77+
if (!channel.waitForConfirms()) {
78+
// 消息发送失败
79+
// ...
80+
}
81+
```
82+
83+
3.**异步 confirm 模式**:提供一个回调方法,服务端 confirm 了一条或者多条消息后客户端会回调这个方法。
84+
85+
```java
86+
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
87+
channel.confirmSelect();
88+
channel.addConfirmListener(new ConfirmListener() {
89+
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
90+
if (multiple) {
91+
confirmSet.headSet(deliveryTag + 1).clear();
92+
} else {
93+
confirmSet.remove(deliveryTag);
94+
}
95+
}
96+
97+
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
98+
System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
99+
if (multiple) {
100+
confirmSet.headSet(deliveryTag + 1).clear();
101+
} else {
102+
confirmSet.remove(deliveryTag);
103+
}
104+
}
105+
});
106+
107+
while (true) {
108+
long nextSeqNo = channel.getNextPublishSeqNo();
109+
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
110+
confirmSet.add(nextSeqNo);
111+
}
112+
```
113+
48114
#### RabbitMQ 弄丢了数据
49115

50116
就是 RabbitMQ 自己弄丢了数据,这个你必须**开启 RabbitMQ 的持久化**,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,**恢复之后会自动读取之前存储的数据**,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,**可能导致少量数据丢失**,但是这个概率较小。
51117

52118
设置持久化有**两个步骤**
53119

54-
- 创建 queue 的时候将其设置为持久化<br>
120+
- 创建 queue 的时候将其设置为持久化。这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
55121

56-
这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
57-
58-
- 第二个是发送消息的时候将消息的 `deliveryMode` 设置为 2<br>
59-
60-
就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
122+
- 第二个是发送消息的时候将消息的 `deliveryMode` 设置为 2。就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
61123

62124
必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。
63125

@@ -71,6 +133,8 @@ RabbitMQ 如果丢失了数据,主要是因为你消费的时候,**刚消费
71133

72134
这个时候得用 RabbitMQ 提供的 `ack` 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 `ack` ,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 `ack` 一把。这样的话,如果你还没处理完,不就没有 `ack` 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。
73135

136+
> 为了保证消息从队列种可靠地到达消费者,RabbitMQ 提供了消息确认机制。消费者在声明队列时,可以指定 noAck 参数,当 noAck=false,RabbitMQ 会等待消费者显式发回 ack 信号后,才从内存(和磁盘,如果是持久化消息)中移去消息。否则,一旦消息被消费者消费,RabbitMQ 会在队列中立即删除它。
137+
74138
![rabbitmq-message-lose-solution](./images/rabbitmq-message-lose-solution.png)
75139

76140
### Kafka

0 commit comments

Comments
 (0)