/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hbase

import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.coprocessor._
import org.apache.hadoop.hbase.regionserver._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.Logger
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.hbase.util.DataTypeUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}

/**
 * HBaseCoprocessorSQLReaderRDD: an RDD that is evaluated inside an HBase region server
 * coprocessor. It reads cells directly from the region's RegionScanner, turns them into
 * Catalyst rows via the HBaseRelation, and applies any residual filter expressions
 * (code-generated when codegen is enabled).
 */
class HBaseCoprocessorSQLReaderRDD(var relation: HBaseRelation,
                                   val codegenEnabled: Boolean,
                                   var finalOutput: Seq[Attribute],
                                   var otherFilters: Option[Expression],
                                   @transient sqlContext: SQLContext)
  extends RDD[Row](sqlContext.sparkContext, Nil) with Logging {

  @transient var scanner: RegionScanner = _

  private def createIterator(context: TaskContext): Iterator[Row] = {
    // Residual predicate applied on top of the rows built from the region scanner;
    // compiled with code generation when enabled, interpreted otherwise.
    val otherFilter: (Row) => Boolean = {
      if (otherFilters.isDefined) {
        if (codegenEnabled) {
          GeneratePredicate.generate(otherFilters.get, finalOutput)
        } else {
          InterpretedPredicate.create(otherFilters.get, finalOutput)
        }
      } else null
    }

    val projections = finalOutput.zipWithIndex
    var finished: Boolean = false
    var gotNext: Boolean = false
    val results: java.util.ArrayList[Cell] = new java.util.ArrayList[Cell]()
    val row = new GenericMutableRow(finalOutput.size)

    val iterator = new Iterator[Row] {
      override def hasNext: Boolean = {
        if (!finished) {
          if (!gotNext) {
            results.clear()
            scanner.nextRaw(results)
            finished = results.isEmpty
            gotNext = true
          }
        }
        if (finished) {
          close()
        }
        !finished
      }

      override def next(): Row = {
        if (hasNext) {
          gotNext = false
          relation.buildRowInCoprocessor(projections, results, row)
        } else {
          null
        }
      }

      def close() = {
        try {
          scanner.close()
          relation.closeHTable()
        } catch {
          case e: Exception => logWarning("Exception in scanner.close", e)
        }
      }
    }

    if (otherFilter == null) {
      new InterruptibleIterator(context, iterator)
    } else {
      new InterruptibleIterator(context, iterator.filter(otherFilter))
    }
  }

  // No partitions are materialized on the driver side: this RDD is computed inside the
  // coprocessor, which supplies an HBasePartition wrapping the region's own scanner.
  override def getPartitions: Array[Partition] = {
    Array()
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
    scanner = split.asInstanceOf[HBasePartition].newScanner
    createIterator(context)
  }
}
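
// Driver / region-server handoff: SparkSqlRegionObserver.postScannerOpen (below) expects the
// driver to have serialized this sub-plan RDD, its output data types, the partition index and
// the task parameters into the Scan attributes CoprocessorConstants.COKEY, COTYPE, COINDEX and
// COTASK; the coprocessor deserializes them and drives compute() against the region's scanner.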

// Provides default implementations for most RegionScanner methods; the anonymous scanner
// returned by the coprocessor below only overrides what it actually needs.
abstract class BaseRegionScanner extends RegionScanner {
  // Required by the RegionScanner interface; batching is not used here.
  override def getBatch = 0

  override def isFilterDone = false

  // In newer HBase APIs the `limit: Int` parameter was replaced by a ScannerContext.
  override def next(result: java.util.List[Cell], scannerContext: ScannerContext) = next(result)

  override def reseek(row: Array[Byte]) = throw new DoNotRetryIOException("Unsupported")

  override def getMvccReadPoint = Long.MaxValue

  override def nextRaw(result: java.util.List[Cell]) = next(result)

  override def nextRaw(result: java.util.List[Cell], scannerContext: ScannerContext) =
    next(result, scannerContext)
}

class SparkSqlRegionObserver extends BaseRegionObserver {
  lazy val logger = Logger.getLogger(getClass.getName)
  lazy val EmptyArray = Array[Byte]()

  override def postScannerOpen(e: ObserverContext[RegionCoprocessorEnvironment],
                               scan: Scan,
                               s: RegionScanner) = {
    val serializedPartitionIndex = scan.getAttribute(CoprocessorConstants.COINDEX)
    if (serializedPartitionIndex == null) {
      // No coprocessor payload on the Scan: fall back to the regular scanner.
      logger.debug("Work without coprocessor")
      super.postScannerOpen(e, scan, s)
    } else {
      logger.debug("Work with coprocessor")
      val partitionIndex: Int = Bytes.toInt(serializedPartitionIndex)
      val serializedOutputDataType = scan.getAttribute(CoprocessorConstants.COTYPE)
      val outputDataType: Seq[DataType] =
        HBaseSerializer.deserialize(serializedOutputDataType).asInstanceOf[Seq[DataType]]

      // The serialized Spark sub-plan shipped from the driver.
      val serializedRDD = scan.getAttribute(CoprocessorConstants.COKEY)
      val subPlanRDD: RDD[Row] = HBaseSerializer.deserialize(serializedRDD).asInstanceOf[RDD[Row]]

      val taskParaInfo = scan.getAttribute(CoprocessorConstants.COTASK)
      val (stageId, partitionId, taskAttemptId, attemptNumber) =
        HBaseSerializer.deserialize(taskParaInfo).asInstanceOf[(Int, Int, Long, Int)]
      val taskContext = new TaskContextImpl(
        stageId, partitionId, taskAttemptId, attemptNumber, null, false, new TaskMetrics)

      val regionInfo = s.getRegionInfo
      val startKey = if (regionInfo.getStartKey.isEmpty) None else Some(regionInfo.getStartKey)
      val endKey = if (regionInfo.getEndKey.isEmpty) None else Some(regionInfo.getEndKey)

      // Evaluate the sub-plan against this region only, wrapping the region's own scanner.
      val result = subPlanRDD.compute(
        new HBasePartition(partitionIndex, partitionIndex, startKey, endKey, newScanner = s),
        taskContext)

      new BaseRegionScanner() {
        override def getRegionInfo: HRegionInfo = regionInfo

        override def getMaxResultSize: Long = s.getMaxResultSize

        override def close(): Unit = s.close()

        override def next(results: java.util.List[Cell]): Boolean = {
          val hasMore: Boolean = result.hasNext
          if (hasMore) {
            val nextRow = result.next()
            val numOfCells = outputDataType.length
            for (i <- 0 until numOfCells) {
              val data = nextRow(i)
              val dataType = outputDataType(i)
              val dataOfBytes: HBaseRawType = {
                if (data == null) null else DataTypeUtils.dataToBytes(data, dataType)
              }
              results.add(new KeyValue(EmptyArray, EmptyArray, EmptyArray, dataOfBytes))
            }
          }
          hasMore
        }
      }
    }
  }
}
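
// A minimal deployment sketch, not part of the original source: how an observer like
// SparkSqlRegionObserver is commonly registered with HBase via hbase-site.xml (the jar
// containing the class must also be on every region server's classpath). Whether this
// project relies on this mechanism or on per-table coprocessor registration is an assumption.
//
//   <property>
//     <name>hbase.coprocessor.region.classes</name>
//     <value>org.apache.spark.sql.hbase.SparkSqlRegionObserver</value>
//   </property>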