|
| 1 | +1. RocketMQ 简介 |
| 2 | + |
| 3 | +RocketMQ 是阿里巴巴开源的分布式消息中间件。支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念,如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等。 |
| 4 | + |
| 5 | +RocketMQ 特点 |
| 6 | + |
| 7 | +- 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型; |
| 8 | +- 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递,RocketMQ 可以保证严格的消息顺序,而ActiveMQ 无法保证; |
| 9 | +- 支持拉(Pull)和推(Push)两种消息模式;Push 好理解,比如在消费者端设置 Listener 回调;而 Pull,控制权在于应用,即应用需要主动的调用拉消息方法从 Broker 获取消息,这里面存在一个消费位置记录的问题(如果不记录,会导致消息重复消费); |
| 10 | +- 单一队列百万消息的堆积能力;RocketMQ 提供亿级消息的堆积能力,这不是重点,重点是堆积了亿级的消息后,依然保持写入低延迟; |
| 11 | +- 支持多种消息协议,如 JMS、MQTT 等; |
| 12 | +- 分布式高可用的部署架构,满足至少一次消息传递语义;RocketMQ 原生就是支持分布式的,而ActiveMQ 原生存在单点性; |
| 13 | +- 提供 docker 镜像用于隔离测试和云集群部署; |
| 14 | +- 提供配置、指标和监控等功能丰富的 Dashboard。 |
| 15 | + |
| 16 | +Broker |
| 17 | + |
| 18 | +Broker 其实就是 RocketMQ 服务器,负责存储消息、转发消息。Broker 在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。Broker 也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。 |
| 19 | + |
| 20 | +Broker Server 是 RocketMQ 真正的业务核心,包含了多个重要的子模块: |
| 21 | + |
| 22 | +- 路由模块:整个 Broker 的实体,负责处理来自 clients 端的请求。 |
| 23 | +- 客户端管理:负责管理客户端(Producer/Consumer)和维护 Consumer 的 Topic 订阅信息 |
| 24 | +- 存储服务:提供方便简单的 API 接口处理消息存储到物理硬盘和查询功能。 |
| 25 | +- 高可用服务:高可用服务,提供 Master Broker 和 Slave Broker 之间的数据同步功能。 |
| 26 | +- 消息索引服务:根据特定的 Message key 对投递到 Broker 的消息进行索引服务,以提供消息的快速查询。 |
| 27 | + |
| 28 | +NameServer |
| 29 | + |
| 30 | +NameServer 是一个非常简单的 Topic 路由注册中心,其角色类似 Dubbo 中的 zookeeper,支持 Broker 的动态注册与发现。 |
| 31 | + |
| 32 | +主要包括两个功能: |
| 33 | + |
| 34 | +- Broker 管理:NameServer 接受 Broker 集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查 Broker 是否还存活; |
| 35 | +- 路由信息管理:给 Producer 和 Consumer 提供服务获取 Broker 列表。每个 NameServer 将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。然后 Producer 和 Conumser 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从而进行消息的投递和消费。 |
| 36 | + |
| 37 | +2. 使用 Docker 快速搭建 RocketMQ 4.4 |
| 38 | + |
| 39 | +rocketmq 需要部署 broker 与 nameserver ,考虑到分开部署比较麻烦,这里将会使用 docker-compose。另外,还需要搭建一个 web 可视化控制台,可以监控 mq 服务状态,以及消息消费情况,这里使用 rocketmq-console,同样该程序也将使用 docker 安装。 |
| 40 | + |
| 41 | +1. 在 linux 服务器上选择并建立目录; |
| 42 | + |
| 43 | + mkdir rocketmq-docker |
| 44 | + 复制代码 |
| 45 | + |
| 46 | +1. 进入 rocketmq-docker 目录,建立一个名为 broker.conf 的配置文件,内容如下: |
| 47 | + |
| 48 | + # 所属集群名称,如果节点较多可以配置多个 |
| 49 | + brokerClusterName = DefaultCluster |
| 50 | + # broker名称,master和slave使用相同的名称,表明他们的主从关系 |
| 51 | + brokerName = broker-a |
| 52 | + # 0表示Master,大于0表示不同的slave |
| 53 | + brokerId = 0 |
| 54 | + # 表示几点做消息删除动作,默认是凌晨4点 |
| 55 | + deleteWhen = 04 |
| 56 | + # 在磁盘上保留消息的时长,单位是小时 |
| 57 | + fileReservedTime = 48 |
| 58 | + # 有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制; |
| 59 | + brokerRole = ASYNC_MASTER |
| 60 | + # 刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要; |
| 61 | + flushDiskType = ASYNC_FLUSH |
| 62 | + # 设置broker节点所在服务器的ip地址 |
| 63 | + # brokerIP1 = 192.168.138.131 |
| 64 | + 复制代码 |
| 65 | + |
| 66 | +注意:这里的 brokerIP1 请务必设置,否则默认会成为 docker 容器内部IP导致外网链接不上。 |
| 67 | + |
| 68 | +1. 还是在 rocketmq-docker 目录,建立一个名为 rocketmq.yaml 的脚本文件; |
| 69 | + |
| 70 | + |
| 71 | + |
| 72 | +rocketmq.yaml 内容如下: |
| 73 | + |
| 74 | + version: '2' |
| 75 | + services: |
| 76 | + namesrv: |
| 77 | + image: rocketmqinc/rocketmq |
| 78 | + container_name: rmqnamesrv |
| 79 | + ports: |
| 80 | + - 9876:9876 |
| 81 | + volumes: |
| 82 | + - /docker/rocketmq/data/namesrv/logs:/home/rocketmq/logs |
| 83 | + - /docker/rocketmq/data/namesrv/store:/home/rocketmq/store |
| 84 | + command: sh mqnamesrv |
| 85 | + broker: |
| 86 | + image: rocketmqinc/rocketmq |
| 87 | + container_name: rmqbroker |
| 88 | + ports: |
| 89 | + - 10909:10909 |
| 90 | + - 10911:10911 |
| 91 | + - 10912:10912 |
| 92 | + volumes: |
| 93 | + - /docker/rocketmq/data/broker/logs:/home/rocketmq/logs |
| 94 | + - /docker/rocketmq/data/broker/store:/home/rocketmq/store |
| 95 | + - /docker/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf |
| 96 | + command: sh mqbroker -n namesrv:9876 -c /opt/rocketmq-4.4.0/conf/broker.conf |
| 97 | + depends_on: |
| 98 | + - namesrv |
| 99 | + environment: |
| 100 | + - JAVA_HOME=/usr/lib/jvm/jre |
| 101 | + console: |
| 102 | + image: styletang/rocketmq-console-ng |
| 103 | + container_name: rocketmq-console-ng |
| 104 | + ports: |
| 105 | + - 8087:8080 |
| 106 | + depends_on: |
| 107 | + - namesrv |
| 108 | + environment: |
| 109 | + - JAVA_OPTS= -Dlogging.level.root=info -Drocketmq.namesrv.addr=rmqnamesrv:9876 |
| 110 | + - Dcom.rocketmq.sendMessageWithVIPChannel=false |
| 111 | + 复制代码 |
| 112 | + |
| 113 | +1. 开启 broker 用到的防火墙端口,方便后续使用: |
| 114 | + |
| 115 | + firewall-cmd --zone=public --add-port=10909-10912/tcp --permanent |
| 116 | + 复制代码 |
| 117 | + |
| 118 | +1. 执行 sentinel-dashboard.yaml 脚本启动容器: |
| 119 | + |
| 120 | + docker-compose -f rocketmq.yaml up |
| 121 | + 复制代码 |
| 122 | + |
| 123 | +1. 进入 rocketmq 控制台,我们会发现类似的图表(当然开始肯定是空的): |
| 124 | + |
| 125 | + http://(安装RocketMQ机器的IP):8087 |
| 126 | + 复制代码 |
| 127 | + |
| 128 | + |
| 129 | + |
| 130 | +1. 我们选择 “集群” 这一栏,进入就能看见我们设置的 broker 的外网IP了; |
| 131 | + |
| 132 | + |
| 133 | + |
| 134 | +到这里我们理论上已经完成 RocketMQ 服务端的部署,现在就可以去 Spring 项目中使用客户端了。 |
| 135 | + |
| 136 | +3. 在 Spring 项目中引入 RocketMQ 客户端 |
| 137 | + |
| 138 | +1. 添加 pom 文件依赖: |
| 139 | + |
| 140 | + <dependency> |
| 141 | + <groupId>org.apache.rocketmq</groupId> |
| 142 | + rocketmq-spring-boot-starter |
| 143 | + <version>2.0.4</version> |
| 144 | + </dependency> |
| 145 | + 复制代码 |
| 146 | + |
| 147 | +1. 在 application.yml 添加配置: |
| 148 | + |
| 149 | + server: |
| 150 | + port: 10801 |
| 151 | + |
| 152 | + spring: |
| 153 | + application: |
| 154 | + name: (项目名称)-service |
| 155 | + |
| 156 | + rocketmq: |
| 157 | + name-server: (安装RocketMQ机器的IP):9876 |
| 158 | + producer: |
| 159 | + group: (项目名称)-group |
| 160 | + 复制代码 |
| 161 | + |
| 162 | +1. 新建一个消息发送类 MessageProducer 作为消息的 生产者: |
| 163 | + |
| 164 | + @Service |
| 165 | + public class MessageProducer implements CommandLineRunner { |
| 166 | + |
| 167 | + @Resource |
| 168 | + private RocketMQTemplate rocketMQTemplate; |
| 169 | + |
| 170 | + @Override |
| 171 | + public void run(String... args) throws Exception { |
| 172 | + rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build()); |
| 173 | + } |
| 174 | + |
| 175 | + } |
| 176 | + 复制代码 |
| 177 | + |
| 178 | +1. 新建一个消息接收类 MessageListener 作为消息的 消费者: |
| 179 | + |
| 180 | + @Slf4j |
| 181 | + @Service |
| 182 | + @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1") |
| 183 | + public class MessageListener implements RocketMQListener<String> { |
| 184 | + |
| 185 | + @Override |
| 186 | + public void onMessage(String message) { |
| 187 | + log.info("received message: {}", message); |
| 188 | + } |
| 189 | + |
| 190 | + } |
| 191 | + 复制代码 |
| 192 | + |
| 193 | +1. 新建一个 MessageProducer 的调用控制器类: |
| 194 | + |
| 195 | + @RestController |
| 196 | + @RequestMapping |
| 197 | + public class HelloController { |
| 198 | + |
| 199 | + @Resource |
| 200 | + private MessageProducer messageProducer; |
| 201 | + |
| 202 | + @RequestMapping("/message") |
| 203 | + public void message() throws Exception { |
| 204 | + messageProducer.run(""); |
| 205 | + } |
| 206 | + 复制代码 |
| 207 | + |
| 208 | +1. 启动 Spring 项目,我们来测试一下最简单的消息发送: |
| 209 | + |
| 210 | + GET http://localhost:10801/message |
| 211 | + Accept: */* |
| 212 | + Cache-Control: no-cache |
| 213 | + 复制代码 |
| 214 | + |
| 215 | + |
| 216 | + |
| 217 | +我们也能在 RocketMQ 的管理控制台,大致看到消息的消费情况: |
| 218 | + |
| 219 | + |
| 220 | + |
| 221 | +实战到了这里并没有解释一些概念,可能大家会有点犯迷糊。 |
| 222 | + |
| 223 | +Topic(主题)相当于一种类型的消息,比如可以设置 Topic1 专门是分销的业务, Topic2 专门是优惠券的业务;而 Group(分组)相当于对生产者和消费者的分组,那我们的微服务既能做生产者也能做消费者,比如可以设置 Group1 是商品的微服务,Group2 是订单的微服务,当然还要区分下是生产者分组还是消费者分组。 |
| 224 | + |
| 225 | +- Group:分为ProducerGroup 和 ConsumerGroup, 代表某一类的生产者和消费者,一般来说同一个服务可以作为 Group,同一个 Group 一般来说发送和消费的消息都是一样的。 |
| 226 | +- Topic:消息主题,一级消息类型,生产者向其发送消息,消费者读取其消息。 |
| 227 | +- Queue: 分为读和写两种队列,一般来说读写队列数量一致,如果不一致就会出现很多问题。 |
| 228 | + |
| 229 | +Topic 中有分为了多个 Queue,这其实是我们发送/读取消息通道的最小单位。我们发送消息都需要指定某个写入某个 Queue,拉取消息的时候也需要指定拉取某个 Queue,所以我们的顺序消息可以基于我们的 Queue 维度保持队列有序,如果想做到全局有序那么需要将 Queue 大小设置为1,这样所有的数据都会在 Queue 中有序。 |
| 230 | + |
| 231 | + |
| 232 | + |
| 233 | +作者:白菜说技术 |
| 234 | +链接:https://juejin.cn/post/6930869079217717256 |
| 235 | +来源:稀土掘金 |
| 236 | +著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。 |
| 237 | + |
| 238 | +# 参考文章 |
| 239 | +https://lijunyi.xyz/docs/SpringCloud/SpringCloud.html#_2-2-x-%E5%88%86%E6%94%AF |
| 240 | +https://mp.weixin.qq.com/s/2jeovmj77O9Ux96v3A0NtA |
| 241 | +https://juejin.cn/post/6931922457741770760 |
| 242 | +https://github.com/D2C-Cai/herring |
| 243 | +http://c.biancheng.net/springcloud |
0 commit comments