# Wormhole Flink Best Practices

## Prerequisites

- A basic understanding of common stream-processing concepts is assumed, e.g. Source, Sink, YARN, Kafka, etc.


## Base Environment

- Official docs for reference: <https://edp963.github.io/wormhole/deployment.html>
- Three 4C8G servers running CentOS 7.4
    - hostname: `linux-05`
    - hostname: `linux-06`
    - hostname: `linux-07`
- Required (do not pick versions arbitrarily; use exactly the ones listed below; a quick version-check sketch follows this list):
    - As usual, I put all components under `/usr/local`
    - JDK 1.8 (all three nodes)
    - Hadoop cluster (HDFS, YARN) (all three nodes): 2.6.5
    - Spark, single node (linux-05): 2.2.0
    - Flink, single node (linux-05): 1.5.1
    - Zookeeper (linux-05): 3.4.13
    - Kafka (linux-05): 0.10.2.2
    - MySQL (linux-05): 5.7
    - Installation guides for all of the above: [click here](https://github.com/judasn/Linux-Tutorial)
- Optional:
    - Elasticsearch (5.x supported) (optional; without it you cannot see the throughput and latency of data processed by wormhole)
    - Grafana (4.x supported) (optional; without it you cannot see the graphical dashboards for wormhole's throughput and latency)
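
- A quick way to double-check that the installed versions match the list above (a minimal sketch, assuming the `/usr/local` layout used in this guide):

```
java -version                                # expect 1.8.x
hadoop version                               # expect 2.6.5
/usr/local/spark/bin/spark-submit --version  # expect 2.2.0
ls /usr/local/flink/lib | grep flink-dist    # jar name should contain 1.5.1
ls /usr/local/kafka/libs | grep kafka_       # jar names should contain 0.10.2.2
mysql --version                              # expect 5.7.x
```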

-------------------------------------------------------------------

## Wormhole Installation + Configuration

- Official docs for reference: <https://edp963.github.io/wormhole/deployment.html>
- Final application.conf for this environment, for reference (a start-up sketch follows the config block):

```

akka.http.server.request-timeout = 120s

wormholeServer {
  cluster.id = "" #optional global uuid
  host = "linux-05"
  port = 8989
  ui.default.language = "Chinese"
  token.timeout = 1
  token.secret.key = "iytr174395lclkb?lgj~8u;[=L:ljg"
  admin.username = "admin" #default admin user name
  admin.password = "admin" #default admin user password
}

mysql = {
  driver = "slick.driver.MySQLDriver$"
  db = {
    driver = "com.mysql.jdbc.Driver"
    user = "root"
    password = "123456"
    url = "jdbc:mysql://localhost:3306/wormhole?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
    numThreads = 4
    minConnections = 4
    maxConnections = 10
    connectionTimeout = 3000
  }
}

ldap = {
  enabled = false
  user = ""
  pwd = ""
  url = ""
  dc = ""
  read.timeout = 5000
  connect = {
    timeout = 5000
    pool = true
  }
}

spark = {
  wormholeServer.user = "root" #WormholeServer linux user
  wormholeServer.ssh.port = 22 #ssh port; the WormholeServer linux user must be able to ssh to this host itself without a password
  spark.home = "/usr/local/spark"
  yarn.queue.name = "default" #WormholeServer submit spark streaming/job queue
  wormhole.hdfs.root.path = "hdfs://linux-05/wormhole" #WormholeServer hdfslog data default hdfs root path
  yarn.rm1.http.url = "linux-05:8088" #Yarn ActiveResourceManager address
  yarn.rm2.http.url = "linux-05:8088" #Yarn StandbyResourceManager address
}

flink = {
  home = "/usr/local/flink"
  yarn.queue.name = "default"
  feedback.state.count=100
  checkpoint.enable=false
  checkpoint.interval=60000
  stateBackend="hdfs://linux-05/flink-checkpoints"
  feedback.interval=30
}

zookeeper = {
  connection.url = "localhost:2181" #WormholeServer stream and flow interaction channel
  wormhole.root.path = "/wormhole" #zookeeper
}

kafka = {
  #brokers.url = "localhost:6667" #WormholeServer feedback data store
  brokers.url = "linux-05:9092"
  zookeeper.url = "localhost:2181"
  #topic.refactor = 3
  topic.refactor = 1
  using.cluster.suffix = false #if true, _${cluster.id} will be concatenated to consumer.feedback.topic
  consumer = {
    feedback.topic = "wormhole_feedback"
    poll-interval = 20ms
    poll-timeout = 1s
    stop-timeout = 30s
    close-timeout = 20s
    commit-timeout = 70s
    wakeup-timeout = 60s
    max-wakeups = 10
    session.timeout.ms = 60000
    heartbeat.interval.ms = 50000
    max.poll.records = 1000
    request.timeout.ms = 80000
    max.partition.fetch.bytes = 10485760
  }
}

#kerberos = {
#  keyTab="" #the keyTab will be used on yarn
#  spark.principal="" #the principal of spark
#  spark.keyTab="" #the keyTab of spark
#  server.config="" #the path of krb5.conf
#  jaas.startShell.config="" #the path of jaas config file which should be used by start.sh
#  jaas.yarn.config="" #the path of jaas config file which will be uploaded to yarn
#  server.enabled=false #enable wormhole to connect to a Kerberized cluster
#}

# choose monitor method among ES, MYSQL
monitor = {
  database.type="ES"
}

#Wormhole feedback data store; if you don't configure it, you will not see wormhole processing delay and throughput
#if not set, please comment it

#elasticSearch.http = {
#  url = "http://localhost:9200"
#  user = ""
#  password = ""
#}

#display wormhole processing delay and throughput data, get admin user token from grafana
#grafana should be set to allow anonymous login, so you can access the dashboard through wormhole directly
#if not set, please comment it

#grafana = {
#  url = "http://localhost:3000"
#  admin.token = "jihefouglokoj"
#}

#delete feedback history data on time
maintenance = {
  mysql.feedback.remain.maxDays = 7
  elasticSearch.feedback.remain.maxDays = 7
}


#Dbus integration, supports several DBus services; if not set, please comment it out

#dbus = {
#  api = [
#    {
#      login = {
#        url = "http://localhost:8080/keeper/login"
#        email = ""
#        password = ""
#      }
#      synchronization.namespace.url = "http://localhost:8080/keeper/tables/riderSearch"
#    }
#  ]
#}
```
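
- With this configuration in place, WormholeServer can be started (after the MySQL database below has been initialized). A minimal sketch, assuming the release tarball from the official deployment page is unpacked under `/usr/local`; `<version>` is a placeholder for whichever release you downloaded:

```
cd /usr/local
tar -zxvf wormhole-<version>.tar.gz && cd wormhole-<version>
vim conf/application.conf   # replace the contents with the config shown above
bin/start.sh                # start WormholeServer (start.sh is the script referenced by the jaas.startShell.config comment above)
```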

- Initialize the database (a shell sketch follows this list):
    - Create the database: `create database wormhole character set utf8;`
- Table-schema initialization script: <https://github.com/edp963/wormhole/blob/master/rider/conf/wormhole.sql>
    - One caveat with that script: the base schema and the patch statements are mixed into one file, so running it all at once will throw some errors, but the failing statements don't matter.
    - I simply ran the base SQL and the patch SQL separately, which makes it easier to judge what failed.
- Once deployment is finished, open <http://linux-05:8989> in a browser.
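
- A minimal shell sketch of the same initialization steps (credentials follow the application.conf above; the download URL is just the raw form of the script link):

```
# create the wormhole metadata database
mysql -uroot -p123456 -e "create database if not exists wormhole character set utf8;"

# fetch the schema script and load it; running the base SQL and the patch SQL
# separately makes it easier to see which statements actually failed
wget https://raw.githubusercontent.com/edp963/wormhole/master/rider/conf/wormhole.sql
mysql -uroot -p123456 wormhole < wormhole.sql
```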

-------------------------------------------------------------------

## Creating a User

- **Read the official quick start first**: <https://edp963.github.io/wormhole/quick-start.html>
- A user must be created before you can enter a Project and create Streams / Flows in it.
- The user type must be: `user`


-------------------------------------------------------------------

## Concepts Involved in Creating a Source

#### Create an Instance

- An Instance holds the connection information of the service each component runs on.
- Kafka is normally chosen as the source; the rest of this guide assumes Kafka as the Source.
- Suppose the instance name is: `source_kafka`

#### Create a Database

- The concrete database, Topic, etc. of each component.
- Suppose the topic is: `source` (a topic-creation sketch follows this list)
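
- If the `source` topic does not exist yet, a minimal sketch for creating it (Kafka 0.10.x scripts, single-node broker and Zookeeper on linux-05 as configured above):

```
cd /usr/local/kafka/bin
./kafka-topics.sh --create --zookeeper linux-05:2181 --replication-factor 1 --partitions 1 --topic source
```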


#### Create a Namespace

- A concept abstracted by wormhole.
- Used to classify data.
- Suppose Tables is: `ums_extension id` (table name followed by its key field)
- Configure the schema, and remember to include ums_ts

```
{
  "id": 1,
  "name": "test",
  "phone": "18074546423",
  "city": "Beijing",
  "time": "2017-12-22 10:00:00"
}
```


-------------------------------------------------------------------

## Concepts Involved in Creating a Sink

#### Create an Instance

- Suppose the instance name is: `sink_mysql`

#### Create a Database

- Suppose the Database Name is: `sink`
- config parameters: `useUnicode=true&characterEncoding=UTF-8&useSSL=false&rewriteBatchedStatements=true`

#### Create a Namespace

- Suppose Tables is: `user id` (a table DDL sketch follows)
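
- The sink table itself has to exist in MySQL before the Flow writes to it. A hypothetical DDL sketch whose columns simply mirror the sample source record above (adjust names and types to whatever your Flow actually outputs; credentials follow the application.conf example):

```
mysql -uroot -p123456 <<'SQL'
create database if not exists sink character set utf8;
create table if not exists sink.`user` (
  `id`    bigint       not null primary key,
  `name`  varchar(100),
  `phone` varchar(30),
  `city`  varchar(50),
  `time`  datetime
);
SQL
```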


-------------------------------------------------------------------

## Create a Project

- Project identifier: `demo`

-------------------------------------------------------------------


## Flink Stream

- A Stream can only be created from inside a Project's detail page.
- One Stream can contain multiple Flows.
- Only a user assigned to the Project can create it; the admin user has no permission to do so.
- To delete a Project, you must first enter the Project detail page and delete all of its Streams; only then can admin delete the Project.
- Create a new Stream:
    - Stream type: `Flink`
    - Suppose the Name is: `wormhole_stream_test`

## Flink Flow (streaming job)

- A Flow can only be created from inside a Project's detail page.
- Only a user assigned to the Project can create it; the admin user has no permission to do so.
- A Flow ties a source to a sink.
- To delete a Project, you must first enter the Project detail page and delete all of its Streams; only then can admin delete the Project.
- Create a Flow on top of the Stream:
    - Pipeline
    - Transformation
        - <https://edp963.github.io/wormhole/user-guide.html#cep>
        - NO_SKIP: sliding window
        - SKIP_PAST_LAST_EVENT: tumbling window
        - KeyBy: grouping field
        - Output
            - Agg: aggregate the matched records into a single output record, e.g. `field1:avg,field2:max` (currently supports max/min/avg/sum)
            - Detail: output every matched record one by one
            - FilteredRow: output a single record chosen by a condition, e.g. `head`, `last`, `field1:min/max`
    - Confirmation
- Note: a Flow can only be started while its Stream is in the running state.


-------------------------------------------------------------------

## Sending Test Data to Kafka

- `cd /usr/local/kafka/bin`
- `./kafka-console-producer.sh --broker-list linux-05:9092 --topic source --property "parse.key=true" --property "key.separator=@@@"`
- Send a message in the UMS stream protocol format; the key before `@@@` is `data_increment_data.` followed by the source namespace, and the value after it is the JSON record:

```
data_increment_data.kafka.source_kafka.source.ums_extension.*.*.*@@@{"id": 1, "name": "test", "phone":"18074546423", "city": "Beijing", "time": "2017-12-22 10:00:00"}
```
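
- Once the Flow has processed the message, a quick way to confirm it landed in the sink (table and credentials follow the earlier sketches; adjust to your setup):

```
# check the sink table for the test record
mysql -uroot -p123456 -e "select * from sink.\`user\`;"

# optionally, watch the feedback topic that WormholeServer consumes
cd /usr/local/kafka/bin
./kafka-console-consumer.sh --bootstrap-server linux-05:9092 --topic wormhole_feedback --from-beginning
```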