20
20
21
21
生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。
22
22
23
- 此时可以选择用 RabbitMQ 提供的事务功能,就是生产者** 发送数据之前** 开启 RabbitMQ 事务 ` channel.txSelect ` ,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务 ` channel.txRollback ` ,然后重试发送消息;如果收到了消息,那么可以提交事务 ` channel.txCommit ` 。
23
+ 此时可以选择用 RabbitMQ 提供的事务功能,就是生产者** 发送数据之前** 开启 RabbitMQ 事务 ` channel.txSelect() ` ,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务 ` channel.txRollback() ` ,然后重试发送消息;如果收到了消息,那么可以提交事务 ` channel.txCommit() ` 。
24
24
25
25
``` java
26
- // 开启事务
27
- channel. txSelect
28
26
try {
27
+ // 通过工厂创建连接
28
+ connection = factory. newConnection();
29
+ // 获取通道
30
+ channel = connection. createChannel();
31
+ // 开启事务
32
+ channel. txSelect();
33
+
29
34
// 这里发送消息
30
- } catch (Exception e) {
31
- channel. txRollback
35
+ channel. basicPublish(exchange, routingKey, MessageProperties . PERSISTENT_TEXT_PLAIN , msg. getBytes());
32
36
33
- // 这里再次重发这条消息
34
- }
37
+ // 模拟出现异常
38
+ int result = 1 / 0 ;
35
39
36
- // 提交事务
37
- channel. txCommit
40
+ // 提交事务
41
+ channel. txCommit();
42
+ } catch (IOException | TimeoutException e) {
43
+ // 捕捉异常,回滚事务
44
+ channel. txRollback();
45
+ }
38
46
```
39
47
40
48
但是问题是,RabbitMQ 事务机制(同步)一搞,基本上** 吞吐量会下来,因为太耗性能** 。
@@ -45,19 +53,73 @@ channel.txCommit
45
53
46
54
所以一般在生产者这块** 避免数据丢失** ,都是用 ` confirm ` 机制的。
47
55
56
+ > 已经在 transaction 事务模式的 channel 是不能再设置成 confirm 模式的,即这两种模式是不能共存的。
57
+
58
+ 客户端实现生产者 ` confirm ` 有 3 种方式:
59
+
60
+ 1.** 普通 confirm 模式** :每发送一条消息后,调用 ` waitForConfirms() ` 方法,等待服务器端 confirm,如果服务端返回 false 或者在一段时间内都没返回,客户端可以进行消息重发。
61
+
62
+ ``` java
63
+ channel. basicPublish(ConfirmConfig . exchangeName, ConfirmConfig . routingKey, MessageProperties . PERSISTENT_TEXT_PLAIN , ConfirmConfig . msg_10B. getBytes());
64
+ if (! channel. waitForConfirms()) {
65
+ // 消息发送失败
66
+ // ...
67
+ }
68
+ ```
69
+
70
+ 2.** 批量 confirm 模式** :每发送一批消息后,调用 ` waitForConfirms() ` 方法,等待服务端 confirm。
71
+
72
+ ``` java
73
+ channel. confirmSelect();
74
+ for (int i = 0 ; i < batchCount; ++ i) {
75
+ channel. basicPublish(ConfirmConfig . exchangeName, ConfirmConfig . routingKey, MessageProperties . PERSISTENT_TEXT_PLAIN , ConfirmConfig . msg_10B. getBytes());
76
+ }
77
+ if (! channel. waitForConfirms()) {
78
+ // 消息发送失败
79
+ // ...
80
+ }
81
+ ```
82
+
83
+ 3.** 异步 confirm 模式** :提供一个回调方法,服务端 confirm 了一条或者多条消息后客户端会回调这个方法。
84
+
85
+ ``` java
86
+ SortedSet<Long > confirmSet = Collections . synchronizedSortedSet(new TreeSet<Long > ());
87
+ channel. confirmSelect();
88
+ channel. addConfirmListener(new ConfirmListener () {
89
+ public void handleAck (long deliveryTag , boolean multiple ) throws IOException {
90
+ if (multiple) {
91
+ confirmSet. headSet(deliveryTag + 1 ). clear();
92
+ } else {
93
+ confirmSet. remove(deliveryTag);
94
+ }
95
+ }
96
+
97
+ public void handleNack (long deliveryTag , boolean multiple ) throws IOException {
98
+ System . out. println(" Nack, SeqNo: " + deliveryTag + " , multiple: " + multiple);
99
+ if (multiple) {
100
+ confirmSet. headSet(deliveryTag + 1 ). clear();
101
+ } else {
102
+ confirmSet. remove(deliveryTag);
103
+ }
104
+ }
105
+ });
106
+
107
+ while (true ) {
108
+ long nextSeqNo = channel. getNextPublishSeqNo();
109
+ channel. basicPublish(ConfirmConfig . exchangeName, ConfirmConfig . routingKey, MessageProperties . PERSISTENT_TEXT_PLAIN , ConfirmConfig . msg_10B. getBytes());
110
+ confirmSet. add(nextSeqNo);
111
+ }
112
+ ```
113
+
48
114
#### RabbitMQ 弄丢了数据
49
115
50
116
就是 RabbitMQ 自己弄丢了数据,这个你必须** 开启 RabbitMQ 的持久化** ,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,** 恢复之后会自动读取之前存储的数据** ,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,** 可能导致少量数据丢失** ,但是这个概率较小。
51
117
52
118
设置持久化有** 两个步骤** :
53
119
54
- - 创建 queue 的时候将其设置为持久化< br >
120
+ - 创建 queue 的时候将其设置为持久化。这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
55
121
56
- 这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
57
-
58
- - 第二个是发送消息的时候将消息的 ` deliveryMode ` 设置为 2<br >
59
-
60
- 就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
122
+ - 第二个是发送消息的时候将消息的 ` deliveryMode ` 设置为 2。就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
61
123
62
124
必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。
63
125
@@ -71,6 +133,8 @@ RabbitMQ 如果丢失了数据,主要是因为你消费的时候,**刚消费
71
133
72
134
这个时候得用 RabbitMQ 提供的 ` ack ` 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ` ack ` ,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ` ack ` 一把。这样的话,如果你还没处理完,不就没有 ` ack ` 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。
73
135
136
+ > 为了保证消息从队列种可靠地到达消费者,RabbitMQ 提供了消息确认机制。消费者在声明队列时,可以指定 noAck 参数,当 noAck=false,RabbitMQ 会等待消费者显式发回 ack 信号后,才从内存(和磁盘,如果是持久化消息)中移去消息。否则,一旦消息被消费者消费,RabbitMQ 会在队列中立即删除它。
137
+
74
138
![ rabbitmq-message-lose-solution] ( ./images/rabbitmq-message-lose-solution.png )
75
139
76
140
### Kafka
0 commit comments