Commit 67f74ec

🚧 Kafka
1 parent 7e18268 commit 67f74ec

1 file changed: 303 additions, 0 deletions

# Wormhole Flink Best Practices

## Prerequisites

- A basic understanding of core stream-processing concepts is assumed, e.g. Source, Sink, YARN, Kafka.

## Base Environment

- Reference (official docs): <https://edp963.github.io/wormhole/deployment.html>
- Three 4C8G servers running CentOS 7.4
    - hostname: `linux-05`
    - hostname: `linux-06`
    - hostname: `linux-07`
- Required (do not pick versions casually; use exactly the ones listed below; a quick version check is sketched after this list):
    - As a rule I put all components under `/usr/local`
    - JDK 1.8 (all three nodes)
    - Hadoop cluster (HDFS, YARN) (all three nodes): 2.6.5
    - Spark, single node (linux-05): 2.2.0
    - Flink, single node (linux-05): 1.5.1
    - Zookeeper (linux-05): 3.4.13
    - Kafka (linux-05): 0.10.2.2
    - MySQL (linux-05): 5.7
    - Installation guides for the components above: [click here](https://github.com/judasn/Linux-Tutorial)
- Optional:
    - Elasticsearch (supported version 5.x) (optional; without it you cannot see Wormhole's processing throughput and latency)
    - Grafana (supported version 4.x) (optional; without it you cannot see the graphical display of Wormhole's processing throughput and latency)
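A minimal way to double-check the installed versions before moving on; the paths are assumptions based on the `/usr/local` layout above, so adjust them if your install differs:

```
# Run on each node as appropriate; expected versions in the comments
java -version                                   # 1.8.x
/usr/local/hadoop/bin/hadoop version            # 2.6.5
/usr/local/spark/bin/spark-submit --version     # 2.2.0 (linux-05 only)
ls /usr/local/flink/lib | grep flink-dist       # jar name contains 1.5.1 (linux-05 only)
echo stat | nc linux-05 2181 | grep -i version  # Zookeeper 3.4.13
ls /usr/local/kafka/libs | grep '^kafka_'       # jar names contain 0.10.2.2
mysql --version                                 # 5.7.x
```
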
-------------------------------------------------------------------

## Wormhole Installation + Configuration

- Reference (official docs): <https://edp963.github.io/wormhole/deployment.html>
- The final `application.conf` used in this environment, for reference:

```

akka.http.server.request-timeout = 120s

wormholeServer {
  cluster.id = "" #optional global uuid
  host = "linux-05"
  port = 8989
  ui.default.language = "Chinese"
  token.timeout = 1
  token.secret.key = "iytr174395lclkb?lgj~8u;[=L:ljg"
  admin.username = "admin" #default admin user name
  admin.password = "admin" #default admin user password
}

mysql = {
  driver = "slick.driver.MySQLDriver$"
  db = {
    driver = "com.mysql.jdbc.Driver"
    user = "root"
    password = "123456"
    url = "jdbc:mysql://localhost:3306/wormhole?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
    numThreads = 4
    minConnections = 4
    maxConnections = 10
    connectionTimeout = 3000
  }
}

ldap = {
  enabled = false
  user = ""
  pwd = ""
  url = ""
  dc = ""
  read.timeout = 3000
  read.timeout = 5000
  connect = {
    timeout = 5000
    pool = true
  }
}

spark = {
  wormholeServer.user = "root" #WormholeServer linux user
  wormholeServer.ssh.port = 22 #ssh port; the WormholeServer linux user must be able to password-less ssh to itself
  spark.home = "/usr/local/spark"
  yarn.queue.name = "default" #WormholeServer submit spark streaming/job queue
  wormhole.hdfs.root.path = "hdfs://linux-05/wormhole" #WormholeServer hdfslog data default hdfs root path
  yarn.rm1.http.url = "linux-05:8088" #Yarn ActiveResourceManager address
  yarn.rm2.http.url = "linux-05:8088" #Yarn StandbyResourceManager address
}

flink = {
  home = "/usr/local/flink"
  yarn.queue.name = "default"
  feedback.state.count=100
  checkpoint.enable=false
  checkpoint.interval=60000
  stateBackend="hdfs://linux-05/flink-checkpoints"
  feedback.interval=30
}

zookeeper = {
  connection.url = "localhost:2181" #WormholeServer stream and flow interaction channel
  wormhole.root.path = "/wormhole" #zookeeper
}

kafka = {
  #brokers.url = "localhost:6667" #WormholeServer feedback data store
  brokers.url = "linux-05:9092"
  zookeeper.url = "localhost:2181"
  #topic.refactor = 3
  topic.refactor = 1
  using.cluster.suffix = false #if true, _${cluster.id} will be concatenated to consumer.feedback.topic
  consumer = {
    feedback.topic = "wormhole_feedback"
    poll-interval = 20ms
    poll-timeout = 1s
    stop-timeout = 30s
    close-timeout = 20s
    commit-timeout = 70s
    wakeup-timeout = 60s
    max-wakeups = 10
    session.timeout.ms = 60000
    heartbeat.interval.ms = 50000
    max.poll.records = 1000
    request.timeout.ms = 80000
    max.partition.fetch.bytes = 10485760
  }
}

#kerberos = {
#  keyTab="" #the keyTab will be used on yarn
#  spark.principal="" #the principal of spark
#  spark.keyTab="" #the keyTab of spark
#  server.config="" #the path of krb5.conf
#  jaas.startShell.config="" #the path of jaas config file which should be used by start.sh
#  jaas.yarn.config="" #the path of jaas config file which will be uploaded to yarn
#  server.enabled=false #enable wormhole connect to Kerberized cluster
#}

# choose monitor method among ES, MYSQL
monitor = {
  database.type = "ES"
}

#Wormhole feedback data store; if not configured, you will not see wormhole processing delay and throughput
#if not set, please comment it

#elasticSearch.http = {
#  url = "http://localhost:9200"
#  user = ""
#  password = ""
#}

#display wormhole processing delay and throughput data, get admin user token from grafana
#grafana should be set to anonymous login, so you can access the dashboard through wormhole directly
#if not set, please comment it

#grafana = {
#  url = "http://localhost:3000"
#  admin.token = "jihefouglokoj"
#}

#delete feedback history data on time
maintenance = {
  mysql.feedback.remain.maxDays = 7
  elasticSearch.feedback.remain.maxDays = 7
}


#Dbus integration, support several DBus services, if not set, please comment it

#dbus = {
#  api = [
#    {
#      login = {
#        url = "http://localhost:8080/keeper/login"
#        email = ""
#        password = ""
#      }
#      synchronization.namespace.url = "http://localhost:8080/keeper/tables/riderSearch"
#    }
#  ]
#}
```

- Initialize the database:
    - Create the database: `create database wormhole character set utf8;`
    - Table initialization script: <https://github.com/edp963/wormhole/blob/master/rider/conf/wormhole.sql>
    - One caveat with this script: the initialization SQL and the patch SQL are mixed together, so copying and running it as-is produces some errors, but the failing statements do no harm.
    - I ran the base SQL and the patch SQL separately, which makes it easier to tell what failed.
- Once deployment is complete, open <http://linux-05:8989> in a browser (a sketch of these steps follows).

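A minimal sketch of the steps above, assuming MySQL runs on linux-05 with the `root`/`123456` credentials from `application.conf`, that `wormhole.sql` has been downloaded locally, and that Wormhole is unpacked under `/usr/local/wormhole` with the stock `start.sh` launcher (the last two paths are assumptions, adjust to your install):

```
# Create the metadata database used by WormholeServer
mysql -h linux-05 -uroot -p123456 -e "create database wormhole character set utf8;"

# Import the table definitions (running the mixed base + patch script as-is
# produces some harmless errors; splitting it avoids the noise)
mysql -h linux-05 -uroot -p123456 wormhole < wormhole.sql

# Start WormholeServer, then open http://linux-05:8989
cd /usr/local/wormhole && bin/start.sh
```
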
-------------------------------------------------------------------

## Creating a User

- **Read the official quick-start first; it is required background:** <https://edp963.github.io/wormhole/quick-start.html>
- You must create a user before you can go into a Project and create Streams / Flows.
- The user type must be: `user`

-------------------------------------------------------------------

## Concepts Involved in Creating a Source

#### Creating an Instance

- An Instance binds the connection information of the service each component runs on.
- Kafka is the usual choice for the source, and the rest of this walkthrough assumes Kafka as the Source.
- Assume the instance name is: `source_kafka`

#### Creating a Database

- This holds the concrete database, topic, and similar details for each component.
- Assume the topic is: `source` (create the topic in Kafka beforehand if it does not exist; see the sketch below).

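The walkthrough assumes the `source` topic already exists; a minimal sketch for creating it on the single-node Kafka 0.10.x (replication factor 1 and a single partition are assumptions for this test setup):

```
cd /usr/local/kafka/bin
./kafka-topics.sh --create --zookeeper linux-05:2181 --replication-factor 1 --partitions 1 --topic source
```
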
#### Creating a Namespace

- A concept abstracted by Wormhole.
- Used to classify data.
- Assume Tables is: `ums_extension id`
- Configure the schema, and remember to set `ums_ts`, e.g. against this sample message:

```
{
  "id": 1,
  "name": "test",
  "phone": "18074546423",
  "city": "Beijing",
  "time": "2017-12-22 10:00:00"
}
```
-------------------------------------------------------------------

## Concepts Involved in Creating a Sink

#### Creating an Instance

- Assume the instance name is: `sink_mysql`

#### Creating a Database

- Assume the Database Name is: `sink`
- config parameters: `useUnicode=true&characterEncoding=UTF-8&useSSL=false&rewriteBatchedStatements=true`

#### Creating a Namespace

- Assume Tables is: `user id` (the sink table is assumed to exist; see the sketch below)

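The sink side assumes a MySQL database named `sink` with a `user` table on linux-05; a minimal sketch for creating them (the column set is an assumption derived from the sample message above, so adjust it to whatever your Flow actually outputs):

```
# Create the sink database and a user table matching the sample message
mysql -h linux-05 -uroot -p123456 -e "
create database if not exists sink character set utf8;
create table if not exists sink.user (
  id    bigint primary key,
  name  varchar(100),
  phone varchar(50),
  city  varchar(100),
  time  datetime
);"
```
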
-------------------------------------------------------------------

## Creating a Project

- Project identifier: `demo`

-------------------------------------------------------------------

## Flink Stream

- A Stream can only be created inside a Project's detail page.
- One Stream can have multiple Flows.
- Only a user assigned to the Project can create it; the admin user has no permission to do so.
- To delete a Project, admin must first go into the Project's detail page and delete all of its Streams.
- Create a new Stream (once started it is submitted to YARN; see the check below):
    - Stream type: `Flink`
    - Assume the Name is: `wormhole_stream_test`

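Once the Stream is started, Wormhole submits it to YARN; a quick way to confirm it is up (assuming the YARN CLI is available on linux-05) is to list the running applications and look for an entry named after the stream:

```
# The stream should show up as a RUNNING application named after wormhole_stream_test
yarn application -list
# or check the ResourceManager UI at http://linux-05:8088
```
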
## Flink Flow (Streaming Job)

- A Flow can only be created inside a Project's detail page.
- Only a user assigned to the Project can create it; the admin user has no permission to do so.
- A Flow associates a source with a sink.
- To delete a Project, admin must first go into the Project's detail page and delete all of its Streams.
- Create a Flow on top of the Stream:
    - Pipeline
    - Transformation
        - <https://edp963.github.io/wormhole/user-guide.html#cep>
        - NO_SKIP: sliding window
        - SKIP_PAST_LAST_EVENT: tumbling window
        - KeyBy: grouping field
        - Output
            - Agg: aggregates the matched rows into a single output row, e.g. `field1:avg,field2:max` (currently supports max/min/avg/sum)
            - Detail: outputs each matched row one by one
            - FilteredRow: outputs one row selected by a condition, e.g. `head`, `last`, `field1:min/max`
    - Confirmation
- Note: a Flow can only be started while its Stream is in the `running` state.

-------------------------------------------------------------------

## Sending Test Data to Kafka

- `cd /usr/local/kafka/bin`
- `./kafka-console-producer.sh --broker-list linux-05:9092 --topic source --property "parse.key=true" --property "key.separator=@@@"`
- Send a message in the UMS stream message protocol format (a verification sketch follows the message below):

```
data_increment_data.kafka.source_kafka.source.ums_extension.*.*.*@@@{"id": 1, "name": "test", "phone":"18074546423", "city": "Beijing", "time": "2017-12-22 10:00:00"}
```
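
To verify the pipeline end to end, a minimal sketch (assuming the Flow is running and writes into the `sink.user` table from the sink section): first confirm the message reached the source topic, then query the sink.

```
# Confirm the test message is in the source topic
./kafka-console-consumer.sh --bootstrap-server linux-05:9092 --topic source --from-beginning

# After the Flow has processed it, check the sink table
mysql -h linux-05 -uroot -p123456 -e "select * from sink.user;"
```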
