Skip to content

Commit 2519a5d

Browse files
committed
add springcloud alibaba
1 parent f3db1ac commit 2519a5d

File tree

6 files changed

+3739
-0
lines changed

6 files changed

+3739
-0
lines changed
Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
1. RocketMQ 简介
2+
3+
RocketMQ 是阿里巴巴开源的分布式消息中间件。支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念,如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等。
4+
5+
RocketMQ 特点
6+
7+
- 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型;
8+
- 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递,RocketMQ 可以保证严格的消息顺序,而ActiveMQ 无法保证;
9+
- 支持拉(Pull)和推(Push)两种消息模式;Push 好理解,比如在消费者端设置 Listener 回调;而 Pull,控制权在于应用,即应用需要主动的调用拉消息方法从 Broker 获取消息,这里面存在一个消费位置记录的问题(如果不记录,会导致消息重复消费);
10+
- 单一队列百万消息的堆积能力;RocketMQ 提供亿级消息的堆积能力,这不是重点,重点是堆积了亿级的消息后,依然保持写入低延迟;
11+
- 支持多种消息协议,如 JMS、MQTT 等;
12+
- 分布式高可用的部署架构,满足至少一次消息传递语义;RocketMQ 原生就是支持分布式的,而ActiveMQ 原生存在单点性;
13+
- 提供 docker 镜像用于隔离测试和云集群部署;
14+
- 提供配置、指标和监控等功能丰富的 Dashboard。
15+
16+
Broker
17+
18+
Broker 其实就是 RocketMQ 服务器,负责存储消息、转发消息。Broker 在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。Broker 也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
19+
20+
Broker Server 是 RocketMQ 真正的业务核心,包含了多个重要的子模块:
21+
22+
- 路由模块:整个 Broker 的实体,负责处理来自 clients 端的请求。
23+
- 客户端管理:负责管理客户端(Producer/Consumer)和维护 Consumer 的 Topic 订阅信息
24+
- 存储服务:提供方便简单的 API 接口处理消息存储到物理硬盘和查询功能。
25+
- 高可用服务:高可用服务,提供 Master Broker 和 Slave Broker 之间的数据同步功能。
26+
- 消息索引服务:根据特定的 Message key 对投递到 Broker 的消息进行索引服务,以提供消息的快速查询。
27+
28+
NameServer
29+
30+
NameServer 是一个非常简单的 Topic 路由注册中心,其角色类似 Dubbo 中的 zookeeper,支持 Broker 的动态注册与发现。
31+
32+
主要包括两个功能:
33+
34+
- Broker 管理:NameServer 接受 Broker 集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查 Broker 是否还存活;
35+
- 路由信息管理:给 Producer 和 Consumer 提供服务获取 Broker 列表。每个 NameServer 将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。然后 Producer 和 Conumser 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从而进行消息的投递和消费。
36+
37+
2. 使用 Docker 快速搭建 RocketMQ 4.4
38+
39+
rocketmq 需要部署 broker 与 nameserver ,考虑到分开部署比较麻烦,这里将会使用 docker-compose。另外,还需要搭建一个 web 可视化控制台,可以监控 mq 服务状态,以及消息消费情况,这里使用 rocketmq-console,同样该程序也将使用 docker 安装。
40+
41+
1. 在 linux 服务器上选择并建立目录;
42+
43+
mkdir rocketmq-docker
44+
复制代码
45+
46+
1. 进入 rocketmq-docker 目录,建立一个名为 broker.conf 的配置文件,内容如下:
47+
48+
# 所属集群名称,如果节点较多可以配置多个
49+
brokerClusterName = DefaultCluster
50+
# broker名称,master和slave使用相同的名称,表明他们的主从关系
51+
brokerName = broker-a
52+
# 0表示Master,大于0表示不同的slave
53+
brokerId = 0
54+
# 表示几点做消息删除动作,默认是凌晨4点
55+
deleteWhen = 04
56+
# 在磁盘上保留消息的时长,单位是小时
57+
fileReservedTime = 48
58+
# 有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
59+
brokerRole = ASYNC_MASTER
60+
# 刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
61+
flushDiskType = ASYNC_FLUSH
62+
# 设置broker节点所在服务器的ip地址
63+
# brokerIP1 = 192.168.138.131
64+
复制代码
65+
66+
注意:这里的 brokerIP1 请务必设置,否则默认会成为 docker 容器内部IP导致外网链接不上。
67+
68+
1. 还是在 rocketmq-docker 目录,建立一个名为 rocketmq.yaml 的脚本文件;
69+
70+
71+
72+
rocketmq.yaml 内容如下:
73+
74+
version: '2'
75+
services:
76+
namesrv:
77+
image: rocketmqinc/rocketmq
78+
container_name: rmqnamesrv
79+
ports:
80+
- 9876:9876
81+
volumes:
82+
- /docker/rocketmq/data/namesrv/logs:/home/rocketmq/logs
83+
- /docker/rocketmq/data/namesrv/store:/home/rocketmq/store
84+
command: sh mqnamesrv
85+
broker:
86+
image: rocketmqinc/rocketmq
87+
container_name: rmqbroker
88+
ports:
89+
- 10909:10909
90+
- 10911:10911
91+
- 10912:10912
92+
volumes:
93+
- /docker/rocketmq/data/broker/logs:/home/rocketmq/logs
94+
- /docker/rocketmq/data/broker/store:/home/rocketmq/store
95+
- /docker/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf
96+
command: sh mqbroker -n namesrv:9876 -c /opt/rocketmq-4.4.0/conf/broker.conf
97+
depends_on:
98+
- namesrv
99+
environment:
100+
- JAVA_HOME=/usr/lib/jvm/jre
101+
console:
102+
image: styletang/rocketmq-console-ng
103+
container_name: rocketmq-console-ng
104+
ports:
105+
- 8087:8080
106+
depends_on:
107+
- namesrv
108+
environment:
109+
- JAVA_OPTS= -Dlogging.level.root=info -Drocketmq.namesrv.addr=rmqnamesrv:9876
110+
- Dcom.rocketmq.sendMessageWithVIPChannel=false
111+
复制代码
112+
113+
1. 开启 broker 用到的防火墙端口,方便后续使用:
114+
115+
firewall-cmd --zone=public --add-port=10909-10912/tcp --permanent
116+
复制代码
117+
118+
1. 执行 sentinel-dashboard.yaml 脚本启动容器:
119+
120+
docker-compose -f rocketmq.yaml up
121+
复制代码
122+
123+
1. 进入 rocketmq 控制台,我们会发现类似的图表(当然开始肯定是空的):
124+
125+
http://(安装RocketMQ机器的IP):8087
126+
复制代码
127+
128+
129+
130+
1. 我们选择 “集群” 这一栏,进入就能看见我们设置的 broker 的外网IP了;
131+
132+
133+
134+
到这里我们理论上已经完成 RocketMQ 服务端的部署,现在就可以去 Spring 项目中使用客户端了。
135+
136+
3. 在 Spring 项目中引入 RocketMQ 客户端
137+
138+
1. 添加 pom 文件依赖:
139+
140+
<dependency>
141+
<groupId>org.apache.rocketmq</groupId>
142+
rocketmq-spring-boot-starter
143+
<version>2.0.4</version>
144+
</dependency>
145+
复制代码
146+
147+
1. 在 application.yml 添加配置:
148+
149+
server:
150+
port: 10801
151+
152+
spring:
153+
application:
154+
name: (项目名称)-service
155+
156+
rocketmq:
157+
name-server: (安装RocketMQ机器的IP):9876
158+
producer:
159+
group: (项目名称)-group
160+
复制代码
161+
162+
1. 新建一个消息发送类 MessageProducer 作为消息的 生产者:
163+
164+
@Service
165+
public class MessageProducer implements CommandLineRunner {
166+
167+
@Resource
168+
private RocketMQTemplate rocketMQTemplate;
169+
170+
@Override
171+
public void run(String... args) throws Exception {
172+
rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
173+
}
174+
175+
}
176+
复制代码
177+
178+
1. 新建一个消息接收类 MessageListener 作为消息的 消费者:
179+
180+
@Slf4j
181+
@Service
182+
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
183+
public class MessageListener implements RocketMQListener<String> {
184+
185+
@Override
186+
public void onMessage(String message) {
187+
log.info("received message: {}", message);
188+
}
189+
190+
}
191+
复制代码
192+
193+
1. 新建一个 MessageProducer 的调用控制器类:
194+
195+
@RestController
196+
@RequestMapping
197+
public class HelloController {
198+
199+
@Resource
200+
private MessageProducer messageProducer;
201+
202+
@RequestMapping("/message")
203+
public void message() throws Exception {
204+
messageProducer.run("");
205+
}
206+
复制代码
207+
208+
1. 启动 Spring 项目,我们来测试一下最简单的消息发送:
209+
210+
GET http://localhost:10801/message
211+
Accept: */*
212+
Cache-Control: no-cache
213+
复制代码
214+
215+
216+
217+
我们也能在 RocketMQ 的管理控制台,大致看到消息的消费情况:
218+
219+
220+
221+
实战到了这里并没有解释一些概念,可能大家会有点犯迷糊。
222+
223+
Topic(主题)相当于一种类型的消息,比如可以设置 Topic1 专门是分销的业务, Topic2 专门是优惠券的业务;而 Group(分组)相当于对生产者和消费者的分组,那我们的微服务既能做生产者也能做消费者,比如可以设置 Group1 是商品的微服务,Group2 是订单的微服务,当然还要区分下是生产者分组还是消费者分组。
224+
225+
- Group:分为ProducerGroup 和 ConsumerGroup, 代表某一类的生产者和消费者,一般来说同一个服务可以作为 Group,同一个 Group 一般来说发送和消费的消息都是一样的。
226+
- Topic:消息主题,一级消息类型,生产者向其发送消息,消费者读取其消息。
227+
- Queue: 分为读和写两种队列,一般来说读写队列数量一致,如果不一致就会出现很多问题。
228+
229+
Topic 中有分为了多个 Queue,这其实是我们发送/读取消息通道的最小单位。我们发送消息都需要指定某个写入某个 Queue,拉取消息的时候也需要指定拉取某个 Queue,所以我们的顺序消息可以基于我们的 Queue 维度保持队列有序,如果想做到全局有序那么需要将 Queue 大小设置为1,这样所有的数据都会在 Queue 中有序。
230+
231+
232+
233+
作者:白菜说技术
234+
链接:https://juejin.cn/post/6930869079217717256
235+
来源:稀土掘金
236+
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
237+
238+
# 参考文章
239+
https://lijunyi.xyz/docs/SpringCloud/SpringCloud.html#_2-2-x-%E5%88%86%E6%94%AF
240+
https://mp.weixin.qq.com/s/2jeovmj77O9Ux96v3A0NtA
241+
https://juejin.cn/post/6931922457741770760
242+
https://github.com/D2C-Cai/herring
243+
http://c.biancheng.net/springcloud

0 commit comments

Comments
 (0)