Skip to content

Commit 14ac8d9

Browse files
committed
move structured streaming to another project
1 parent cf94e14 commit 14ac8d9

File tree

9 files changed

+38
-292
lines changed

9 files changed

+38
-292
lines changed

pom.xml

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
<scala.version>2.10.3</scala.version>
1212
<scala.2.version>2.10</scala.2.version>
1313
<spark.version>1.6.1</spark.version>
14+
<streamingpro.version>0.4.6</streamingpro.version>
1415
<spark.streaming.kafka.artifactId>spark-streaming-kafka_2.10</spark.streaming.kafka.artifactId>
1516
<serviceframework.version>1.3.0-SNAPSHOT</serviceframework.version>
1617
</properties>
@@ -34,6 +35,7 @@
3435
<spark.streaming.kafka.artifactId>spark-streaming-kafka_2.10</spark.streaming.kafka.artifactId>
3536
<spark.sql.kafka.artifactId>spark-streaming-kafka_2.10</spark.sql.kafka.artifactId>
3637
<serviceframework.version>1.3.0-SNAPSHOT</serviceframework.version>
38+
<structured.streaming.artifacteId>structured-streaming-stub</structured.streaming.artifacteId>
3739
</properties>
3840
</profile>
3941

@@ -44,6 +46,7 @@
4446
<spark.streaming.kafka.artifactId>spark-streaming-kafka-0-8_2.10</spark.streaming.kafka.artifactId>
4547
<spark.sql.kafka.artifactId>spark-sql-kafka-0-10_2.10</spark.sql.kafka.artifactId>
4648
<serviceframework.version>1.3.2-SNAPSHOT-9.2.16</serviceframework.version>
49+
<structured.streaming.artifacteId>structured-streaming-core</structured.streaming.artifacteId>
4750
</properties>
4851
</profile>
4952

@@ -462,6 +465,12 @@
462465
</exclusions>
463466
</dependency>
464467

468+
<dependency>
469+
<groupId>streamingpro.king</groupId>
470+
<artifactId>${structured.streaming.artifacteId}</artifactId>
471+
<version>${streamingpro.version}-SNAPSHOT-online-2.0.2</version>
472+
</dependency>
473+
465474
<dependency>
466475
<groupId>org.scalactic</groupId>
467476
<artifactId>scalactic_2.10</artifactId>
@@ -597,15 +606,5 @@
597606
</plugins>
598607
</build>
599608

600-
<distributionManagement>
601-
<repository>
602-
<id>releases</id>
603-
<name>releases</name>
604-
<url>http://mvn.letv.com/nexus/content/repositories/releases/</url>
605-
</repository>
606-
<snapshotRepository>
607-
<id>snapshots</id>
608-
<url>http://mvn.letv.com/nexus/content/repositories/snapshots</url>
609-
</snapshotRepository>
610-
</distributionManagement>
609+
611610
</project>

src/main/java/streaming/core/Dispatcher.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ package streaming.core
22

33
import java.util.{Map => JMap}
44

5+
import org.apache.spark.SparkContext
56
import serviceframework.dispatcher.StrategyDispatcher
67
import streaming.common.DefaultShortNameMapping
7-
import streaming.core.strategy.platform.{SparkStructuredStreamingRuntime, PlatformManager, SparkRuntime, SparkStreamingRuntime}
8+
import streaming.core.strategy.platform.{ PlatformManager, SparkRuntime, SparkStreamingRuntime}
89

910
import scala.collection.JavaConversions._
1011

@@ -20,7 +21,10 @@ object Dispatcher {
2021
val sparkContext = runtime match {
2122
case s: SparkStreamingRuntime => s.streamingContext.sparkContext
2223
case s2: SparkRuntime => s2.sparkContext
23-
case s3: SparkStructuredStreamingRuntime => s3.sparkSession.sparkContext
24+
case _ =>
25+
Class.forName("streaming.core.strategy.platform.SparkStructuredStreamingRuntime").
26+
getMethod("sparkContext").
27+
invoke(runtime).asInstanceOf[SparkContext]
2428
}
2529

2630
val jobFilePath = contextParams.get("streaming.job.file.path").toString

src/main/java/streaming/core/compositor/spark/ss/output/SQLOutputCompositor.scala

Lines changed: 0 additions & 84 deletions
This file was deleted.

src/main/java/streaming/core/compositor/spark/ss/source/MockSQLSourceCompositor.scala

Lines changed: 0 additions & 40 deletions
This file was deleted.

src/main/java/streaming/core/compositor/spark/ss/source/SQLSourceCompositor.scala

Lines changed: 0 additions & 31 deletions
This file was deleted.

src/main/java/streaming/core/compositor/spark/streaming/CompositorHelper.scala

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ package streaming.core.compositor.spark.streaming
22

33
import java.util
44

5+
import org.apache.spark.SparkContext
56
import streaming.common.SQLContextHolder
6-
import streaming.core.strategy.platform.{SparkStructuredStreamingRuntime, SparkRuntime, SparkStreamingRuntime}
7+
import streaming.core.strategy.platform.{ SparkRuntime, SparkStreamingRuntime}
78

89
import scala.collection.JavaConversions._
910

@@ -30,16 +31,14 @@ trait CompositorHelper {
3031
params.get("_runtime_").asInstanceOf[SparkRuntime]
3132
}
3233

33-
def sparkStructuredStreamingRuntime(params: util.Map[Any, Any]) = {
34-
params.get("_runtime_").asInstanceOf[SparkStructuredStreamingRuntime]
35-
}
36-
3734
def sparkContext(params: util.Map[Any, Any]) = {
3835
params.get("_runtime_") match {
3936
case a: SparkStreamingRuntime => a.streamingContext.sparkContext
4037
case b: SparkRuntime => b.sparkContext
41-
case c: SparkStructuredStreamingRuntime => c.sparkSession.sparkContext
42-
case _ => throw new RuntimeException("get _runtime_ fail")
38+
case _ =>
39+
Class.forName("streaming.core.strategy.platform.SparkStructuredStreamingRuntime").
40+
getMethod("sparkContext").
41+
invoke(params.get("_runtime_").asInstanceOf[SparkRuntime]).asInstanceOf[SparkContext]
4342
}
4443
}
4544

src/main/java/streaming/core/strategy/platform/PlatformManager.scala

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import java.util.{List => JList, Map => JMap}
66
import net.csdn.ServiceFramwork
77
import net.csdn.bootstrap.Application
88
import net.csdn.common.logging.Loggers
9+
import org.apache.spark.SparkContext
910
import serviceframework.dispatcher.StrategyDispatcher
1011
import streaming.common.zk.{ZKClient, ZkRegister}
1112
import streaming.common.{ParamsUtil, SQLContextHolder, SparkCompatibility}
@@ -173,12 +174,20 @@ object PlatformManager {
173174
}
174175

175176
private def createSQLContextHolder(params: java.util.Map[Any, Any], runtime: StreamingRuntime) = {
177+
176178
val sc = runtime match {
177179
case a: SparkStreamingRuntime => a.streamingContext.sparkContext
178180
case b: SparkRuntime => b.sparkContext
179-
case c: SparkStructuredStreamingRuntime => c.sparkSession.sparkContext
180-
case _ => throw new RuntimeException("get _runtime_ fail")
181+
case _ => try {
182+
Class.forName("streaming.core.strategy.platform.SparkStructuredStreamingRuntime").
183+
getMethod("sparkContext").
184+
invoke(runtime).asInstanceOf[SparkContext]
185+
} catch {
186+
case e: Exception => throw new RuntimeException(s"No spark context is found in runtime(${runtime.getClass.getCanonicalName})")
187+
}
181188
}
189+
190+
182191
new SQLContextHolder(
183192
params.containsKey("streaming.enableHiveSupport") &&
184193
params.get("streaming.enableHiveSupport").toString.toBoolean, sc)
@@ -193,8 +202,10 @@ object PlatformManager {
193202
case platform: String if platform == SPARK =>
194203
SparkRuntime.getOrCreate(tempParams)
195204

196-
case platform: String if (platform == SparkStructuredStreamingRuntime || platform == SPAKR_S_S) =>
197-
SparkStructuredStreamingRuntime.getOrCreate(tempParams)
205+
case platform: String if (platform == SPAKR_STRUCTURED_STREAMING || platform == SPAKR_S_S) =>
206+
Class.forName("streaming.core.strategy.platform.SparkStructuredStreamingRuntime").
207+
getMethod("getOrCreate", classOf[JMap[Any, Any]]).
208+
invoke(null, tempParams).asInstanceOf[StreamingRuntime]
198209

199210
case platform: String if platform == "storm" =>
200211
null

0 commit comments

Comments
 (0)