@@ -26,13 +26,15 @@ import org.apache.log4j.Logger
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.shuffle.ShuffleMemoryManager
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
 import org.apache.spark.sql.hbase.util.DataTypeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Row, SQLContext}
-import org.apache.spark.unsafe.memory.TaskMemoryManager
+import org.apache.spark.unsafe.memory.{MemoryAllocator, ExecutorMemoryManager, TaskMemoryManager}
 import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

 /**
@@ -144,6 +146,12 @@ class SparkSqlRegionObserver extends BaseRegionObserver {
       super.postScannerOpen(e, scan, s)
     } else {
       logger.debug("Work with coprocessor")
+      if (SparkEnv.get == null) {
+        val sparkConf = new SparkConf(true).set("spark.driver.host", "127.0.0.1").set("spark.driver.port", "0")
+        val newSparkEnv = SparkEnv.createDriverEnv(sparkConf, false, new LiveListenerBus)
+        SparkEnv.set(newSparkEnv)
+      }
+
       val partitionIndex: Int = Bytes.toInt(serializedPartitionIndex)
       val serializedOutputDataType = scan.getAttribute(CoprocessorConstants.COTYPE)
       val outputDataType: Seq[DataType] =
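Note on the second hunk: coprocessor code runs inside the HBase region server JVM, where SparkEnv.get returns null because no Spark driver or executor ever initialized an environment there, so the patch lazily creates a throwaway driver-side SparkEnv on first use. Below is a minimal sketch of that same bootstrap, factored into a helper. The helper name ensureSparkEnv is hypothetical; the SparkConf, SparkEnv.createDriverEnv, SparkEnv.set, and LiveListenerBus calls are exactly the ones the patch uses. Since createDriverEnv is private[spark] in this Spark version, the sketch assumes the enclosing file lives under the org.apache.spark package tree, as org.apache.spark.sql.hbase does.

import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.scheduler.LiveListenerBus

// Hypothetical helper mirroring the patch: create a driver-side SparkEnv
// once, on first use, inside the region server JVM.
def ensureSparkEnv(): SparkEnv = {
  if (SparkEnv.get == null) {
    val sparkConf = new SparkConf(true)        // true = load spark-defaults from the classpath
      .set("spark.driver.host", "127.0.0.1")   // the env is local to this JVM; loopback suffices
      .set("spark.driver.port", "0")           // port 0 lets the OS pick any free port
    // isLocal = false; a fresh LiveListenerBus, since no scheduler is attached here
    SparkEnv.set(SparkEnv.createDriverEnv(sparkConf, false, new LiveListenerBus))
  }
  SparkEnv.get
}

Binding spark.driver.port to 0 avoids port collisions when many region servers on one host each bootstrap their own environment.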