Skip to content

Commit cf94e14

Browse files
committed
make MockSQLSourceCompositor support cofinguration
1 parent 25c3da6 commit cf94e14

File tree

2 files changed

+8
-6
lines changed

2 files changed

+8
-6
lines changed

src/main/java/streaming/core/compositor/spark/ss/source/MockSQLSourceCompositor.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package streaming.core.compositor.spark.ss.source
22

33
import java.util
44

5+
import net.sf.json.JSONArray
56
import org.apache.log4j.Logger
67
import org.apache.spark.sql.execution.streaming.MemoryStream
78
import serviceframework.dispatcher.{Compositor, Processor, Strategy}
@@ -22,13 +23,17 @@ class MockSQLSourceCompositor[T] extends Compositor[T] with CompositorHelper {
2223
this._configParams = configParams
2324
}
2425

26+
def data = {
27+
_configParams(0).map(f => f._2.asInstanceOf[JSONArray].map(k => k.asInstanceOf[String]).toSeq).toSeq
28+
}
29+
2530
override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {
2631
val sparkSSRt = sparkStructuredStreamingRuntime(params)
2732
val sparkSession = sparkSSRt.sparkSession
2833
import sparkSession.implicits._
2934
implicit val sqlContext = sparkSession.sqlContext
30-
val inputData = MemoryStream[Int]
31-
inputData.addData(Seq(1, 2, 3, 4))
35+
val inputData = MemoryStream[String]
36+
inputData.addData(data.flatMap(f=>f).seq)
3237
val df = inputData.toDS()
3338
List(df.asInstanceOf[T])
3439
}

src/main/resources-debug/test/ss-test.json

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,7 @@
88
"compositor": [
99
{
1010
"name": "ss.source.mock",
11-
"params": [
12-
{
13-
}
14-
]
11+
"params": [{"duration1":["1","2","3"]}]
1512
},
1613
{
1714
"name": "ss.table",

0 commit comments

Comments
 (0)