
Commit 471d481

Update SparkSqlRegionObserver.scala
1 parent 48886fc commit 471d481

1 file changed: SparkSqlRegionObserver.scala (+0, −192 lines)
@@ -1,193 +1 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hbase

import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.coprocessor._
import org.apache.hadoop.hbase.regionserver._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.Logger
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.hbase.util.DataTypeUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}

/**
 * HBaseCoprocessorSQLReaderRDD: an RDD evaluated inside an HBase coprocessor.
 * It reads cells from the region scanner, builds Spark SQL rows for the
 * requested projection, and applies any residual filter predicate.
 */
class HBaseCoprocessorSQLReaderRDD(var relation: HBaseRelation,
                                   val codegenEnabled: Boolean,
                                   var finalOutput: Seq[Attribute],
                                   var otherFilters: Option[Expression],
                                   @transient sqlContext: SQLContext)
  extends RDD[Row](sqlContext.sparkContext, Nil) with Logging {

  @transient var scanner: RegionScanner = _

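  // Builds the row iterator for one region scan: hasNext pre-fetches a batch
  // of cells via nextRaw, next() converts it into a Spark SQL Row, and the
  // scanner and the underlying HTable are closed once the scan is exhausted.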
  private def createIterator(context: TaskContext): Iterator[Row] = {
    val otherFilter: (Row) => Boolean = {
      if (otherFilters.isDefined) {
        if (codegenEnabled) {
          GeneratePredicate.generate(otherFilters.get, finalOutput)
        } else {
          InterpretedPredicate.create(otherFilters.get, finalOutput)
        }
      } else null
    }

    val projections = finalOutput.zipWithIndex
    var finished: Boolean = false
    var gotNext: Boolean = false
    val results: java.util.ArrayList[Cell] = new java.util.ArrayList[Cell]()
    val row = new GenericMutableRow(finalOutput.size)

    val iterator = new Iterator[Row] {
      override def hasNext: Boolean = {
        if (!finished) {
          if (!gotNext) {
            results.clear()
            scanner.nextRaw(results)
            finished = results.isEmpty
            gotNext = true
          }
        }
        if (finished) {
          close()
        }
        !finished
      }

      override def next(): Row = {
        if (hasNext) {
          gotNext = false
          relation.buildRowInCoprocessor(projections, results, row)
        } else {
          null
        }
      }

      def close() = {
        try {
          scanner.close()
          relation.closeHTable()
        } catch {
          case e: Exception => logWarning("Exception in scanner.close", e)
        }
      }
    }

    if (otherFilter == null) {
      new InterruptibleIterator(context, iterator)
    } else {
      new InterruptibleIterator(context, iterator.filter(otherFilter))
    }
  }

  // The partition is supplied by the coprocessor at compute time, so no
  // driver-side partitions are exposed.
  override def getPartitions: Array[Partition] = {
    Array()
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
    scanner = split.asInstanceOf[HBasePartition].newScanner
    createIterator(context)
  }
}

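// Adapter that supplies default implementations for RegionScanner methods,
// so concrete scanners below only override what they actually need.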
abstract class BaseRegionScanner extends RegionScanner {

  // Implements the getBatch method inherited from RegionScanner.
  override def getBatch = 0

  override def isFilterDone = false

  // The newer scanner API passes a ScannerContext where older versions passed
  // `limit: Int`; the context is not consulted here.
  override def next(result: java.util.List[Cell], scannerContext: ScannerContext) = next(result)

  override def reseek(row: Array[Byte]) = throw new DoNotRetryIOException("Unsupported")

  override def getMvccReadPoint = Long.MaxValue

  override def nextRaw(result: java.util.List[Cell]) = next(result)

  override def nextRaw(result: java.util.List[Cell], scannerContext: ScannerContext) = next(result, scannerContext)
}

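// Observer installed on the region server: when a Scan carries the
// coprocessor attributes, the serialized Spark sub-plan is evaluated
// region-side and its output replaces the ordinary scan results.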
class SparkSqlRegionObserver extends BaseRegionObserver {
  lazy val logger = Logger.getLogger(getClass.getName)
  lazy val EmptyArray = Array[Byte]()

  override def postScannerOpen(e: ObserverContext[RegionCoprocessorEnvironment],
                               scan: Scan,
                               s: RegionScanner) = {
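    // The driver ships the partition index (COINDEX), the output schema
    // (COTYPE), the serialized sub-plan RDD (COKEY), and the task identity
    // (COTASK) as Scan attributes; their absence means a plain HBase scan.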
    val serializedPartitionIndex = scan.getAttribute(CoprocessorConstants.COINDEX)
    if (serializedPartitionIndex == null) {
      logger.debug("Work without coprocessor")
      super.postScannerOpen(e, scan, s)
    } else {
      logger.debug("Work with coprocessor")
      val partitionIndex: Int = Bytes.toInt(serializedPartitionIndex)
      val serializedOutputDataType = scan.getAttribute(CoprocessorConstants.COTYPE)
      val outputDataType: Seq[DataType] =
        HBaseSerializer.deserialize(serializedOutputDataType).asInstanceOf[Seq[DataType]]

      val serializedRDD = scan.getAttribute(CoprocessorConstants.COKEY)
      val subPlanRDD: RDD[Row] = HBaseSerializer.deserialize(serializedRDD).asInstanceOf[RDD[Row]]

      val taskParaInfo = scan.getAttribute(CoprocessorConstants.COTASK)
      val (stageId, partitionId, taskAttemptId, attemptNumber) =
        HBaseSerializer.deserialize(taskParaInfo).asInstanceOf[(Int, Int, Long, Int)]
      val taskContext = new TaskContextImpl(
        stageId, partitionId, taskAttemptId, attemptNumber, null, false, new TaskMetrics)

      val regionInfo = s.getRegionInfo
      val startKey = if (regionInfo.getStartKey.isEmpty) None else Some(regionInfo.getStartKey)
      val endKey = if (regionInfo.getEndKey.isEmpty) None else Some(regionInfo.getEndKey)

      // Evaluate the shipped sub-plan against this region, using the freshly
      // opened scanner as the partition's data source.
      val result = subPlanRDD.compute(
        new HBasePartition(partitionIndex, partitionIndex, startKey, endKey, newScanner = s),
        taskContext)

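      // Return the sub-plan's rows through a replacement scanner: each output
      // column is emitted as a KeyValue whose value bytes carry the data.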
      new BaseRegionScanner() {
        override def getRegionInfo: HRegionInfo = regionInfo

        override def getMaxResultSize: Long = s.getMaxResultSize

        override def close(): Unit = s.close()

        override def next(results: java.util.List[Cell]): Boolean = {
          val hasMore: Boolean = result.hasNext
          if (hasMore) {
            val nextRow = result.next()
            val numOfCells = outputDataType.length
            for (i <- 0 until numOfCells) {
              val data = nextRow(i)
              val dataType = outputDataType(i)
              val dataOfBytes: HBaseRawType = {
                if (data == null) null else DataTypeUtils.dataToBytes(data, dataType)
              }
              results.add(new KeyValue(EmptyArray, EmptyArray, EmptyArray, dataOfBytes))
            }
          }
          hasMore
        }
      }
    }
  }
}
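
For reference, a minimal sketch (not part of this commit) of how a RegionObserver such as the deleted SparkSqlRegionObserver is typically attached to a table with the HBase 1.x admin API, e.g. from a Scala shell. The table name is a placeholder, and the class can instead be loaded on every region via hbase.coprocessor.region.classes in hbase-site.xml:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

// A minimal sketch, assuming the HBase 1.x client API; "my_table" is a
// hypothetical table name, and the observer jar must already be on the
// region servers' classpath.
val conf = HBaseConfiguration.create()
val connection = ConnectionFactory.createConnection(conf)
val admin = connection.getAdmin
val tableName = TableName.valueOf("my_table")

// Add the coprocessor to the table descriptor and apply the change.
val desc = admin.getTableDescriptor(tableName)
desc.addCoprocessor("org.apache.spark.sql.hbase.SparkSqlRegionObserver")
admin.disableTable(tableName)
admin.modifyTable(tableName, desc)
admin.enableTable(tableName)
connection.close()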
