Skip to content

Commit 1e7bf33

Browse files
committed
Fix error where SparkEnv is null when running in a real cluster
1 parent b853f12 commit 1e7bf33

File tree

1 file changed

+9
-1
lines changed

1 file changed

+9
-1
lines changed

src/main/scala/org/apache/spark/sql/hbase/SparkSqlRegionObserver.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,15 @@ import org.apache.log4j.Logger
2626
import org.apache.spark._
2727
import org.apache.spark.executor.TaskMetrics
2828
import org.apache.spark.rdd.RDD
29+
import org.apache.spark.scheduler.LiveListenerBus
30+
import org.apache.spark.shuffle.ShuffleMemoryManager
2931
import org.apache.spark.sql.catalyst.InternalRow
3032
import org.apache.spark.sql.catalyst.expressions._
3133
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
3234
import org.apache.spark.sql.hbase.util.DataTypeUtils
3335
import org.apache.spark.sql.types._
3436
import org.apache.spark.sql.{Row, SQLContext}
35-
import org.apache.spark.unsafe.memory.TaskMemoryManager
37+
import org.apache.spark.unsafe.memory.{MemoryAllocator, ExecutorMemoryManager, TaskMemoryManager}
3638
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
3739

3840
/**
@@ -144,6 +146,12 @@ class SparkSqlRegionObserver extends BaseRegionObserver {
144146
super.postScannerOpen(e, scan, s)
145147
} else {
146148
logger.debug("Work with coprocessor")
149+
if (SparkEnv.get == null) {
150+
val sparkConf = new SparkConf(true).set("spark.driver.host", "127.0.0.1").set("spark.driver.port", "0")
151+
val newSparkEnv = SparkEnv.createDriverEnv(sparkConf, false, new LiveListenerBus)
152+
SparkEnv.set(newSparkEnv)
153+
}
154+
147155
val partitionIndex: Int = Bytes.toInt(serializedPartitionIndex)
148156
val serializedOutputDataType = scan.getAttribute(CoprocessorConstants.COTYPE)
149157
val outputDataType: Seq[DataType] =

0 commit comments

Comments (0)