File tree Expand file tree Collapse file tree 2 files changed +6
-4
lines changed
src/main/java/streaming/core/compositor/spark Expand file tree Collapse file tree 2 files changed +6
-4
lines changed Original file line number Diff line number Diff line change @@ -39,6 +39,7 @@ class SQLCompositor[T] extends Compositor[T] with CompositorHelper {
39
39
40
40
require(sql.isDefined, " please set sql by variable `sql` in config file" )
41
41
val _sql = translateSQL(sql.get,params)
42
+ val _outputTableName = outputTableName
42
43
if (params.containsKey(TABLE )) {
43
44
44
45
// parent compositor is tableCompositor
@@ -47,7 +48,7 @@ class SQLCompositor[T] extends Compositor[T] with CompositorHelper {
47
48
params.put(FUNC , (df : Any ) => {
48
49
val sqlContext = func(df)
49
50
val newDF = sqlContext.sql(_sql)
50
- outputTableName match {
51
+ _outputTableName match {
51
52
case Some (tableName) =>
52
53
newDF.registerTempTable(tableName)
53
54
case None =>
@@ -68,7 +69,7 @@ class SQLCompositor[T] extends Compositor[T] with CompositorHelper {
68
69
}
69
70
70
71
val newDF = func(oldDF).sqlContext.sql(_sql)
71
- outputTableName match {
72
+ _outputTableName match {
72
73
case Some (tableName) =>
73
74
newDF.registerTempTable(tableName)
74
75
case None =>
Original file line number Diff line number Diff line change @@ -39,14 +39,15 @@ class SQLCompositor[T] extends Compositor[T] with CompositorHelper {
39
39
40
40
require(sql.isDefined, " please set sql by variable `sql` in config file" )
41
41
val _sql = translateSQL(sql.get,params)
42
+ val _outputTableName = outputTableName
42
43
if (params.containsKey(TABLE )) {
43
44
// parent compositor is tableCompositor
44
45
45
46
val func = params.get(TABLE ).asInstanceOf [(Any ) => SQLContext ]
46
47
params.put(FUNC , (rddOrDF : Any ) => {
47
48
val sqlContext = func(rddOrDF)
48
49
val newDF = sqlContext.sql(_sql)
49
- outputTableName match {
50
+ _outputTableName match {
50
51
case Some (tableName) =>
51
52
newDF.registerTempTable(tableName)
52
53
case None =>
@@ -59,7 +60,7 @@ class SQLCompositor[T] extends Compositor[T] with CompositorHelper {
59
60
val func = params.get(FUNC ).asInstanceOf [(DataFrame ) => DataFrame ]
60
61
params.put(FUNC , (df : DataFrame ) => {
61
62
val newDF = func(df).sqlContext.sql(_sql)
62
- outputTableName match {
63
+ _outputTableName match {
63
64
case Some (tableName) =>
64
65
newDF.registerTempTable(tableName)
65
66
case None =>
You can’t perform that action at this time.
0 commit comments