diff --git a/README.md b/README.md index 2b21482..46f6ee0 100644 --- a/README.md +++ b/README.md @@ -11,5 +11,6 @@ Coolplay Spark 将包含 Spark 源代码解析、Spark 类库、Spark 代码等 - [Spark Streaming 源码解析系列](https://github.com/lw-lin/CoolplaySpark/tree/master/Spark%20Streaming%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%E7%B3%BB%E5%88%97) - Spark Streaming 是 Spark 1.x 的流数据处理系统 - [Spark 资源集合](https://github.com/lw-lin/CoolplaySpark/tree/master/Spark%20%E8%B5%84%E6%BA%90%E9%9B%86%E5%90%88) - - 包括 Spark Streaming 中文微信群、Spark Summit 视频等资源集合 + - 包括 Spark 技术交流(中文社群)
![wechat_spark_streaming_small](Spark%20%E8%B5%84%E6%BA%90%E9%9B%86%E5%90%88/resources/wechat_spark_streaming_small_.PNG) + - Spark Summit 视频等资源集合 diff --git "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/0.1 Spark Streaming \345\256\236\347\216\260\346\200\235\350\267\257\344\270\216\346\250\241\345\235\227\346\246\202\350\277\260.md" "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/0.1 Spark Streaming \345\256\236\347\216\260\346\200\235\350\267\257\344\270\216\346\250\241\345\235\227\346\246\202\350\277\260.md" index 49c74a7..136edb4 100644 --- "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/0.1 Spark Streaming \345\256\236\347\216\260\346\200\235\350\267\257\344\270\216\346\250\241\345\235\227\346\246\202\350\277\260.md" +++ "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/0.1 Spark Streaming \345\256\236\347\216\260\346\200\235\350\267\257\344\270\216\346\250\241\345\235\227\346\246\202\350\277\260.md" @@ -2,16 +2,15 @@ ***[酷玩 Spark] Spark Streaming 源码解析系列*** ,返回目录请 [猛戳这里](readme.md) -[「腾讯·广点通」](http://e.qq.com)技术团队荣誉出品 +[「腾讯广告」](http://e.qq.com)技术团队(原腾讯广点通技术团队)荣誉出品 ``` 本系列内容适用范围: -* 2016.12.28 update, Spark 2.1 全系列 √ (2.1.0) -* 2016.11.14 update, Spark 2.0 全系列 √ (2.0.0, 2.0.1, 2.0.2) -* 2016.11.07 update, Spark 1.6 全系列 √ (1.6.0, 1.6.1, 1.6.2, 1.6.3) +* 2018.11.02 update, Spark 2.4 全系列 √ (已发布:2.4.0) +* 2018.02.28 update, Spark 2.3 全系列 √ (已发布:2.3.0 ~ 2.3.2) +* 2017.07.11 update, Spark 2.2 全系列 √ (已发布:2.2.0 ~ 2.2.3) ``` -

@@ -87,7 +86,7 @@ Spark Streaming 与 Spark Core 的关系可以用下面的经典部件图来表 还真不一样。 -比如,`DStream` 维护了对每个产出的 `RDD` 实例的指针。比如下图里,`DStream A` 在 3 个 batch 里分别实例化了 3 个 `RDD`,分别是 `a[1]`, `a[2]`, `a[3]`,那么 `DStream A` 就保留了一个 `batch → 所产出的 RDD` 的哈希表,即包含 `batch 1 → a[1]`, `batch 2 → a[2]`, `batch 3 → a[3]` 这 3 项。 +比如,`DStream` 维护了对每个产出的 `RDD` 实例的引用。比如下图里,`DStream A` 在 3 个 batch 里分别实例化了 3 个 `RDD`,分别是 `a[1]`, `a[2]`, `a[3]`,那么 `DStream A` 就保留了一个 `batch → 所产出的 RDD` 的哈希表,即包含 `batch 1 → a[1]`, `batch 2 → a[2]`, `batch 3 → a[3]` 这 3 项。 ![image](0.imgs/045.png) @@ -183,7 +182,7 @@ Spark Streaming 与 Spark Core 的关系可以用下面的经典部件图来表 先看 executor 端。 -在 executor 端,`ReceiverSupervisor` 和 `Receiver` 失效后直接重启就 OK 了,关联是保障收到的块数据的安全。保障了源头块数据,就能够保障 RDD DAG (Spark Core 的 lineage)重做。 +在 executor 端,`ReceiverSupervisor` 和 `Receiver` 失效后直接重启就 OK 了,关键是保障收到的块数据的安全。保障了源头块数据,就能够保障 RDD DAG (Spark Core 的 lineage)重做。 Spark Streaming 对源头块数据的保障,分为 4 个层次,全面、相互补充,又可根据不同场景灵活设置: @@ -317,7 +316,7 @@ ssc.awaitTermination() ## 四、总结与回顾 -在最后我们再把 [Sark Streaming 官方 Programming Guide] (http://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example) 的部分内容放在这里,作为本文的一个回顾和总结。请大家看一看,如果看懂了本文的内容,是不是读下面这些比较 high-level 的介绍会清晰化很多 :-) +在最后我们再把 [Sark Streaming 官方 Programming Guide](http://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example) 的部分内容放在这里,作为本文的一个回顾和总结。请大家看一看,如果看懂了本文的内容,是不是读下面这些比较 high-level 的介绍会清晰化很多 :-) > **Spark Streaming** is an extension of the **core Spark API** that enables **scalable**, **high-throughput**, **fault-tolerant stream processing of live data streams**. Data can be ingested from many sources like Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams. diff --git "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.1 DStream, DStreamGraph \350\257\246\350\247\243.md" "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.1 DStream, DStreamGraph \350\257\246\350\247\243.md" index bd1b42c..ca74e93 100644 --- "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.1 DStream, DStreamGraph \350\257\246\350\247\243.md" +++ "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.1 DStream, DStreamGraph \350\257\246\350\247\243.md" @@ -2,14 +2,14 @@ ***[酷玩 Spark] Spark Streaming 源码解析系列*** ,返回目录请 [猛戳这里](readme.md) -[「腾讯·广点通」](http://e.qq.com)技术团队荣誉出品 +[「腾讯广告」](http://e.qq.com)技术团队(原腾讯广点通技术团队)荣誉出品 ``` 本系列内容适用范围: -* 2016.12.28 update, Spark 2.1 全系列 √ (2.1.0) -* 2016.11.14 update, Spark 2.0 全系列 √ (2.0.0, 2.0.1, 2.0.2) -* 2016.11.07 update, Spark 1.6 全系列 √ (1.6.0, 1.6.1, 1.6.2, 1.6.3) +* 2018.11.02 update, Spark 2.4 全系列 √ (已发布:2.4.0) +* 2018.02.28 update, Spark 2.3 全系列 √ (已发布:2.3.0 ~ 2.3.2) +* 2017.07.11 update, Spark 2.2 全系列 √ (已发布:2.2.0 ~ 2.2.3) ```

@@ -153,7 +153,7 @@ new ForeachDStream(wordCounts, cnt => cnt.print()) // 类型是 ForeachDStr 这里背后的原因是,在 Spark Core 的 RDD API 里,RDD 的计算是被触发了以后才进行 lazy 求值的,即当真正求 `d` 的值的时候,先计算上游 dependency `c`;而计算 `c` 则先进一步计算 `c` 的上游 dependency `a` 和 `b`。Spark Streaming 里则与 RDD DAG 的反向表示保持了一致,对 DStream 也采用的反向表示。 -所以,这里 `d` 对 `c` 的引用(即指针),表达的是一个上游*依赖*(***dependency***)的关系;也就是说,不求值则已,一旦 `d.print()` 这个 *output* 操作触发了对 `d` 的求值,那么就需要从 `d` 开始往上游进行追溯计算。 +所以,这里 `d` 对 `c` 的引用,表达的是一个上游*依赖*(***dependency***)的关系;也就是说,不求值则已,一旦 `d.print()` 这个 *output* 操作触发了对 `d` 的求值,那么就需要从 `d` 开始往上游进行追溯计算。 具体的过程是,`d.print()` 将 `new` 一个 `d` 的一个下游 `ForEachDStream x` —— `x` 中记明了需要做的操作 `func = print()` —— 然后在每个 batch 动态生成 RDD 实例时,以 `x` 为根节点、进行一次 BFS(宽度优先遍历),就可以快速得到需要进行实际计算的最小集合。如下图所示,这个最小集合就是 {`a`, `b`, `c`, `d`}。 diff --git "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.2 DStream \347\224\237\346\210\220 RDD \345\256\236\344\276\213\350\257\246\350\247\243.md" "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.2 DStream \347\224\237\346\210\220 RDD \345\256\236\344\276\213\350\257\246\350\247\243.md" index d85206b..2939c3d 100644 --- "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.2 DStream \347\224\237\346\210\220 RDD \345\256\236\344\276\213\350\257\246\350\247\243.md" +++ "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.2 DStream \347\224\237\346\210\220 RDD \345\256\236\344\276\213\350\257\246\350\247\243.md" @@ -2,14 +2,14 @@ ***[酷玩 Spark] Spark Streaming 源码解析系列*** ,返回目录请 [猛戳这里](readme.md) -[「腾讯·广点通」](http://e.qq.com)技术团队荣誉出品 +[「腾讯广告」](http://e.qq.com)技术团队(原腾讯广点通技术团队)荣誉出品 ``` 本系列内容适用范围: -* 2016.12.28 update, Spark 2.1 全系列 √ (2.1.0) -* 2016.11.14 update, Spark 2.0 全系列 √ (2.0.0, 2.0.1, 2.0.2) -* 2016.11.07 update, Spark 1.6 全系列 √ (1.6.0, 1.6.1, 1.6.2, 1.6.3) +* 2018.11.02 update, Spark 2.4 全系列 √ (已发布:2.4.0) +* 2018.02.28 update, Spark 2.3 全系列 √ (已发布:2.3.0 ~ 2.3.2) +* 2017.07.11 update, Spark 2.2 全系列 √ (已发布:2.2.0 ~ 2.2.3) ```

diff --git "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/2.1 JobScheduler, Job, JobSet \350\257\246\350\247\243.md" "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/2.1 JobScheduler, Job, JobSet \350\257\246\350\247\243.md" index 216db65..afdd1ba 100644 --- "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/2.1 JobScheduler, Job, JobSet \350\257\246\350\247\243.md" +++ "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/2.1 JobScheduler, Job, JobSet \350\257\246\350\247\243.md" @@ -2,14 +2,14 @@ ***[酷玩 Spark] Spark Streaming 源码解析系列*** ,返回目录请 [猛戳这里](readme.md) -[「腾讯·广点通」](http://e.qq.com)技术团队荣誉出品 +[「腾讯广告」](http://e.qq.com)技术团队(原腾讯广点通技术团队)荣誉出品 ``` 本系列内容适用范围: -* 2016.12.28 update, Spark 2.1 全系列 √ (2.1.0) -* 2016.11.14 update, Spark 2.0 全系列 √ (2.0.0, 2.0.1, 2.0.2) -* 2016.11.07 update, Spark 1.6 全系列 √ (1.6.0, 1.6.1, 1.6.2, 1.6.3) +* 2018.11.02 update, Spark 2.4 全系列 √ (已发布:2.4.0) +* 2018.02.28 update, Spark 2.3 全系列 √ (已发布:2.3.0 ~ 2.3.2) +* 2017.07.11 update, Spark 2.2 全系列 √ (已发布:2.2.0 ~ 2.2.3) ```

@@ -18,9 +18,9 @@ ## 引言 -前面在 [Spark Streaming 实现思路与模块概述](0.1 Spark Streaming 实现思路与模块概述.md) 和 [DStream 生成 RDD 实例详解](1.2 DStream 生成 RDD 实例详解.md) 里我们分析了 `DStreamGraph` 和 `DStream` 具有能够实例化 `RDD` 和 `RDD` DAG 的能力,下面我们来看 Spark Streaming 是如何将其动态调度的。 +前面在 [Spark Streaming 实现思路与模块概述](0.1 Spark Streaming 实现思路与模块概述.md) 和 [DStream 生成 RDD 实例详解](1.2 DStream 生成 RDD 实例详解.md) 里我们分析了 `DStream` 和 `DStreamGraph` 具有能够实例化 `RDD` 和 `RDD` DAG 的能力,下面我们来看 Spark Streaming 是如何将其动态调度的。 -在 Spark Streaming 程序的入口,我们都会定义一个 `batchDuration`,就是需要每隔多长时间就比照静态的 `DStreamGraph` 来动态生成一个 RDD DAG 实例。在 Spark Streaming 里,总体负责动态作业调度的具体类是 `JobScheduler`,在 Spark Streaming 程序在 `ssc.start()` 开始运行时,会生成一个 `JobScheduler` 的实例,并被 start() 运行起来。 +在 Spark Streaming 程序的入口,我们都会定义一个 `batchDuration`,就是需要每隔多长时间就比照静态的 `DStreamGraph` 来动态生成一个 RDD DAG 实例。在 Spark Streaming 里,总体负责动态作业调度的具体类是 `JobScheduler`,在 Spark Streaming 程序在 `ssc.start()` 开始运行时,将 `JobScheduler` 的实例给 start() 运行起来。 ```scala // 来自 StreamingContext @@ -122,7 +122,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { 这里 `jobExecutor` 的线程池大小,是由 `spark.streaming.concurrentJobs` 参数来控制的,当没有显式设置时,其取值为 `1`。 -进一步说,这里 `jobExecutor` 的线程池大小,就是能够并行执行的 `Job` 数。而回想前文讲解的 `DStreamGraph.generateJobs(time)` 过程,一次 batch 产生一个 `Seq[Job}`,里面可能包含多个 `Job` —— 所以,确切的,**有几个 *output* 操作,就调用几次 `ForEachDStream.generatorJob(time)`,就产生出几个 `Job` **。 +进一步说,这里 `jobExecutor` 的线程池大小,就是能够并行执行的 `Job` 数。而回想前文讲解的 `DStreamGraph.generateJobs(time)` 过程,一次 batch 产生一个 `Seq[Job}`,里面可能包含多个 `Job` —— 所以,确切的,**有几个 *output* 操作,就调用几次 `ForEachDStream.generatorJob(time)`,就产生出几个 `Job`**。 为了验证这个结果,我们做一个简单的小测试:先设置 `spark.streaming.concurrentJobs = 10`,然后在每个 batch 里做 `2` 次 `foreachRDD()` 这样的 *output* 操作: diff --git "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/2.2 JobGenerator \350\257\246\350\247\243.md" "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/2.2 JobGenerator \350\257\246\350\247\243.md" index 75e82bd..eae1799 100644 --- "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/2.2 JobGenerator \350\257\246\350\247\243.md" +++ "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/2.2 JobGenerator \350\257\246\350\247\243.md" @@ -2,14 +2,14 @@ ***[酷玩 Spark] Spark Streaming 源码解析系列*** ,返回目录请 [猛戳这里](readme.md) -[「腾讯·广点通」](http://e.qq.com)技术团队荣誉出品 +[「腾讯广告」](http://e.qq.com)技术团队(原腾讯广点通技术团队)荣誉出品 ``` 本系列内容适用范围: -* 2016.12.28 update, Spark 2.1 全系列 √ (2.1.0) -* 2016.11.14 update, Spark 2.0 全系列 √ (2.0.0, 2.0.1, 2.0.2) -* 2016.11.07 update, Spark 1.6 全系列 √ (1.6.0, 1.6.1, 1.6.2, 1.6.3) +* 2018.11.02 update, Spark 2.4 全系列 √ (已发布:2.4.0) +* 2018.02.28 update, Spark 2.3 全系列 √ (已发布:2.3.0 ~ 2.3.2) +* 2017.07.11 update, Spark 2.2 全系列 √ (已发布:2.2.0 ~ 2.2.3) ```

@@ -18,7 +18,7 @@ ## 引言 -前面在 [Spark Streaming 实现思路与模块概述](0.1 Spark Streaming 实现思路与模块概述.md) 和 [DStream 生成 RDD 实例详解](1.2 DStream 生成 RDD 实例详解.md) 里我们分析了 `DStreamGraph` 和 `DStream` 具有能够实例化 `RDD` 和 `RDD` DAG 的能力,下面我们来看 Spark Streaming 是如何将其动态调度的。 +前面在 [Spark Streaming 实现思路与模块概述](0.1 Spark Streaming 实现思路与模块概述.md) 和 [DStream 生成 RDD 实例详解](1.2 DStream 生成 RDD 实例详解.md) 里我们分析了 `DStream` 和 `DStreamGraph` 具有能够实例化 `RDD` 和 `RDD` DAG 的能力,下面我们来看 Spark Streaming 是如何将其动态调度的。 在 Spark Streaming 程序的入口,我们都会定义一个 `batchDuration`,就是需要每隔多长时间就比照静态的 `DStreamGraph` 来动态生成一个 RDD DAG 实例。在 Spark Streaming 里,总体负责动态作业调度的具体类是 `JobScheduler`, diff --git "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/3.1 Receiver \345\210\206\345\217\221\350\257\246\350\247\243.md" "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/3.1 Receiver \345\210\206\345\217\221\350\257\246\350\247\243.md" index db33c2a..fe8b43d 100644 --- "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/3.1 Receiver \345\210\206\345\217\221\350\257\246\350\247\243.md" +++ "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/3.1 Receiver \345\210\206\345\217\221\350\257\246\350\247\243.md" @@ -2,14 +2,14 @@ ***[酷玩 Spark] Spark Streaming 源码解析系列*** ,返回目录请 [猛戳这里](readme.md) -[「腾讯·广点通」](http://e.qq.com)技术团队荣誉出品 +[「腾讯广告」](http://e.qq.com)技术团队(原腾讯广点通技术团队)荣誉出品 ``` 本系列内容适用范围: -* 2016.12.28 update, Spark 2.1 全系列 √ (2.1.0) -* 2016.11.14 update, Spark 2.0 全系列 √ (2.0.0, 2.0.1, 2.0.2) -* 2016.11.07 update, Spark 1.6 全系列 √ (1.6.0, 1.6.1, 1.6.2, 1.6.3) +* 2018.11.02 update, Spark 2.4 全系列 √ (已发布:2.4.0) +* 2018.02.28 update, Spark 2.3 全系列 √ (已发布:2.3.0 ~ 2.3.2) +* 2017.07.11 update, Spark 2.2 全系列 √ (已发布:2.2.0 ~ 2.2.3) ```

@@ -101,7 +101,7 @@ Spark 1.4.0 的 `launchReceivers()` 的过程如下: - 然后就调用 `ReceiverSchedulingPolicy.scheduleReceivers(receivers, executors)` 来计算每个 `Receiver` 的目的地 executor 列表 - 在 Streaming 程序运行过程中,如果需要重启某个 `Receiver`: - - 将首先看一看之前计算过的目的地 executor 还没有还 alive 的 + - 将首先看一看之前计算过的目的地 executor 有没有还 alive 的 - 如果没有,就需要 `ReceiverSchedulingPolicy.rescheduleReceiver(receiver, ...)` 来重新计算这个 `Receiver` 的目的地 executor 列表 [默认的 `ReceiverSchedulingPolicy`](https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala) 是实现为 `round-robin` 式的了。我们举例说明下这两个方法: diff --git "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/3.2 Receiver, ReceiverSupervisor, BlockGenerator, ReceivedBlockHandler \350\257\246\350\247\243.md" "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/3.2 Receiver, ReceiverSupervisor, BlockGenerator, ReceivedBlockHandler \350\257\246\350\247\243.md" index e7d9c3e..d69fef5 100644 --- "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/3.2 Receiver, ReceiverSupervisor, BlockGenerator, ReceivedBlockHandler \350\257\246\350\247\243.md" +++ "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/3.2 Receiver, ReceiverSupervisor, BlockGenerator, ReceivedBlockHandler \350\257\246\350\247\243.md" @@ -2,14 +2,14 @@ ***[酷玩 Spark] Spark Streaming 源码解析系列*** ,返回目录请 [猛戳这里](readme.md) -[「腾讯·广点通」](http://e.qq.com)技术团队荣誉出品 +[「腾讯广告」](http://e.qq.com)技术团队(原腾讯广点通技术团队)荣誉出品 ``` 本系列内容适用范围: -* 2016.12.28 update, Spark 2.1 全系列 √ (2.1.0) -* 2016.11.14 update, Spark 2.0 全系列 √ (2.0.0, 2.0.1, 2.0.2) -* 2016.11.07 update, Spark 1.6 全系列 √ (1.6.0, 1.6.1, 1.6.2, 1.6.3) +* 2018.11.02 update, Spark 2.4 全系列 √ (已发布:2.4.0) +* 2018.02.28 update, Spark 2.3 全系列 √ (已发布:2.3.0 ~ 2.3.2) +* 2017.07.11 update, Spark 2.2 全系列 √ (已发布:2.2.0 ~ 2.2.3) ```

diff --git "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/3.3 ReceiverTraker, ReceivedBlockTracker \350\257\246\350\247\243.md" "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/3.3 ReceiverTraker, ReceivedBlockTracker \350\257\246\350\247\243.md" index 9bb4456..b79aee5 100644 --- "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/3.3 ReceiverTraker, ReceivedBlockTracker \350\257\246\350\247\243.md" +++ "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/3.3 ReceiverTraker, ReceivedBlockTracker \350\257\246\350\247\243.md" @@ -2,14 +2,14 @@ ***[酷玩 Spark] Spark Streaming 源码解析系列*** ,返回目录请 [猛戳这里](readme.md) -[「腾讯·广点通」](http://e.qq.com)技术团队荣誉出品 +[「腾讯广告」](http://e.qq.com)技术团队(原腾讯广点通技术团队)荣誉出品 ``` 本系列内容适用范围: -* 2016.12.28 update, Spark 2.1 全系列 √ (2.1.0) -* 2016.11.14 update, Spark 2.0 全系列 √ (2.0.0, 2.0.1, 2.0.2) -* 2016.11.07 update, Spark 1.6 全系列 √ (1.6.0, 1.6.1, 1.6.2, 1.6.3) +* 2018.11.02 update, Spark 2.4 全系列 √ (已发布:2.4.0) +* 2018.02.28 update, Spark 2.3 全系列 √ (已发布:2.3.0 ~ 2.3.2) +* 2017.07.11 update, Spark 2.2 全系列 √ (已发布:2.2.0 ~ 2.2.3) ```

@@ -109,7 +109,7 @@ `RpcEndPoint` 可以理解为 RPC 的 server 端,供 client 调用。 -`ReceiverTracker` 作为 `RpcEndPoint` 的地址 —— 即 driver 的地址 —— 是公开的,可供 `Receiver` 连接;如果某个 `Receiver` 连接成功,那么 `ReceiverTracker` 也就持有了这个 `Receiver` 的 `RpcEndPoint`。这样以来,通过发送消息,就可以实现双向通信。 +`ReceiverTracker` 作为 `RpcEndPoint` 的地址 —— 即 driver 的地址 —— 是公开的,可供 `Receiver` 连接;如果某个 `Receiver` 连接成功,那么 `ReceiverTracker` 也就持有了这个 `Receiver` 的 `RpcEndPoint`。这样一来,通过发送消息,就可以实现双向通信。 1.5.0 版本以来,`ReceiverTracker` 支持的消息有 10 种,我们进行一个总结: @@ -165,13 +165,13 @@ ### (3) ReceiverTracker 管理块数据的 meta 信息 -一方面 `Receiver` 将通过 `AddBlock` 消息上报 meta 信息给 `ReceiverTracker`,另一方面 `JobGenerator` 将在每个 batch 开始时要求 `ReceiverTracker` 将已上报的块信息进行 batch 划分,`ReceiverTracker` 完整了块数据的 meta 信息管理工作。 +一方面 `Receiver` 将通过 `AddBlock` 消息上报 meta 信息给 `ReceiverTracker`,另一方面 `JobGenerator` 将在每个 batch 开始时要求 `ReceiverTracker` 将已上报的块信息进行 batch 划分,`ReceiverTracker` 完成了块数据的 meta 信息管理工作。 具体的,`ReceiverTracker` 有一个成员 `ReceivedBlockTracker`,专门负责已上报的块数据 meta 信息管理。 ## ReceivedBlockTracker 详解 -我们刚刚将,`ReceivedBlockTracker` 专门负责已上报的块数据 meta 信息管理,但 `ReceivedBlockTracker` 本身不负责对外交互,一切都是通过 `ReceiverTracker` 来转发 —— 这里 `ReceiverTracker` 相当于是 `ReceivedBlockTracker` 的门面(可参考 [门面模式](http://www.cnblogs.com/zhenyulu/articles/55992.html))。 +我们刚刚讲到,`ReceivedBlockTracker` 专门负责已上报的块数据 meta 信息管理,但 `ReceivedBlockTracker` 本身不负责对外交互,一切都是通过 `ReceiverTracker` 来转发 —— 这里 `ReceiverTracker` 相当于是 `ReceivedBlockTracker` 的门面(可参考 [门面模式](http://www.cnblogs.com/zhenyulu/articles/55992.html))。 在 `ReceivedBlockTracker` 内部,有几个重要的成员,它们的关系如下: diff --git "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/4.1 Executor \347\253\257\351\225\277\346\227\266\345\256\271\351\224\231\350\257\246\350\247\243.md" "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/4.1 Executor \347\253\257\351\225\277\346\227\266\345\256\271\351\224\231\350\257\246\350\247\243.md" index 2d42f95..18d76ad 100644 --- "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/4.1 Executor \347\253\257\351\225\277\346\227\266\345\256\271\351\224\231\350\257\246\350\247\243.md" +++ "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/4.1 Executor \347\253\257\351\225\277\346\227\266\345\256\271\351\224\231\350\257\246\350\247\243.md" @@ -2,14 +2,14 @@ ***[酷玩 Spark] Spark Streaming 源码解析系列*** ,返回目录请 [猛戳这里](readme.md) -[「腾讯·广点通」](http://e.qq.com)技术团队荣誉出品 +[「腾讯广告」](http://e.qq.com)技术团队(原腾讯广点通技术团队)荣誉出品 ``` 本系列内容适用范围: -* 2016.12.28 update, Spark 2.1 全系列 √ (2.1.0) -* 2016.11.14 update, Spark 2.0 全系列 √ (2.0.0, 2.0.1, 2.0.2) -* 2016.11.07 update, Spark 1.6 全系列 √ (1.6.0, 1.6.1, 1.6.2, 1.6.3) +* 2018.11.02 update, Spark 2.4 全系列 √ (已发布:2.4.0) +* 2018.02.28 update, Spark 2.3 全系列 √ (已发布:2.3.0 ~ 2.3.2) +* 2017.07.11 update, Spark 2.2 全系列 √ (已发布:2.2.0 ~ 2.2.3) ```

@@ -26,7 +26,7 @@ 本文我们详解 executor 端的保障。 -在 executor 端,`ReceiverSupervisor` 和 `Receiver` 失效后直接重启就 OK 了,关联是保障收到的块数据的安全。保障了源头块数据,就能够保障 RDD DAG (Spark Core 的 lineage)重做。 +在 executor 端,`ReceiverSupervisor` 和 `Receiver` 失效后直接重启就 OK 了,关键是保障收到的块数据的安全。保障了源头块数据,就能够保障 RDD DAG (Spark Core 的 lineage)重做。 Spark Streaming 对源头块数据的保障,分为 4 个层次,全面、相互补充,又可根据不同场景灵活设置: - (1) 热备 @@ -114,7 +114,7 @@ private def doPut(blockId: BlockId, data: BlockValues, level: StorageLevel, ...) 简单总结本小节的解析,`Receiver` 收到的数据,通过 `ReceiverSupervisorImpl`,将数据交给 `BlockManager` 存储;而 `BlockManager` 本身支持将数据 `replicate()` 到另外的 executor 上,这样就完成了 `Receiver` 源头数据的热备过程。 -而在计算时,计算任务首先将获取需要的块数据,这是如果一个 executor 失效导致一份数据丢失,那么计算任务将转而向另一个 executor 上的同一份数据获取数据。因为另一份块数据是现成的、不需要像冷备那样重新读取的,所以这里不会有 recovery time。 +而在计算时,计算任务首先将获取需要的块数据,这时如果一个 executor 失效导致一份数据丢失,那么计算任务将转而向另一个 executor 上的同一份数据获取数据。因为另一份块数据是现成的、不需要像冷备那样重新读取的,所以这里不会有 recovery time。 ## (2) 冷备 diff --git "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/4.2 Driver \347\253\257\351\225\277\346\227\266\345\256\271\351\224\231\350\257\246\350\247\243.md" "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/4.2 Driver \347\253\257\351\225\277\346\227\266\345\256\271\351\224\231\350\257\246\350\247\243.md" index 46291bb..e1383a9 100644 --- "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/4.2 Driver \347\253\257\351\225\277\346\227\266\345\256\271\351\224\231\350\257\246\350\247\243.md" +++ "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/4.2 Driver \347\253\257\351\225\277\346\227\266\345\256\271\351\224\231\350\257\246\350\247\243.md" @@ -2,14 +2,14 @@ ***[酷玩 Spark] Spark Streaming 源码解析系列*** ,返回目录请 [猛戳这里](readme.md) -[「腾讯·广点通」](http://e.qq.com)技术团队荣誉出品 +[「腾讯广告」](http://e.qq.com)技术团队(原腾讯广点通技术团队)荣誉出品 ``` 本系列内容适用范围: -* 2016.12.28 update, Spark 2.1 全系列 √ (2.1.0) -* 2016.11.14 update, Spark 2.0 全系列 √ (2.0.0, 2.0.1, 2.0.2) -* 2016.11.07 update, Spark 1.6 全系列 √ (1.6.0, 1.6.1, 1.6.2, 1.6.3) +* 2018.11.02 update, Spark 2.4 全系列 √ (已发布:2.4.0) +* 2018.02.28 update, Spark 2.3 全系列 √ (已发布:2.3.0 ~ 2.3.2) +* 2017.07.11 update, Spark 2.2 全系列 √ (已发布:2.2.0 ~ 2.2.3) ```

diff --git "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/Q&A \344\273\200\344\271\210\346\230\257 end-to-end exactly-once.md" "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/Q&A \344\273\200\344\271\210\346\230\257 end-to-end exactly-once.md" index 4d013ed..950e5cc 100644 --- "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/Q&A \344\273\200\344\271\210\346\230\257 end-to-end exactly-once.md" +++ "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/Q&A \344\273\200\344\271\210\346\230\257 end-to-end exactly-once.md" @@ -11,7 +11,7 @@ 而在 Spark 2.0 引入的 Structured Streaming 里,将把常见的下游 end 也管理起来(比如通过 batch id 来原生支持幂等),那么不需要 user code 做什么就可以保证 end-to-end 的 exactly-once 了,请见下面一张来自 databricks 的 slide[1]: - ![end-to-end exactly-once](q&a.imgs/end-to-end exactly-once.png) + ![end-to-end exactly-once](q%26a.imgs/end-to-end%20exactly-once.png) - [1] Reynold Xin (Databricks), *"the Future of Real-time in Spark"*, 2016.02, http://www.slideshare.net/rxin/the-future-of-realtime-in-spark. diff --git "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/readme.md" "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/readme.md" index c04146a..d7b9e40 100644 --- "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/readme.md" +++ "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/readme.md" @@ -1,48 +1,55 @@ ## Spark Streaming 源码解析系列 -[「腾讯·广点通」](http://e.qq.com)技术团队荣誉出品 +[「腾讯广告」](http://e.qq.com)技术团队(原腾讯广点通技术团队)荣誉出品 ``` 本系列内容适用范围: -* 2016.12.28 update, Spark 2.1 全系列 √ (2.1.0) -* 2016.11.14 update, Spark 2.0 全系列 √ (2.0.0, 2.0.1, 2.0.2) -* 2016.11.07 update, Spark 1.6 全系列 √ (1.6.0, 1.6.1, 1.6.2, 1.6.3) +* 2022.02.10 update, Spark 3.1.3 √ +* 2021.10.13 update, Spark 3.2 全系列 √ +* 2021.01.07 update, Spark 3.1 全系列 √ +* 2020.06.18 update, Spark 3.0 全系列 √ +* 2018.11.02 update, Spark 2.4 全系列 √ (已发布:2.4.0) +* 2018.02.28 update, Spark 2.3 全系列 √ (已发布:2.3.0 ~ 2.3.2) +* 2017.07.11 update, Spark 2.2 全系列 √ (已发布:2.2.0 ~ 2.2.3) ``` - *概述* - - [0.1 Spark Streaming 实现思路与模块概述](0.1 Spark Streaming 实现思路与模块概述.md) + - [0.1 Spark Streaming 实现思路与模块概述](0.1%20Spark%20Streaming%20实现思路与模块概述.md) - *模块 1:DAG 静态定义* - - [1.1 DStream, DStreamGraph 详解](1.1 DStream, DStreamGraph 详解.md) - - [1.2 DStream 生成 RDD 实例详解](1.2 DStream 生成 RDD 实例详解.md) + - [1.1 DStream, DStreamGraph 详解](1.1%20DStream%2C%20DStreamGraph%20详解.md) + - [1.2 DStream 生成 RDD 实例详解](1.2%20DStream%20生成%20RDD%20实例详解.md) - *模块 2:Job 动态生成* - - [2.1 JobScheduler, Job, JobSet 详解](2.1 JobScheduler, Job, JobSet 详解.md) - - [2.2 JobGenerator 详解](2.2 JobGenerator 详解.md) + - [2.1 JobScheduler, Job, JobSet 详解](2.1%20JobScheduler%2C%20Job%2C%20JobSet%20详解.md) + - [2.2 JobGenerator 详解](2.2%20JobGenerator%20详解.md) - *模块 3:数据产生与导入* - - [3.1 Receiver 分发详解](3.1 Receiver 分发详解.md) - - [3.2 Receiver, ReceiverSupervisor, BlockGenerator, ReceivedBlockHandler 详解](3.2 Receiver, ReceiverSupervisor, BlockGenerator, ReceivedBlockHandler 详解.md) - - [3.3 ReceiverTraker, ReceivedBlockTracker 详解](3.3 ReceiverTraker, ReceivedBlockTracker 详解.md) + - [3.1 Receiver 分发详解](3.1%20Receiver%20分发详解.md) + - [3.2 Receiver, ReceiverSupervisor, BlockGenerator, ReceivedBlockHandler 详解](3.2%20Receiver%2C%20ReceiverSupervisor%2C%20BlockGenerator%2C%20ReceivedBlockHandler%20详解.md) + - [3.3 ReceiverTraker, ReceivedBlockTracker 详解](3.3%20ReceiverTraker%2C%20ReceivedBlockTracker%20详解.md) - *模块 4:长时容错* - - [4.1 Executor 端长时容错详解](4.1 Executor 端长时容错详解.md) - - [4.2 Driver 端长时容错详解](4.2 Driver 端长时容错详解.md) + - [4.1 Executor 端长时容错详解](4.1%20Executor%20端长时容错详解.md) + - [4.2 Driver 端长时容错详解](4.2%20Driver%20端长时容错详解.md) - *StreamingContext* - 5.1 StreamingContext 详解 - *一些资源和 Q&A* - - [Spark 资源集合](https://github.com/lw-lin/CoolplaySpark/tree/master/Spark%20%E8%B5%84%E6%BA%90%E9%9B%86%E5%90%88) (包括 Spark Summit 视频,Spark 中文微信群等资源集合) - - [(Q&A) 什么是 end-to-end exactly-once?](Q&A 什么是 end-to-end exactly-once.md) + - [Spark 资源集合](https://github.com/lw-lin/CoolplaySpark/tree/master/Spark%20%E8%B5%84%E6%BA%90%E9%9B%86%E5%90%88) (包括 Spark Summit 视频,Spark 中文微信群等资源集合)
![wechat_spark_streaming_small](../Spark%20%E8%B5%84%E6%BA%90%E9%9B%86%E5%90%88/resources/wechat_spark_streaming_small_.PNG) + - [(Q&A) 什么是 end-to-end exactly-once?](Q%26A%20什么是%20end-to-end%20exactly-once.md) ## 致谢 - Github @wongxingjun 同学指出 3 处 typo,并提 Pull Request 修正(PR 已合并) - Github @endymecy 同学指出 2 处 typo,并提 Pull Request 修正(PR 已合并) +- Github @Lemonjing 同学指出几处 typo,并提 Pull Request 修正(PR 已合并) +- Github @xiaoguoqiang 同学指出 1 处 typo,并提 Pull Request 修正(PR 已合并) +- Github 张瀚 (@AntikaSmith) 同学指出 1 处 问题(已修正) - Github Tao Meng (@mtunique) 同学指出 1 处 typo,并提 Pull Request 修正(PR 已合并) - Github @ouyangshourui 同学指出 1 处问题,并提 Pull Request 修正(PR 已合并) - Github @jacksu 同学指出 1 处问题,并提 Pull Request 修正(PR 已合并) -- Github @klion26 同学指出 1 处 typo -- Github @397090770 同学指出 1 处配图笔误 -- Github @ubtaojiang1982 同学指出 1 处 typo -- Github @marlin5555 同学指出 1 处配图遗漏信息 -- Weibo @wyggggo 同学指出 1 处 typo +- Github @klion26 同学指出 1 处 typo(已修正) +- Github @397090770 同学指出 1 处配图笔误(已修正) +- Github @ubtaojiang1982 同学指出 1 处 typo(已修正) +- Github @marlin5555 同学指出 1 处配图遗漏信息(已修正) +- Weibo @wyggggo 同学指出 1 处 typo(已修正) ## Spark Streaming 史前史(1) diff --git "a/Spark \350\265\204\346\272\220\351\233\206\345\220\210/README.md" "b/Spark \350\265\204\346\272\220\351\233\206\345\220\210/README.md" index 8488ffd..ccee8e2 100644 --- "a/Spark \350\265\204\346\272\220\351\233\206\345\220\210/README.md" +++ "b/Spark \350\265\204\346\272\220\351\233\206\345\220\210/README.md" @@ -1,41 +1,28 @@ -# Spark Summit 视频 (2017 最新) +# Spark+AI Summit 资源 (2019 最新) -![spark_summit_east_2017](resources/spark_summit_east_2017.png) -- 日程及 slides => [Spark Summit East 2017 官方日程](https://spark-summit.org/east-2017/schedule/) -- 视频集合 => [墙外地址@Youtube](https://www.youtube.com/user/TheApacheSpark) -- 视频集合 => [墙内地址@百度云盘](http://pan.baidu.com/s/1jHD7yey) +![spark_ai summit_2019](resources/spark_ai_summit_2019_san_francisco.png) +- [2019.04.23~25] 官方日程 => [官方日程](https://databricks.com/sparkaisummit/north-america/schedule-static) +- [2019.04.23~25] PPT 合集 => [PPT 合集 from 示说网](https://mp.weixin.qq.com/s/CSTqXHCpJPvlkVAeaY1mIw) +- [2019.04.23~25] 视频集合 => [墙内地址@百度云](https://pan.baidu.com/s/10HmEy1zbVnfsZQrllTwl8A)
# Spark 中文微信交流群 -![wechat_spark_streaming_small](resources/wechat_spark_streaming_small.PNG) ![wechat_sh_meetup_small](resources/wechat_sh_meetup_small.PNG) +![wechat_spark_streaming_small](resources/wechat_spark_streaming_small_.PNG)
-# Spark 资源(since 2016.07.01) +# Spark 资源 - [Databricks 的博客](https://databricks.com/blog) - Spark 背后的大数据公司的博客,包括 Spark 技术剖析、Spark 案例、行业动态等 - [Apache Spark JIRA issues](https://issues.apache.org/jira/issues/?jql=project+%3D+SPARK) - 开发人员经常关注一下,可以知道未来 3 ~ 6 个月 Spark 的发展方向 - [Structured Streaming 源码解析系列](https://github.com/lw-lin/CoolplaySpark/tree/master/Structured%20Streaming%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%E7%B3%BB%E5%88%97) - - 作者会按照最新 Spark 版本(目前是 2.1.0)持续更新和修订 + - 作者会按照最新 Spark 版本持续更新和修订 - [Spark Streaming 源码解析系列](https://github.com/lw-lin/CoolplaySpark/tree/master/Spark%20Streaming%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%E7%B3%BB%E5%88%97) - - 作者会按照最新 Spark 版本(目前是 2.1.0)持续更新和修订 + - 作者会按照最新 Spark 版本持续更新和修订
-# Spark 资源(prior to 2016.06.31) - -*应当将下面的内容看做一个目录和索引,可以按需找到相关资源* - -# Spark Summit 视频 (2016 年) - -- 过去的 Spark Summit (2016 年共 3 次) - - Spark Summit Europe 2016,2016.10.25-10.27,at 布鲁塞尔,视频集合 => [墙内地址@百度云盘](https://pan.baidu.com/s/1dE4OB4H) - - Spark Summit 2016,2016.06.06-06.08,at 旧金山,视频集合 => [墙内地址@百度云盘](https://pan.baidu.com/s/1bZpurW) - - Spark Summit East 2016,2016.02.16-02.18,at 纽约,视频集合 => [墙内地址@百度云盘](https://pan.baidu.com/s/1jHyMj46) - -
- # 各种资源持续更新 ing *欢迎大家提供资源索引(在本 repo 直接发 issue 即可),thanks!* diff --git "a/Spark \350\265\204\346\272\220\351\233\206\345\220\210/resources/spark_ai_summit_2019_san_francisco.png" "b/Spark \350\265\204\346\272\220\351\233\206\345\220\210/resources/spark_ai_summit_2019_san_francisco.png" new file mode 100644 index 0000000..a159b25 Binary files /dev/null and "b/Spark \350\265\204\346\272\220\351\233\206\345\220\210/resources/spark_ai_summit_2019_san_francisco.png" differ diff --git "a/Spark \350\265\204\346\272\220\351\233\206\345\220\210/resources/spark_summit_europe_2017.png" "b/Spark \350\265\204\346\272\220\351\233\206\345\220\210/resources/spark_summit_europe_2017.png" new file mode 100644 index 0000000..833f748 Binary files /dev/null and "b/Spark \350\265\204\346\272\220\351\233\206\345\220\210/resources/spark_summit_europe_2017.png" differ diff --git "a/Spark \350\265\204\346\272\220\351\233\206\345\220\210/resources/wechat_spark_streaming_small.PNG" "b/Spark \350\265\204\346\272\220\351\233\206\345\220\210/resources/wechat_spark_streaming_small.PNG" index 80cdc7a..98336d5 100644 Binary files "a/Spark \350\265\204\346\272\220\351\233\206\345\220\210/resources/wechat_spark_streaming_small.PNG" and "b/Spark \350\265\204\346\272\220\351\233\206\345\220\210/resources/wechat_spark_streaming_small.PNG" differ diff --git "a/Spark \350\265\204\346\272\220\351\233\206\345\220\210/resources/wechat_spark_streaming_small_.PNG" "b/Spark \350\265\204\346\272\220\351\233\206\345\220\210/resources/wechat_spark_streaming_small_.PNG" new file mode 100644 index 0000000..98336d5 Binary files /dev/null and "b/Spark \350\265\204\346\272\220\351\233\206\345\220\210/resources/wechat_spark_streaming_small_.PNG" differ diff --git "a/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.1 Structured Streaming \345\256\236\347\216\260\346\200\235\350\267\257\344\270\216\345\256\236\347\216\260\346\246\202\350\277\260.md" "b/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.1 Structured Streaming \345\256\236\347\216\260\346\200\235\350\267\257\344\270\216\345\256\236\347\216\260\346\246\202\350\277\260.md" index cd8d53a..17a7544 100644 --- "a/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.1 Structured Streaming \345\256\236\347\216\260\346\200\235\350\267\257\344\270\216\345\256\236\347\216\260\346\246\202\350\277\260.md" +++ "b/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.1 Structured Streaming \345\256\236\347\216\260\346\200\235\350\267\257\344\270\216\345\256\236\347\216\260\346\246\202\350\277\260.md" @@ -2,11 +2,13 @@ ***[酷玩 Spark] Structured Streaming 源码解析系列*** ,返回目录请 [猛戳这里](.) -[「腾讯·广点通」](http://e.qq.com)技术团队荣誉出品 +[「腾讯广告」](http://e.qq.com)技术团队(原腾讯广点通技术团队)荣誉出品 ``` 本文内容适用范围: -* 2016.12.28 update, Spark 2.1 全系列 √ (已发布:2.1.0, 开发中:2.1.1-SNAPSHOT) +* 2018.11.02 update, Spark 2.4 全系列 √ (已发布:2.4.0) +* 2018.02.28 update, Spark 2.3 全系列 √ (已发布:2.3.0 ~ 2.3.2) +* 2017.07.11 update, Spark 2.2 全系列 √ (已发布:2.2.0 ~ 2.2.3) ``` 本文目录 @@ -56,7 +58,7 @@ Spark 2.x 则咔咔咔精简到只保留一个 SparkSession 作为主程序入 - 多条 json 对象比如 `{name: "Alice", age: 20, height: 1.68}, {name: "Bob", age: 25, height: 1.76}` 可以方便地转化为 `Dataset/DataFrame` - 或者 MySQL 表、行式存储文件、列式存储文件等等等都可以方便地转化为 `Dataset/DataFrame` -Spark 2.0 更进一步,使用 Dataset/Dataframe 的行列数据表格来扩展表达 streaming data —— 所以便横空出世了 Structured Streaming 、《Structured Streaming 源码解析系列》—— 与静态的 structured data 不同,动态的 streaming data 的行列数据表格是一直无限增长的(因为 streaming data 在远远不断地产生)! +Spark 2.0 更进一步,使用 Dataset/Dataframe 的行列数据表格来扩展表达 streaming data —— 所以便横空出世了 Structured Streaming 、《Structured Streaming 源码解析系列》—— 与静态的 structured data 不同,动态的 streaming data 的行列数据表格是一直无限增长的(因为 streaming data 在源源不断地产生)!

single API: Dataset/DataFrame

@@ -106,8 +108,9 @@ query.awaitTermination() // 当前用户主线程挂 StreamExection 另外的重要成员变量是: - `currentBatchId`: 当前执行的 id +- `batchCommitLog`: 已经成功处理过的批次有哪些 - `offsetLog`, `availableOffsets`, `committedOffsets`: 当前执行需要处理的 source data 的 meta 信息 -- `...watermark`: 当前执行的 watermark 信息(event time 相关,本文暂不涉及、另文解析) +- `offsetSeqMetadata`: 当前执行的 watermark 信息(event time 相关,本文暂不涉及、另文解析)等 我们将 Source, Sink, StreamExecution 及其重要成员变量标识在下图,接下来将逐个详细解析。 @@ -131,7 +134,9 @@ StreamExection 另外的重要成员变量是: - 物理计划的生成与选择:结果是可以直接用于执行的 RDD DAG - 逻辑计划、优化的逻辑计划、物理计划、及最后结果 RDD DAG,合并起来就是 IncrementalExecution 5. 将表示计算结果的 Dataset/DataFrame (包含 IncrementalExecution) 交给 Sink,即调用 Sink.add(ds/df) -6. 通过 Source.commit() 告知 Source 数据已经完整处理结束;Source 可按需完成数据的 garbage-collection +6. 计算完成后的 commit + - (6a) 通过 Source.commit() 告知 Source 数据已经完整处理结束;Source 可按需完成数据的 garbage-collection + - (6b) 将本次执行的批次 id 写入到 batchCommitLog 里 ### 3. StreamExecution 的持续查询(增量) @@ -162,8 +167,9 @@ Structured Streaming 的做法是: 如果在某个执行过程中发生 driver 故障,那么重新起来的 StreamExecution: -- 重读 WAL offsetlog 恢复出最新的 offsets 等;相当于取代正常流程里的 (1)(2) 步 -- 重做 (3a), (3b), (4), (5), (6) 步 +- 读取 WAL offsetlog 恢复出最新的 offsets 等;相当于取代正常流程里的 (1)(2) 步 +- 读取 batchCommitLog 决定是否需要重做最近一个批次 +- 如果需要,那么重做 (3a), (3b), (4), (5), (6a), (6b) 步 - 这里第 (5) 步需要分两种情况讨论 - (i) 如果上次执行在 (5) ***结束前即失效***,那么本次执行里 sink 应该完整写出计算结果 - (ii) 如果上次执行在 (5) ***结束后才失效***,那么本次执行里 sink 可以重新写出计算结果(覆盖上次结果),也可以跳过写出计算结果(因为上次执行已经完整写出过计算结果了) @@ -178,6 +184,7 @@ Structured Streaming 的做法是: | :-----------------------------: | :------------------------------: | :-----: | :--------------------------------------: | | **HDFS-compatible file system** | ![checked](1.imgs/checked.png) | 已支持 | 包括但不限于 text, json, csv, parquet, orc, ... | | **Kafka** | ![checked](1.imgs/checked.png) | 已支持 | Kafka 0.10.0+ | +| **RateStream** | ![checked](1.imgs/checked.png) | 已支持 | 以一定速率产生数据 | | **RDBMS** | ![checked](1.imgs/checked.png) | *(待支持)* | 预计后续很快会支持 | | **Socket** | ![negative](1.imgs/negative.png) | 已支持 | 主要用途是在技术会议/讲座上做 demo | | **Receiver-based** | ![negative](1.imgs/negative.png) | 不会支持 | 就让这些前浪被拍在沙滩上吧 | @@ -189,6 +196,7 @@ Structured Streaming 的做法是: | **HDFS-compatible file system** | ![checked](1.imgs/checked.png) | 已支持 | 包括但不限于 text, json, csv, parquet, orc, ... | | **ForeachSink** (自定操作幂等) | ![checked](1.imgs/checked.png) | 已支持 | 可定制度非常高的 sink | | **RDBMS** | ![checked](1.imgs/checked.png) | *(待支持)* | 预计后续很快会支持 | +| **Kafka** | ![negative](1.imgs/negative.png) | 已支持 | Kafka 目前不支持幂等写入,所以可能会有重复写入
(但推荐接着 Kafka 使用 streaming de-duplication 来去重) | | **ForeachSink** (自定操作不幂等) | ![negative](1.imgs/negative.png) | 已支持 | 不推荐使用不幂等的自定操作 | | **Console** | ![negative](1.imgs/negative.png) | 已支持 | 主要用途是在技术会议/讲座上做 demo | diff --git "a/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.imgs/100.png" "b/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.imgs/100.png" index d64435d..4ae4456 100644 Binary files "a/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.imgs/100.png" and "b/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.imgs/100.png" differ diff --git "a/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.imgs/110.png" "b/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.imgs/110.png" index 55c1b6a..70671f6 100644 Binary files "a/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.imgs/110.png" and "b/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.imgs/110.png" differ diff --git "a/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.imgs/120.png" "b/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.imgs/120.png" index 15a6b96..778dad9 100644 Binary files "a/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.imgs/120.png" and "b/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/1.imgs/120.png" differ diff --git "a/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/2.1 Structured Streaming \344\271\213 Source \350\247\243\346\236\220.md" "b/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/2.1 Structured Streaming \344\271\213 Source \350\247\243\346\236\220.md" index da5b6c2..99b5c3b 100644 --- "a/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/2.1 Structured Streaming \344\271\213 Source \350\247\243\346\236\220.md" +++ "b/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/2.1 Structured Streaming \344\271\213 Source \350\247\243\346\236\220.md" @@ -2,11 +2,13 @@ ***[酷玩 Spark] Structured Streaming 源码解析系列*** ,返回目录请 [猛戳这里](.) -[「腾讯·广点通」](http://e.qq.com)技术团队荣誉出品 +[「腾讯广告」](http://e.qq.com)技术团队(原腾讯广点通技术团队)荣誉出品 ``` 本文内容适用范围: -* 2016.12.28 update, Spark 2.1 全系列 √ (已发布:2.1.0, 开发中:2.1.1-SNAPSHOT) +* 2018.11.02 update, Spark 2.4 全系列 √ (已发布:2.4.0) +* 2018.02.28 update, Spark 2.3 全系列 √ (已发布:2.3.0 ~ 2.3.2) +* 2017.07.11 update, Spark 2.2 全系列 √ (已发布:2.2.0 ~ 2.2.3) ``` @@ -36,6 +38,7 @@ trait Source { - 已支持 - Kafka,具体实现是 KafkaSource extends Source - HDFS-compatible file system,具体实现是 FileStreamSource extends Source + - RateStream,具体实现是 RateStreamSource extends Source - 预计后续很快会支持 - RDBMS @@ -57,7 +60,7 @@ trait Source { 4. StreamExecution 触发计算逻辑 logicalPlan 的优化与编译 5. 把计算结果写出给 Sink - 注意这时才会由 Sink 触发发生实际的取数据操作,以及计算过程 -6. 在数据完整写出到 Sink 后,StreamExecution 通知 Source 可以废弃数据 +6. 在数据完整写出到 Sink 后,StreamExecution 通知 Source 可以废弃数据;然后把成功的批次 id 写入到 batchCommitLog - 这里是由 StreamExecution 调用 Source 的 `def commit(end: Offset): Unit`,即方法 (4) - `commit()` 方法主要是帮助 Source 完成 garbage-collection,如果外部数据源本身即具有 garbage-collection 功能,如 Kafka,那么在 Source 的具体 `commit()` 实现上即可为空、留给外部数据源去自己管理 @@ -71,7 +74,7 @@ trait Source { - 方法 (5) `def stop(): Unit` - 当一个持续查询结束时,Source 会被调用此方法 -## Source 的具体实现:HDFS-compatible file system, Kafka +## Source 的具体实现:HDFS-compatible file system, Kafka, Rate 我们总结一下截至目前,Source 已有的具体实现: @@ -79,6 +82,7 @@ trait Source { | :-----------------------------: | :------------------------------: | :----: | :--------------------------------------: | | **HDFS-compatible file system** | ![checked](1.imgs/checked.png) | 已支持 | 包括但不限于 text, json, csv, parquet, orc, ... | | **Kafka** | ![checked](1.imgs/checked.png) | 已支持 | Kafka 0.10.0+ | +| **RateStream** | ![checked](1.imgs/checked.png) | 已支持 | 以一定速率产生数据 | | **Socket** | ![negative](1.imgs/negative.png) | 已支持 | 主要用途是在技术会议/讲座上做 demo | 这里我们特别强调一下,虽然 Structured Streaming 也内置了 `socket` 这个 Source,但它并不能可靠重放、因而也不符合 Structured Streaming 的结构体系。它的主要用途只是在技术会议/讲座上做 demo,不应用于线上生产系统。 @@ -95,3 +99,4 @@ trait Source {

(本文完,参与本文的讨论请 [猛戳这里](https://github.com/lw-lin/CoolplaySpark/issues/31),返回目录请 [猛戳这里](.)) + diff --git "a/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/2.2 Structured Streaming \344\271\213 Sink \350\247\243\346\236\220.md" "b/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/2.2 Structured Streaming \344\271\213 Sink \350\247\243\346\236\220.md" index 91f6fad..5355d07 100644 --- "a/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/2.2 Structured Streaming \344\271\213 Sink \350\247\243\346\236\220.md" +++ "b/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/2.2 Structured Streaming \344\271\213 Sink \350\247\243\346\236\220.md" @@ -2,11 +2,13 @@ ***[酷玩 Spark] Structured Streaming 源码解析系列*** ,返回目录请 [猛戳这里](.) -[「腾讯·广点通」](http://e.qq.com)技术团队荣誉出品 +[「腾讯广告」](http://e.qq.com)技术团队(原腾讯广点通技术团队)荣誉出品 ``` 本文内容适用范围: -* 2016.12.28 update, Spark 2.1 全系列 √ (已发布:2.1.0, 开发中:2.1.1-SNAPSHOT) +* 2018.11.02 update, Spark 2.4 全系列 √ (已发布:2.4.0) +* 2018.02.28 update, Spark 2.3 全系列 √ (已发布:2.3.0 ~ 2.3.2) +* 2017.07.11 update, Spark 2.2 全系列 √ (已发布:2.2.0 ~ 2.2.3) ``` @@ -29,11 +31,12 @@ trait Sink { 相比而言,前作 Spark Streaming 并没有对输出进行特别的抽象,而只是在 DStreamGraph [2] 里将一些 dstreams 标记为了 output。当需要 exactly-once 特性时,程序员可以根据当前批次的时间标识,来 ***自行维护和判断*** 一个批次是否已经执行过。 -进化到 Structured Streaming 后,显式地抽象出了 Sink,并提供了一些原生实现: +进化到 Structured Streaming 后,显式地抽象出了 Sink,并提供了一些原生幂等的 Sink 实现: - 已支持 - HDFS-compatible file system,具体实现是 FileStreamSink extends Sink - Foreach sink,具体实现是 ForeachSink extends Sink + - Kafka sink,具体实现是 KafkaSink extends Sink - 预计后续很快会支持 - RDBMS @@ -54,7 +57,7 @@ trait Sink { - 在故障恢复时,分两种情况讨论: - (i) 如果上次执行在本步 ***结束前即失效***,那么本次执行里 sink 应该完整写出计算结果 - (ii) 如果上次执行在本步 ***结束后才失效***,那么本次执行里 sink 可以重新写出计算结果(覆盖上次结果),也可以跳过写出计算结果(因为上次执行已经完整写出过计算结果了) -6. 在数据完整写出到 Sink 后,StreamExecution 通知 Source 可以废弃数据 +6. 在数据完整写出到 Sink 后,StreamExecution 通知 Source 可以废弃数据;然后把成功的批次 id 写入到 batchCommitLog ## Sink 的具体实现:HDFS-API compatible FS, Foreach @@ -172,13 +175,83 @@ writeStream } } } -} ``` 所以我们看到,foreach sink 需要使用者提供 writer,所以这里的可定制度就非常高。 但是仍然需要注意,由于 foreach 的 writer 可能被 open() 多次,可能有多个 task 同时调用一个 writer。所以推荐 writer 一定要写成幂等的,如果 writer 不幂等、那么 Structured Streaming 框架本身也没有更多的办法能够保证 end-to-end exactly-once guarantees 了。 +### (c) 具体实现: Kafka + +Spark 2.1.1 版本开始加入了 KafkaSink,使得 Spark 也能够将数据写入到 kafka 中。 + +通常我们使用如下方法写出到 kafka sink: + +```scala +writeStream + .format("kafka") + .option("checkpointLocation", ...) + .outputMode(...) + .option("kafka.bootstrap.servers", ...) // 写出到哪个集群 + .option("topic", ...) // 写出到哪个 topic +``` + +那么我们看这里 `KafkaSink` 具体的 `addBatch()` 实现是: + +```scala + // 来自:class KafkaSink extends Sink + // 版本:Spark 2.1.1, 2.2.0 + override def addBatch(batchId: Long, data: DataFrame): Unit = { + if (batchId <= latestBatchId) { + logInfo(s"Skipping already committed batch $batchId") + } else { + // 主要是通过 KafkaWriter.write() 来做写出; + // 在 KafkaWriter.write() 里,主要是继续通过 KafkaWriteTask.execute() 来做写出 + KafkaWriter.write(sqlContext.sparkSession, + data.queryExecution, executorKafkaParams, topic) + latestBatchId = batchId + } + } +``` + +那么我们继续看这里 `KafkaWriteTask` 具体的 `execute()` 实现是: + +```scala + // 来自:class KafkaWriteTask + // 版本:Spark 2.1.1, 2.2.0 + def execute(iterator: Iterator[InternalRow]): Unit = { + producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfiguration) + while (iterator.hasNext && failedWrite == null) { + val currentRow = iterator.next() + // 这里的 projection 主要是构建 projectedRow,使得: + // 其第 0 号元素是 topic + // 其第 1 号元素是 key 的 binary 表示 + // 其第 2 号元素是 value 的 binary 表示 + val projectedRow = projection(currentRow) + val topic = projectedRow.getUTF8String(0) + val key = projectedRow.getBinary(1) + val value = projectedRow.getBinary(2) + if (topic == null) { + throw new NullPointerException(s"null topic present in the data. Use the " + + s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.") + } + val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value) + val callback = new Callback() { + override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = { + if (failedWrite == null && e != null) { + failedWrite = e + } + } + } + producer.send(record, callback) + } + } +``` + +这里我们需要说明的是,由于 Spark 本身会失败重做 —— 包括单个 task 的失败重做、stage 的失败重做、整个拓扑的失败重做等 —— 那么同一条数据可能被写入到 kafka 一次以上。由于 kafka 目前还不支持 transactional write,所以多写入的数据不能被撤销,会造成一些重复。当然 kafka 自身的高可用写入(比如写入 broker 了的数据的 ack 消息没有成功送达 producer,导致 producer 重新发送数据时),也有可能造成重复。 + +在 kafka 支持 transactional write 之前,可能需要下游实现下去重机制。比如如果下游仍然是 Structured Streaming,那么可以使用 streaming deduplication 来获得去重后的结果。 + ## 总结 我们总结一下截至目前,Sink 已有的具体实现: @@ -187,6 +260,7 @@ writeStream | :-----------------------------: | :------------------------------: | :----: | :--------------------------------------: | | **HDFS-compatible file system** | ![checked](1.imgs/checked.png) | 已支持 | 包括但不限于 text, json, csv, parquet, orc, ... | | **ForeachSink** (自定操作幂等) | ![checked](1.imgs/checked.png) | 已支持 | 可定制度非常高的 sink | +| **Kafka** | ![negative](1.imgs/negative.png) | 已支持 | Kafka 目前不支持幂等写入,所以可能会有重复写入
(但推荐接着 Kafka 使用 streaming de-duplication 来去重) | | **ForeachSink** (自定操作不幂等) | ![negative](1.imgs/negative.png) | 已支持 | 不推荐使用不幂等的自定操作 | 这里我们特别强调一下,虽然 Structured Streaming 也内置了 `console` 这个 Source,但其实它的主要用途只是在技术会议/讲座上做 demo,不应用于线上生产系统。 diff --git "a/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/3.1 Structured Streaming \344\271\213\347\212\266\346\200\201\345\255\230\345\202\250\350\247\243\346\236\220.md" "b/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/3.1 Structured Streaming \344\271\213\347\212\266\346\200\201\345\255\230\345\202\250\350\247\243\346\236\220.md" index c840b35..c3875f0 100644 --- "a/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/3.1 Structured Streaming \344\271\213\347\212\266\346\200\201\345\255\230\345\202\250\350\247\243\346\236\220.md" +++ "b/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/3.1 Structured Streaming \344\271\213\347\212\266\346\200\201\345\255\230\345\202\250\350\247\243\346\236\220.md" @@ -2,11 +2,13 @@ ***[酷玩 Spark] Structured Streaming 源码解析系列*** ,返回目录请 [猛戳这里](.) -[「腾讯·广点通」](http://e.qq.com)技术团队荣誉出品 +[「腾讯广告」](http://e.qq.com)技术团队(原腾讯广点通技术团队)荣誉出品 ``` 本文内容适用范围: -* 2016.12.28 update, Spark 2.1 全系列 √ (已发布:2.1.0, 开发中:2.1.1-SNAPSHOT) +* 2018.11.02 update, Spark 2.4 全系列 √ (已发布:2.4.0) +* 2018.02.28 update, Spark 2.3 全系列 √ (已发布:2.3.0 ~ 2.3.2) +* 2017.07.11 update, Spark 2.2 全系列 √ (已发布:2.2.0 ~ 2.2.3) ``` @@ -56,7 +58,7 @@ StateStore 模块的总体思路:

-### (a) StateStore 在不同的节点之间如何迁移### +### (a) StateStore 在不同的节点之间如何迁移 在 StreamExecution 执行过程中,随时在 operator 实际执行的 executor 节点上唤起一个状态存储分片、并读入前一个版本的数据即可(如果 executor 上已经存在一个分片,那么就直接重用,不用唤起分片、也不用读入数据了)。 @@ -67,7 +69,7 @@ StateStore 模块的总体思路: - 在一些情况下,需要从其他节点的 HDFS 数据副本上 load 状态数据,如图中 `executor c` 需要从 `executor b` 的硬盘上 load 数据; - 另外还有的情况是,同一份数据被同时 load 到不同的 executor 上,如 `executor d` 和 `executor a` 即是读入了同一份数据 —— 推测执行时就容易产生这种情况 —— 这时也不会产生问题,因为 load 进来的是同一份数据,然后在两个节点上各自修改,最终只会有一个节点能够成功提交对状态的修改。 -### (b) StateStore 的更新和查询 ### +### (b) StateStore 的更新和查询 我们前面也讲过,在一个状态存储分片里,是 key-value 的 store。这个 key-value 的 store 支持如下操作: @@ -82,11 +84,16 @@ StateStore 模块的总体思路: // 删除一条符合条件的 key-value def remove(condition: UnsafeRow => Boolean): Unit + // 根据 key 删除 key-value + def remove(key: UnsafeRow): Unit /* == 批量操作相关 =============================== */ // 提交当前执行批次的所有修改,将刷出到 HDFS,成功后版本将自增 def commit(): Long + + // 放弃当前执行批次的所有修改 + def abort(): Unit // 当前状态分片、当前版本的所有 key-value 状态 def iterator(): Iterator[(UnsafeRow, UnsafeRow)] diff --git "a/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/4.1 Structured Streaming \344\271\213 Event Time \350\247\243\346\236\220.md" "b/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/4.1 Structured Streaming \344\271\213 Event Time \350\247\243\346\236\220.md" index e6616c4..a5a1ab2 100644 --- "a/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/4.1 Structured Streaming \344\271\213 Event Time \350\247\243\346\236\220.md" +++ "b/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/4.1 Structured Streaming \344\271\213 Event Time \350\247\243\346\236\220.md" @@ -2,11 +2,13 @@ ***[酷玩 Spark] Structured Streaming 源码解析系列*** ,返回目录请 [猛戳这里](.) -[「腾讯·广点通」](http://e.qq.com)技术团队荣誉出品 +[「腾讯广告」](http://e.qq.com)技术团队(原腾讯广点通技术团队)荣誉出品 ``` 本文内容适用范围: -* 2016.12.28 update, Spark 2.1 全系列 √ (已发布:2.1.0, 开发中:2.1.1-SNAPSHOT) +* 2018.11.02 update, Spark 2.4 全系列 √ (已发布:2.4.0) +* 2018.02.28 update, Spark 2.3 全系列 √ (已发布:2.3.0 ~ 2.3.2) +* 2017.07.11 update, Spark 2.2 全系列 √ (已发布:2.2.0 ~ 2.2.3) ``` @@ -78,7 +80,7 @@ Append 的语义将保证,一旦输出了某条 key,未来就不会再输出 #### (c) Update -> 小节注:本节 Update 模式的相关内容适用于 ≥ Spark 2.1.1 版本 —— 官方已开发完成,但尚未正式发布 +Update 模式已在 Spark 2.1.1 及以后版本获得正式支持。

@@ -94,7 +96,7 @@ Append 的语义将保证,一旦输出了某条 key,未来就不会再输出 ## 扩展阅读 -1. [Github: org/apache/spark/sql/catalyst/analysis/Analyzer.scala#TimeWindowing](https://github.com/apache/spark/blob/branch-2.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L2223) +1. [Github: org/apache/spark/sql/catalyst/analysis/Analyzer.scala#TimeWindowing](https://github.com/apache/spark/blob/v2.1.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L2232) 2. [Github: org/apache/spark/sql/catalyst/expressions/TimeWindow](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala) ## 参考资料 diff --git "a/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/4.2 Structured Streaming \344\271\213 Watermark \350\247\243\346\236\220.md" "b/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/4.2 Structured Streaming \344\271\213 Watermark \350\247\243\346\236\220.md" index a463138..6d9585e 100644 --- "a/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/4.2 Structured Streaming \344\271\213 Watermark \350\247\243\346\236\220.md" +++ "b/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/4.2 Structured Streaming \344\271\213 Watermark \350\247\243\346\236\220.md" @@ -2,22 +2,24 @@ ***[酷玩 Spark] Structured Streaming 源码解析系列*** ,返回目录请 [猛戳这里](.) -[「腾讯·广点通」](http://e.qq.com)技术团队荣誉出品 +[「腾讯广告」](http://e.qq.com)技术团队(原腾讯广点通技术团队)荣誉出品 ``` 本文内容适用范围: -* 2016.12.28 update, Spark 2.1 全系列 √ (已发布:2.1.0, 开发中:2.1.1-SNAPSHOT) +* 2018.11.02 update, Spark 2.4 全系列 √ (已发布:2.4.0) +* 2018.02.28 update, Spark 2.3 全系列 √ (已发布:2.3.0 ~ 2.3.2) +* 2017.07.11 update, Spark 2.2 全系列 √ (已发布:2.2.0 ~ 2.2.3) ``` -阅读本文前,请一定先阅读  [Structured Streaming 之 Event Time 解析](4.1 Structured Streaming 之 Event Time 解析.md),其中解析了 Structured Streaming 的 Event Time 及为什么需要 Watermark。 +阅读本文前,请一定先阅读  [Structured Streaming 之 Event Time 解析](4.1%20Structured%20Streaming%20之%20Event%20Time%20解析.md),其中解析了 Structured Streaming 的 Event Time 及为什么需要 Watermark。 ## 引言

-我们在前文 [Structured Streaming 之 Event Time 解析](4.1 Structured Streaming 之 Event Time 解析.md) 中的例子,在: +我们在前文 [Structured Streaming 之 Event Time 解析](4.1%20Structured%20Streaming%20之%20Event%20Time%20解析.md) 中的例子,在: - (a) 对 event time 做 *window()* + *groupBy().count()* 即利用状态做跨执行批次的聚合,并且 - (b) 输出模式为 Append 模式 @@ -169,11 +171,11 @@ lastExecution.executedPlan.collect { ## 关于 watermark 的一些说明 -关于 Structured Streaming 的目前 watermark 机制,我们有几点一些说明: +关于 Structured Streaming 的目前 watermark 机制,我们有几点说明: 1. 再次强调,(a+) 在对 event time 做 *window()* + *groupBy().aggregation()* 即利用状态做跨执行批次的聚合,并且 (b+) 输出模式为 Append 模式或 Update 模式时,才需要 watermark,其它时候不需要; 2. watermark 的本质是要帮助 StateStore 清理状态、不至于使 StateStore 无限增长;同时,维护 Append 正确的语义(即判断在何时某条结果不再改变、从而将其输出); -3. 目前 2.1 版本的 watermark 实现,是依靠最大 event time 减去一定 late threshold 得到的,尚未支持 Source 端提供的 watermark; +3. 目前版本(Spark 2.2)的 watermark 实现,是依靠最大 event time 减去一定 late threshold 得到的,尚未支持 Source 端提供的 watermark; - 未来可能的改进是,从 Source 端即开始提供关于 watermark 的特殊信息,传递到 StreamExecution 中使用 [2],这样可以加快 watermark 的进展,从而能更早的得到输出数据 4. Structured Streaming 对于 watermark 的承诺是:(a) watermark 值不后退(包括正常运行和发生故障恢复时);(b) watermark 值达到后,大多时候会在下一个执行批次输出结果,但也有可能延迟一两个批次(发生故障恢复时),上层应用不应该对此有依赖。 diff --git "a/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/4.imgs/100.png" "b/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/4.imgs/100.png" index 169999a..5558e10 100644 Binary files "a/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/4.imgs/100.png" and "b/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/4.imgs/100.png" differ diff --git "a/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/README.md" "b/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/README.md" index 7b39a7f..1e68fd1 100644 --- "a/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/README.md" +++ "b/Structured Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/README.md" @@ -1,31 +1,33 @@ ## Structured Streaming 源码解析系列 -[「腾讯·广点通」](http://e.qq.com)技术团队荣誉出品 +[「腾讯广告」](http://e.qq.com)技术团队(原腾讯广点通技术团队)荣誉出品 ``` 本文内容适用范围: -* 2016.12.28 update, Spark 2.1 全系列 √ (已发布:2.1.0, 开发中:2.1.1-SNAPSHOT) +* 2018.11.02 update, Spark 2.4 全系列 √ (已发布:2.4.0) +* 2018.02.28 update, Spark 2.3 全系列 √ (已发布:2.3.0 ~ 2.3.2) +* 2017.07.11 update, Spark 2.2 全系列 √ (已发布:2.2.0 ~ 2.2.3) ``` - *一、概述* - - [1.1 Structured Streaming 实现思路与实现概述](1.1 Structured Streaming 实现思路与实现概述.md) - - 1.2 Structured Streaming 之 Output Modes 解析 + - [1.1 Structured Streaming 实现思路与实现概述](1.1%20Structured%20Streaming%20实现思路与实现概述.md) - *二、Sources 与 Sinks* - - [2.1 Structured Streaming 之 Source 解析](2.1 Structured Streaming 之 Source 解析.md) - - [2.2 Structured Streaming 之 Sink 解析](2.2 Structured Streaming 之 Sink 解析.md) + - [2.1 Structured Streaming 之 Source 解析](2.1%20Structured%20Streaming%20之%20Source%20解析.md) + - [2.2 Structured Streaming 之 Sink 解析](2.2%20Structured%20Streaming%20之%20Sink%20解析.md) - *三、状态存储* - - [3.1 Structured Streaming 之状态存储解析](3.1 Structured Streaming 之状态存储解析.md) + - [3.1 Structured Streaming 之状态存储解析](3.1%20Structured%20Streaming%20之状态存储解析.md) - *四、Event Time 与 Watermark* - - [4.1 Structured Streaming 之 Event Time 解析](4.1 Structured Streaming 之 Event Time 解析.md) - - [4.2 Structured Streaming 之 Watermark 解析](4.2 Structured Streaming 之 Watermark 解析.md) + - [4.1 Structured Streaming 之 Event Time 解析](4.1%20Structured%20Streaming%20之%20Event%20Time%20解析.md) + - [4.2 Structured Streaming 之 Watermark 解析](4.2%20Structured%20Streaming%20之%20Watermark%20解析.md) - *#、一些资源和 Q&A* - - [Spark 资源集合](https://github.com/lw-lin/CoolplaySpark/tree/master/Spark%20%E8%B5%84%E6%BA%90%E9%9B%86%E5%90%88) (包括 Spark Streaming 中文微信群、Spark Summit 视频等资源集合) + - [Spark 资源集合](https://github.com/lw-lin/CoolplaySpark/tree/master/Spark%20%E8%B5%84%E6%BA%90%E9%9B%86%E5%90%88) (包括 Spark Streaming 中文微信群、Spark Summit 视频等资源集合)
![wechat_spark_streaming_small](../Spark%20%E8%B5%84%E6%BA%90%E9%9B%86%E5%90%88/resources/wechat_spark_streaming_small_.PNG) - [Q&A] Structured Streaming 与 Spark Streaming 的区别 ## 致谢 - Github [@wongxingjun](http://github.com/wongxingjun) 同学指出 2 处 typo,并提 Pull Request 修正(PR 已合并) - Github [@wangmiao1981](http://github.com/wangmiao1981) 同学指出几处 typo,并提 Pull Request 修正(PR 已合并) +- Github [@zilutang](http://github.com/zilutang) 同学指出 1 处 typo,并提 Pull Request 修正(PR 已合并) > 谨以此《Structured Streaming 源码解析系列》和以往的《Spark Streaming 源码解析系列》,向“把大数据变得更简单 (make big data simple) ”的创新者们,表达感谢和敬意。