Commit 25c3da6

adding structured streaming support

1 parent 2d1d8a3 commit 25c3da6
11 files changed: +403 -21 lines changed

pom.xml

Lines changed: 33 additions & 13 deletions
@@ -4,7 +4,7 @@

    <groupId>streaming.king</groupId>
    <artifactId>streamingpro</artifactId>
-   <version>0.4.1-SNAPSHOT-${mode}-${spark.version}</version>
+   <version>0.4.6-SNAPSHOT-${mode}-${spark.version}</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -26,30 +26,23 @@

    <profiles>

-       <profile>
-           <id>spark-1.5.1</id>
-           <properties>
-               <spark.version>1.5.1</spark.version>
-               <spark.streaming.kafka.artifactId>spark-streaming-kafka_2.10</spark.streaming.kafka.artifactId>
-               <serviceframework.version>1.3.0-SNAPSHOT</serviceframework.version>
-           </properties>
-       </profile>
-
        <profile>
            <id>spark-1.6.1</id>
            <!-- default -->
            <properties>
                <spark.version>1.6.1</spark.version>
                <spark.streaming.kafka.artifactId>spark-streaming-kafka_2.10</spark.streaming.kafka.artifactId>
+               <spark.sql.kafka.artifactId>spark-streaming-kafka_2.10</spark.sql.kafka.artifactId>
                <serviceframework.version>1.3.0-SNAPSHOT</serviceframework.version>
            </properties>
        </profile>

        <profile>
-           <id>spark-2.0.0</id>
+           <id>spark-2.0.2</id>
            <properties>
-               <spark.version>2.0.0</spark.version>
+               <spark.version>2.0.2</spark.version>
                <spark.streaming.kafka.artifactId>spark-streaming-kafka-0-8_2.10</spark.streaming.kafka.artifactId>
+               <spark.sql.kafka.artifactId>spark-sql-kafka-0-10_2.10</spark.sql.kafka.artifactId>
                <serviceframework.version>1.3.2-SNAPSHOT-9.2.16</serviceframework.version>
            </properties>
        </profile>
@@ -83,6 +76,13 @@
            <artifactId>${spark.streaming.kafka.artifactId}</artifactId>
            <version>${spark.version}</version>
        </dependency>
+
+       <dependency>
+           <groupId>org.apache.spark</groupId>
+           <artifactId>${spark.sql.kafka.artifactId}</artifactId>
+           <version>${spark.version}</version>
+       </dependency>
+
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
@@ -138,6 +138,11 @@
            <artifactId>${spark.streaming.kafka.artifactId}</artifactId>
            <version>${spark.version}</version>
        </dependency>
+       <dependency>
+           <groupId>org.apache.spark</groupId>
+           <artifactId>${spark.sql.kafka.artifactId}</artifactId>
+           <version>${spark.version}</version>
+       </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
@@ -168,6 +173,11 @@
            <artifactId>${spark.streaming.kafka.artifactId}</artifactId>
            <version>${spark.version}</version>
        </dependency>
+       <dependency>
+           <groupId>org.apache.spark</groupId>
+           <artifactId>${spark.sql.kafka.artifactId}</artifactId>
+           <version>${spark.version}</version>
+       </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
@@ -587,5 +597,15 @@
        </plugins>
    </build>

-
+   <distributionManagement>
+       <repository>
+           <id>releases</id>
+           <name>releases</name>
+           <url>http://mvn.letv.com/nexus/content/repositories/releases/</url>
+       </repository>
+       <snapshotRepository>
+           <id>snapshots</id>
+           <url>http://mvn.letv.com/nexus/content/repositories/snapshots</url>
+       </snapshotRepository>
+   </distributionManagement>
</project>
src/main/java/streaming/common/DefaultShortNameMapping.scala

Lines changed: 38 additions & 0 deletions

@@ -0,0 +1,38 @@
+package streaming.common
+
+import serviceframework.dispatcher.ShortNameMapping
+
+/**
+  * 11/21/16 WilliamZhu(allwefantasy@gmail.com)
+  */
+
+class DefaultShortNameMapping extends ShortNameMapping {
+  private val compositorNameMap: Map[String, String] = Map[String, String](
+    "spark" -> "streaming.core.strategy.SparkStreamingStrategy",
+
+    "batch.source" -> "streaming.core.compositor.spark.source.SQLSourceCompositor",
+    "batch.sql" -> "streaming.core.compositor.spark.transformation.SQLCompositor",
+    "batch.table" -> "streaming.core.compositor.spark.transformation.JSONTableCompositor",
+    "batch.refTable" -> "streaming.core.compositor.spark.transformation.JSONRefTableCompositor",
+
+    "stream.source.kafka" -> "streaming.core.compositor.spark.streaming.source.KafkaStreamingCompositor",
+    "stream.sql" -> "streaming.core.compositor.spark.streaming.transformation.SQLCompositor",
+    "stream.table" -> "streaming.core.compositor.spark.streaming.transformation.JSONTableCompositor",
+    "stream.refTable" -> "streaming.core.compositor.spark.streaming.transformation.JSONRefTableCompositor",
+
+    "ss.source" -> "streaming.core.compositor.spark.ss.source.SQLSourceCompositor",
+    "ss.source.mock" -> "streaming.core.compositor.spark.ss.source.MockSQLSourceCompositor",
+    "ss.sql" -> "streaming.core.compositor.spark.transformation.SQLCompositor",
+    "ss.table" -> "streaming.core.compositor.spark.transformation.JSONTableCompositor",
+    "ss.refTable" -> "streaming.core.compositor.spark.transformation.JSONRefTableCompositor",
+    "ss.output" -> "streaming.core.compositor.spark.ss.output.SQLOutputCompositor"
+  )
+
+  override def forName(shortName: String): String = {
+    if (compositorNameMap.contains(shortName)) {
+      compositorNameMap(shortName)
+    } else {
+      shortName
+    }
+  }
+}
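The ss.* entries above are the short names a job description can now use to assemble a structured streaming pipeline. The test job referenced later from LocalStreamingApp (classpath:///test/ss-test.json) is not part of this page, so the sketch below is only a guess at its shape: the overall JSON layout (strategy, algorithm, ref, compositor, configParams) is assumed from the existing batch/stream job format, while the short names and the path/format/mode parameters come from the compositors added in this commit.

    {
      "ss-test": {
        "desc": "hypothetical structured streaming job",
        "strategy": "spark",
        "algorithm": [],
        "ref": [],
        "compositor": [
          {"name": "ss.source", "params": [{"format": "json", "path": "file:///tmp/ss/input"}]},
          {"name": "ss.output", "params": [{"format": "parquet", "mode": "append", "path": "file:///tmp/ss/output"}]}
        ],
        "configParams": {}
      }
    }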

src/main/java/streaming/core/Dispatcher.scala

Lines changed: 6 additions & 3 deletions
@@ -3,7 +3,8 @@ package streaming.core
import java.util.{Map => JMap}

import serviceframework.dispatcher.StrategyDispatcher
-import streaming.core.strategy.platform.{PlatformManager, SparkRuntime, SparkStreamingRuntime}
+import streaming.common.DefaultShortNameMapping
+import streaming.core.strategy.platform.{SparkStructuredStreamingRuntime, PlatformManager, SparkRuntime, SparkStreamingRuntime}

import scala.collection.JavaConversions._

@@ -12,12 +13,14 @@ import scala.collection.JavaConversions._
  */
object Dispatcher {
  def dispatcher(contextParams: JMap[Any, Any]): StrategyDispatcher[Any] = {
+   val defaultShortNameMapping = new DefaultShortNameMapping()
    if (contextParams!=null && contextParams.containsKey("streaming.job.file.path")) {
      val runtime = contextParams.get("_runtime_")

      val sparkContext = runtime match {
        case s: SparkStreamingRuntime => s.streamingContext.sparkContext
        case s2: SparkRuntime => s2.sparkContext
+       case s3: SparkStructuredStreamingRuntime => s3.sparkSession.sparkContext
      }

      val jobFilePath = contextParams.get("streaming.job.file.path").toString

@@ -34,9 +37,9 @@ object Dispatcher {
        textFile(jobFilePath).collect().mkString("\n")
      }

-     StrategyDispatcher.getOrCreate(jobConfigStr)
+     StrategyDispatcher.getOrCreate(jobConfigStr,defaultShortNameMapping)
    } else {
-     StrategyDispatcher.getOrCreate(null)
+     StrategyDispatcher.getOrCreate(null,defaultShortNameMapping)
    }

  }
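The change here is that StrategyDispatcher.getOrCreate now receives the DefaultShortNameMapping, so compositor names in job files are resolved through forName before the classes are instantiated. A minimal sketch of that resolution, assuming the project classes are on the classpath:

    import streaming.common.DefaultShortNameMapping

    object ShortNameSketch {
      def main(args: Array[String]): Unit = {
        val mapping = new DefaultShortNameMapping()
        // Known short names resolve to fully qualified compositor class names.
        println(mapping.forName("ss.source"))  // streaming.core.compositor.spark.ss.source.SQLSourceCompositor
        println(mapping.forName("ss.output"))  // streaming.core.compositor.spark.ss.output.SQLOutputCompositor
        // Unknown names fall through unchanged, so fully qualified class names keep working.
        println(mapping.forName("com.example.MyCompositor"))  // hypothetical name, returned as-is
      }
    }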

src/main/java/streaming/core/LocalStreamingApp.scala

Lines changed: 4 additions & 2 deletions
@@ -15,8 +15,10 @@ object LocalStreamingApp {
      "-streaming.duration", "20",
      "-streaming.name", "god",
      "-streaming.rest", "false"
-     // ,"-streaming.job.file.path","hdfs://cdn237:8020/tmp/bb.yml"
-     ,"-streaming.driver.port","9902"
+     ,"-streaming.job.file.path","classpath:///test/ss-test.json"
+     ,"-streaming.driver.port","9902",
+     "-streaming.platform", "ss",
+     "-streaming.checkpoint","file:///tmp/ss"
      // ,"-streaming.testinputstream.offsetPath", "hdfs://cdn237:8020/tmp/localstreampingapp"
      // ,"-streaming.spark.hadoop.fs.defaultFS","hdfs://cdn237:8020"
    ))
src/main/java/streaming/core/compositor/spark/ss/output/SQLOutputCompositor.scala

Lines changed: 84 additions & 0 deletions

@@ -0,0 +1,84 @@
+package streaming.core.compositor.spark.ss.output
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import org.apache.log4j.Logger
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.streaming.ProcessingTime
+import serviceframework.dispatcher.{Compositor, Processor, Strategy}
+import streaming.core.compositor.spark.streaming.CompositorHelper
+import streaming.core.strategy.ParamsValidator
+
+import scala.collection.JavaConversions._
+
+/**
+  * 5/11/16 WilliamZhu(allwefantasy@gmail.com)
+  */
+class SQLOutputCompositor[T] extends Compositor[T] with CompositorHelper with ParamsValidator {
+
+  private var _configParams: util.List[util.Map[Any, Any]] = _
+  val logger = Logger.getLogger(classOf[SQLOutputCompositor[T]].getName)
+
+
+  override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {
+    this._configParams = configParams
+  }
+
+  def path = {
+    config[String]("path", _configParams)
+  }
+
+  def format = {
+    config[String]("format", _configParams)
+  }
+
+  def mode = {
+    config[String]("mode", _configParams)
+  }
+
+  def cfg = {
+    val _cfg = _configParams(0).map(f => (f._1.asInstanceOf[String], f._2.asInstanceOf[String])).toMap
+    _cfg - "path" - "mode" - "format"
+  }
+
+  override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {
+    val oldDf = middleResult.get(0).asInstanceOf[DataFrame]
+    val func = params.get("_func_").asInstanceOf[(DataFrame) => DataFrame]
+    val _resource = if (params.containsKey("streaming.sql.out.path")) Some(params("streaming.sql.out.path").toString) else path
+
+    val duration = params.getOrElse("streaming.duration", "10").toString.toInt
+    val _mode = if (mode.isDefined) mode.get else "append"
+
+
+    val _cfg = cfg
+    val _format = format.get
+
+    try {
+      val df = func(oldDf)
+      val query = df.writeStream
+      if (params.containsKey("streaming.checkpoint")) {
+        val checkpointDir = params.get("streaming.checkpoint").toString
+        query.option("checkpointLocation", checkpointDir)
+      }
+
+      _resource match {
+        case Some(p) => query.option("path", p)
+        case None =>
+      }
+
+      query.outputMode(_mode)
+
+      val rt = query.trigger(ProcessingTime(duration, TimeUnit.SECONDS)).options(_cfg).format(_format).start()
+      rt.awaitTermination()
+    } catch {
+      case e: Exception => e.printStackTrace()
+    }
+    params.remove("sql")
+    new util.ArrayList[T]()
+  }
+
+  override def valid(params: util.Map[Any, Any]): (Boolean, String) = {
+    (true, "")
+  }
+}
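Stripped of the compositor plumbing, result() above assembles a plain Spark 2.0 structured streaming write. The standalone sketch below shows roughly that pipeline; the input schema, paths, format, and trigger interval are placeholder values, not values taken from this commit:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.ProcessingTime
    import org.apache.spark.sql.types.{StringType, StructType}

    object SQLOutputSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("ss-output-sketch").getOrCreate()

        // Placeholder file source; in a real job this DataFrame would come from the ss.source compositor.
        val schema = new StructType().add("value", StringType)
        val df = spark.readStream.schema(schema).format("json").load("file:///tmp/ss/input")

        // Roughly what SQLOutputCompositor.result builds from its params:
        val query = df.writeStream
          .option("checkpointLocation", "file:///tmp/ss")   // -streaming.checkpoint
          .option("path", "file:///tmp/ss/output")          // streaming.sql.out.path or the "path" param
          .outputMode("append")                             // default when no "mode" is given
          .trigger(ProcessingTime("10 seconds"))            // streaming.duration, in seconds
          .format("parquet")                                // the "format" param
          .start()

        query.awaitTermination()
      }
    }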
src/main/java/streaming/core/compositor/spark/ss/source/MockSQLSourceCompositor.scala

Lines changed: 35 additions & 0 deletions

@@ -0,0 +1,35 @@
+package streaming.core.compositor.spark.ss.source
+
+import java.util
+
+import org.apache.log4j.Logger
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import serviceframework.dispatcher.{Compositor, Processor, Strategy}
+import streaming.core.compositor.spark.streaming.CompositorHelper
+import streaming.core.strategy.platform.SparkStructuredStreamingRuntime
+
+import scala.collection.JavaConversions._
+
+/**
+  * 11/21/16 WilliamZhu(allwefantasy@gmail.com)
+  */
+class MockSQLSourceCompositor[T] extends Compositor[T] with CompositorHelper {
+  private var _configParams: util.List[util.Map[Any, Any]] = _
+
+  val logger = Logger.getLogger(classOf[MockSQLSourceCompositor[T]].getName)
+
+  override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {
+    this._configParams = configParams
+  }
+
+  override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {
+    val sparkSSRt = sparkStructuredStreamingRuntime(params)
+    val sparkSession = sparkSSRt.sparkSession
+    import sparkSession.implicits._
+    implicit val sqlContext = sparkSession.sqlContext
+    val inputData = MemoryStream[Int]
+    inputData.addData(Seq(1, 2, 3, 4))
+    val df = inputData.toDS()
+    List(df.asInstanceOf[T])
+  }
+}
src/main/java/streaming/core/compositor/spark/ss/source/SQLSourceCompositor.scala

Lines changed: 31 additions & 0 deletions

@@ -0,0 +1,31 @@
+package streaming.core.compositor.spark.ss.source
+
+import java.util
+
+import org.apache.log4j.Logger
+import serviceframework.dispatcher.{Compositor, Processor, Strategy}
+import streaming.core.compositor.spark.streaming.CompositorHelper
+import streaming.core.strategy.platform.SparkStructuredStreamingRuntime
+
+import scala.collection.JavaConversions._
+
+/**
+  * 11/21/16 WilliamZhu(allwefantasy@gmail.com)
+  */
+class SQLSourceCompositor[T] extends Compositor[T] with CompositorHelper {
+  private var _configParams: util.List[util.Map[Any, Any]] = _
+
+  val logger = Logger.getLogger(classOf[SQLSourceCompositor[T]].getName)
+
+  override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {
+    this._configParams = configParams
+  }
+
+  override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {
+    val sparkSSRt = sparkRuntime(params).asInstanceOf[SparkStructuredStreamingRuntime]
+    val sourcePath = if (params.containsKey("streaming.sql.source.path")) params("streaming.sql.source.path").toString else _configParams(0)("path").toString
+    val df = sparkSSRt.sparkSession.readStream.format(_configParams(0)("format").toString).options(
+      (_configParams(0) - "format" - "path").map(f => (f._1.asInstanceOf[String], f._2.asInstanceOf[String])).toMap).load(sourcePath)
+    List(df.asInstanceOf[T])
+  }
+}
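This source compositor is a thin wrapper over SparkSession.readStream: the "format" and "path" entries of its first config map become the format and load path, and every remaining entry is passed through as an option. As an illustration of the underlying API (not the compositor's exact call sequence), the sketch below reads from the Kafka source that the spark-sql-kafka-0-10 dependency added in the pom provides; the broker address and topic are made-up placeholder values:

    import org.apache.spark.sql.SparkSession

    object KafkaSourceSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("ss-kafka-sketch").getOrCreate()

        // Structured streaming Kafka source (requires spark-sql-kafka-0-10 on the classpath).
        val df = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker
          .option("subscribe", "test-topic")                    // placeholder topic
          .load()

        // Kafka exposes key/value as binary columns; cast them before using them in SQL.
        val lines = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

        val query = lines.writeStream
          .format("console")
          .option("checkpointLocation", "file:///tmp/ss-kafka")
          .outputMode("append")
          .start()

        query.awaitTermination()
      }
    }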

src/main/java/streaming/core/compositor/spark/streaming/CompositorHelper.scala

Lines changed: 6 additions & 1 deletion
@@ -3,7 +3,7 @@ package streaming.core.compositor.spark.streaming
import java.util

import streaming.common.SQLContextHolder
-import streaming.core.strategy.platform.{SparkRuntime, SparkStreamingRuntime}
+import streaming.core.strategy.platform.{SparkStructuredStreamingRuntime, SparkRuntime, SparkStreamingRuntime}

import scala.collection.JavaConversions._

@@ -30,10 +30,15 @@ trait CompositorHelper {
    params.get("_runtime_").asInstanceOf[SparkRuntime]
  }

+  def sparkStructuredStreamingRuntime(params: util.Map[Any, Any]) = {
+    params.get("_runtime_").asInstanceOf[SparkStructuredStreamingRuntime]
+  }
+
  def sparkContext(params: util.Map[Any, Any]) = {
    params.get("_runtime_") match {
      case a: SparkStreamingRuntime => a.streamingContext.sparkContext
      case b: SparkRuntime => b.sparkContext
+     case c: SparkStructuredStreamingRuntime => c.sparkSession.sparkContext
      case _ => throw new RuntimeException("get _runtime_ fail")
    }
  }
