Skip to content

Commit 2d1d8a3

Browse files
committed
fix SQLCompositor error when checkpoint enabled
1 parent 8d647f2 commit 2d1d8a3

File tree

2 files changed

+6
-4
lines changed

2 files changed

+6
-4
lines changed

src/main/java/streaming/core/compositor/spark/streaming/transformation/SQLCompositor.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class SQLCompositor[T] extends Compositor[T] with CompositorHelper {
3939

4040
require(sql.isDefined, "please set sql by variable `sql` in config file")
4141
val _sql = translateSQL(sql.get,params)
42+
val _outputTableName = outputTableName
4243
if (params.containsKey(TABLE)) {
4344

4445
//parent compositor is tableCompositor
@@ -47,7 +48,7 @@ class SQLCompositor[T] extends Compositor[T] with CompositorHelper {
4748
params.put(FUNC, (df: Any) => {
4849
val sqlContext = func(df)
4950
val newDF = sqlContext.sql(_sql)
50-
outputTableName match {
51+
_outputTableName match {
5152
case Some(tableName) =>
5253
newDF.registerTempTable(tableName)
5354
case None =>
@@ -68,7 +69,7 @@ class SQLCompositor[T] extends Compositor[T] with CompositorHelper {
6869
}
6970

7071
val newDF = func(oldDF).sqlContext.sql(_sql)
71-
outputTableName match {
72+
_outputTableName match {
7273
case Some(tableName) =>
7374
newDF.registerTempTable(tableName)
7475
case None =>

src/main/java/streaming/core/compositor/spark/transformation/SQLCompositor.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,15 @@ class SQLCompositor[T] extends Compositor[T] with CompositorHelper {
3939

4040
require(sql.isDefined, "please set sql by variable `sql` in config file")
4141
val _sql = translateSQL(sql.get,params)
42+
val _outputTableName = outputTableName
4243
if (params.containsKey(TABLE)) {
4344
//parent compositor is tableCompositor
4445

4546
val func = params.get(TABLE).asInstanceOf[(Any) => SQLContext]
4647
params.put(FUNC, (rddOrDF: Any) => {
4748
val sqlContext = func(rddOrDF)
4849
val newDF = sqlContext.sql(_sql)
49-
outputTableName match {
50+
_outputTableName match {
5051
case Some(tableName) =>
5152
newDF.registerTempTable(tableName)
5253
case None =>
@@ -59,7 +60,7 @@ class SQLCompositor[T] extends Compositor[T] with CompositorHelper {
5960
val func = params.get(FUNC).asInstanceOf[(DataFrame) => DataFrame]
6061
params.put(FUNC, (df: DataFrame) => {
6162
val newDF = func(df).sqlContext.sql(_sql)
62-
outputTableName match {
63+
_outputTableName match {
6364
case Some(tableName) =>
6465
newDF.registerTempTable(tableName)
6566
case None =>

0 commit comments

Comments
 (0)