Skip to content

Commit 84e5749

Browse files
committed
kafka series
1 parent 8aa85dd commit 84e5749

8 files changed

+2503
-0
lines changed
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
**前言**
2+
3+
上篇文章讲解了 Kafka 的基础概念和架构,了解了基本概念之后,必须得实践一波了,所谓“实践才是检验真理的唯一办法”,后续系列关于 Kafka 的文章都以 kafka_2.11-0.9.0.0 为例;另外为了让大家快速入门,本文只提供单机版的安装实战教程,如果有想尝试集群方案的,后面在出一篇集群安装的教程,废话不多说了,直接开干。
4+
5+
## **安装**
6+
7+
### **1\. 下载**
8+
9+
版本号:kafka_2.11-0.9.0.0
10+
11+
下载地址:[http://kafka.apache.org/downloads](https://link.zhihu.com/?target=http%3A//kafka.apache.org/downloads)
12+
13+
### **2\. 安装**
14+
15+
16+
17+
```
18+
# 安装目录
19+
$ pwd
20+
/Users/my/software/study
21+
22+
# 减压
23+
$ sudo tar -zxvf kafka_2.11-0.9.0.0.tgz
24+
25+
# 重命名
26+
$ sudo mv kafka_2.11-0.9.0.0.tgz kafka-0.9
27+
28+
# 查看目录结构
29+
$ cd kafka-0.9 && ls
30+
LICENSE NOTICE bin config libs site-docs
31+
32+
# 目录结构介绍:
33+
# bin: 存放kafka 客户端和服务端的执行脚本
34+
# config: 存放kafka的一些配置文件
35+
# libs: 存放kafka运行的的jar包
36+
# site-docs: 存放kafka的配置文档说明
37+
38+
# 配置环境变量,方便在任意目录下运行kafka命令
39+
# 博主使用的Mac,所以配置在了 ~/.bash_profile文件中,
40+
# Linux中则配置在 ~/.bashrc 或者 ~/.zshrc文件中
41+
$ vim ~/.bash_profile
42+
43+
export KAFKA_HOME=/Users/haikuan1/software/study/kafka-0.9
44+
export PATH=$PATH:$JAVA_HOME:$KAFKA_HOME/bin
45+
46+
# 使得环境变量生效
47+
$ source ~/.bash_profile
48+
49+
```
50+
51+
52+
53+
### **3.运行**
54+
55+
### **3.1 启动 zookeeper**
56+
57+
58+
59+
```
60+
# 启动zookeeper,因为kafka的元数据需要保存到zookeeper中
61+
$ bin/zookeeper-server-start.sh config/zookeeper.properties
62+
63+
# 若出现如下信息,则证明zookeeper启动成功了
64+
[2020-04-25 16:23:44,493] INFO Server environment:user.dir=/Users/haikuan1/software/study/kafka-0.10 (org.apache.zookeeper.server.ZooKeeperServer)
65+
[2020-04-25 16:23:44,505] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
66+
[2020-04-25 16:23:44,505] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
67+
[2020-04-25 16:23:44,505] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
68+
[2020-04-25 16:23:44,548] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
69+
70+
```
71+
72+
73+
74+
### **3.2 启动 Kafka server**
75+
76+
77+
78+
```
79+
# 以守护进程的方式启动kafka服务端,去掉 -daemon 参数则关闭当前窗口服务端自动退出
80+
$ bin/kafka-server-start.sh -daemon config/server.properties
81+
82+
```
83+
84+
85+
86+
### **3.3 kafka 基础命令使用**
87+
88+
89+
90+
```
91+
# 1\. 创建一个topic
92+
# --replication-factor:指定副本个数
93+
# --partition:指定partition个数
94+
# --topic:指定topic的名字
95+
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic mytopic
96+
97+
# 2\. 查看创建成功的topic
98+
$ kafka-topics.sh --list --zookeeper localhost:2181
99+
100+
# 3\. 创建生产者和消费者
101+
102+
# 3.1 启动kafka消费端
103+
# --from-beginning:从头开始消费,该特性也表明kafka消息具有持久性
104+
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning
105+
106+
# 3.2 启动kafka生产端
107+
# --broker-list:当前的Broker列表,即提供服务的列表
108+
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
109+
110+
```
111+
112+
113+
114+
<figure data-size="normal">
115+
116+
117+
![](https://java-tutorial.oss-cn-shanghai.aliyuncs.com/v2-a1e0c6db02c2822b2ad88db1c3b0b8a7_720w.webp)
118+
119+
</figure>
120+
121+
### **4.使用 Java 连接 kafka 进行测试**
122+
123+
### **4.1 创建一个 maven 工程,引入如下 pom 依赖**
124+
125+
126+
127+
```
128+
<dependency>
129+
<groupId>org.apache.kafka</groupId>
130+
kafka-clients
131+
<version>0.9.0.0</version>
132+
</dependency>
133+
134+
<dependency>
135+
<groupId>org.apache.kafka</groupId>
136+
kafka_2.11
137+
<version>0.9.0.0</version>
138+
</dependency>
139+
140+
```
141+
142+
143+
144+
### **4.2 消费者端代码**
145+
146+
<figure data-size="normal">
147+
148+
149+
![](https://java-tutorial.oss-cn-shanghai.aliyuncs.com/v2-5e9876ca0dc733fe8c2df51d2e42d1ce_720w.webp)
150+
151+
</figure>
152+
153+
### **4.3 生产者端代码**
154+
155+
<figure data-size="normal">
156+
157+
158+
![](https://java-tutorial.oss-cn-shanghai.aliyuncs.com/v2-d1e6bfdf23c2b42e23f30d4430c587e2_720w.webp)
159+
160+
</figure>
161+
162+
### **4.4 消费者端效果图**
163+
164+
<figure data-size="normal">
165+
166+
167+
![](https://java-tutorial.oss-cn-shanghai.aliyuncs.com/v2-1912f5b2b12ac766d746d88a04b9bd28_720w.webp)
168+
169+
</figure>
170+
171+
### **5.总结**
172+
173+
本文介绍了 kafka 单机版安装及简单命令使用,然后使用 Java 实现了生产者和消费者的简单功能,虽然内容可能比较简单,但还是**强烈建议大家手动去实践一下**,从而对 kafka 的架构有一个更深入的理解。
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
**前言**
2+
3+
经过上篇文章的简单实战之后,**今天来聊聊生产者将消息从客户端发送到 Broker 上背后发生了哪些故事**,看不看由你,但是我保证可以本篇文章你一定可以学到应用背后的一些实质东西。
4+
5+
本文我们从以下 4 个方面来探讨下一条消息如何被准确的发送到 Broker 的 partition 上。
6+
7+
**1\. 客户端组件**
8+
9+
**2\. 客户端缓存存储模型**
10+
11+
**3\. 确定消息的 partition 位置**
12+
13+
**4\. 发送线程的工作原理**
14+
15+
* * *
16+
17+
## **客户端组件**
18+
19+
* **KafkaProducer:**
20+
21+
KafkaProducer 是一个生产者客户端的进程,通过该对象启动生产者来发送消息。
22+
23+
* **RecordAccumulator:**
24+
25+
RecordAccumulator 是一个记录收集器,用于收集客户端发送的消息,并将收集到的消息暂存到客户端缓存中。
26+
27+
* **Sender:**
28+
29+
Sender 是一个发送线程,负责读取记录收集器中缓存的批量消息,经过一些中间转换操作,将要发送的数据准备好,然后交由 Selector 进行网络传输。
30+
31+
* **Selector:**
32+
33+
Selector 是一个选择器,用于处理网络连接和读写处理,使用网络连接处理客户端上的网络请求。
34+
35+
通过使用以上四大组件即可完成客户端消息的发送工作。消息在网络中传输的方式只能通过二级制的方式,所以首先需要将消息序列化为二进制形式缓存在客户端,kafka 使用了双端队列的方式将消息缓存起来,然后使用发送线程(Sender)读取队列中的消息交给 Selector 进行网络传输发送给服务端(Broker)
36+
37+
<figure data-size="normal">
38+
39+
40+
![](https://java-tutorial.oss-cn-shanghai.aliyuncs.com/v2-7d57acd1d7dc5942e999e6ffebb28679_720w.webp)
41+
42+
</figure>
43+
44+
以上为发送消息的主流程,附上部分源码供大家参考,接下来分析下几个非常重要流程的具体实现原理。
45+
46+
* * *
47+
48+
## **客户端缓存存储模型**
49+
50+
<figure data-size="normal">
51+
52+
53+
![](https://java-tutorial.oss-cn-shanghai.aliyuncs.com/v2-5da65c5f9f8c0c9082e07c6431e78cd2_720w.webp)
54+
55+
</figure>
56+
57+
从上图可以看出,一条消息首先需要确定要被存储到那个 partition 对应的双端队列上;其次,存储消息的双端队列是以批的维度存储的,即 N 条消息组成一批,一批消息最多存储 N 条,超过后则新建一个组来存储新消息;其次,新来的消息总是从左侧写入,即越靠左侧的消息产生的时间越晚;最后,只有当一批消息凑够 N 条后才会发送给 Broker,否则不会发送到 Broker 上。
58+
59+
了解了客户端存储模型后,来探讨下确定消息的 partition(分区)位置?
60+
61+
* * *
62+
63+
## **确定消息的 partition 位置**
64+
65+
消息可分为两种,一种是指定了 key 的消息,一种是没有指定 key 的消息。
66+
67+
对于指定了 key 的消息,partition 位置的计算方式为:**`Utils.murmur2(key) % numPartitions`**,即先对 key 进行哈希计算,然后在于 partition 个数求余,从而得到该条消息应该被存储在哪个 partition 上。
68+
69+
对于没有指定 key 的消息,partition 位置的计算方式为:**采用 round-robin 方式确定 partition 位置**,即采用轮询的方式,平均的将消息分布到不同的 partition 上,从而避免某些 partition 数据量过大影响 Broker 和消费端性能。
70+
71+
### **注意**
72+
73+
由于 partition 有主副的区分,此处参与计算的 partition 数量是当前有主 partition 的数量,即如果某个 partition 无主的时候,则此 partition 是不能够进行数据写入的。
74+
75+
稍微解释一下,主副 partition 的机制是为了提高 kafka 系统的容错性的,即当某个 Broker 意外宕机时,在此 Broker 上的主 partition 状态为不可读写时(只有主 partition 可对外提供读写服务,副 partition 只有数据备份的功能),kafka 会从主 partition 对应的 N 个副 partition 中挑选一个,并将其状态改为主 partition,从而继续对外提供读写操作。
76+
77+
消息被确定分配到某个 partition 对应记录收集器(即双端队列)后,接下来,发送线程(Sender)从记录收集器中收集满足条件的批数据发送给 Broker,那么发送线程是如何收集满足条件的批数据的?批数据是按照 partition 维度发送的还是按照 Broker 维度发送数据的?
78+
79+
* * *
80+
81+
## **发送线程的工作原理**
82+
83+
Sender 线程的主要工作是收集满足条件的批数据,何为满足条件的批数据?缓存数据是以批维度存储的,当一批数据量达到指定的 N 条时,就满足发送给 Broker 的条件了。
84+
85+
partition 维度和 Broker 维度发送消息模型对比。
86+
87+
<figure data-size="normal">
88+
89+
90+
![](https://java-tutorial.oss-cn-shanghai.aliyuncs.com/v2-36b7c2761f17fb2d6481747523999011_720w.webp)
91+
92+
</figure>
93+
94+
从图中可以看出,左侧按照 partition 维度发送消息,每个 partition 都需要和 Broker 建连,总共发生了四次网络连接。而右侧将分布在同一个 Broker 的 partition 按组聚合后在与 Broker 建连,只需要两次网络连接即可。所以 Kafka 选择右侧的方式。
95+
96+
### **Sender 的主要工作**
97+
98+
第一步:扫描记录收集器中满足条件的批数据,然后将 partition -> 批数据映射转换成 BrokerId -> N 批数据的映射。第二步:Sender 线程会为每个 BrokerId 创建一个客户端请求,然后将请求交给 NetWorkClient,由 NetWrokClient 去真正发送网络请求到 Broker。
99+
100+
### **NetWorkClient 的工作内容**
101+
102+
Sender 线程准备好要发送的数据后,交由 NetWorkClient 来进行网络相关操作。主要包括客户端与服务端的建连、发送客户端请求、接受服务端响应。完成如上一系列的工作主要由如下方法完成。
103+
104+
1. reday()方法。从记录收集器获取准备完毕的节点,并连接所有准备好的节点。
105+
2. send()方法。为每个节点创建一个客户端请求,然后将请求暂时存到节点对应的 Channel(通道)中。
106+
3. poll()方法。该方法会真正轮询网络请求,发送请求给服务端节点和接受服务端的响应。
107+
108+
* * *
109+
110+
## **总结**
111+
112+
以上,即为生产者客户端的一条消息从生产到发送到 Broker 上的全过程。现在是不是就很清晰了呢?也许有些朋友会比较疑惑它的**网络请求模型是什么样的**,作者就猜你会你会问,下一篇我们就来扒开它的神秘面纱看看其究竟是怎么实现的,敬请期待。

0 commit comments

Comments
 (0)