
Commit 44ee920

Revert "[SPARK-12286][SPARK-12290][SPARK-12294][SPARK-12284][SQL] always output UnsafeRow"
This reverts commit 0da7bd5.
1 parent 0da7bd5 · commit 44ee920

34 files changed (+574 −74 lines)

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 2 additions & 1 deletion

@@ -904,7 +904,8 @@ class SQLContext private[sql](
   @transient
   protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
     val batches = Seq(
-      Batch("Add exchange", Once, EnsureRequirements(self))
+      Batch("Add exchange", Once, EnsureRequirements(self)),
+      Batch("Add row converters", Once, EnsureRowFormats)
     )
   }
 
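
The re-added "Add row converters" batch restores the planner pass that reconciles row formats between adjacent operators. EnsureRowFormats is the rule named in the hunk above; the sketch below is a self-contained toy model of what such a pass does, where `Plan`, `ToUnsafe`, and `ToSafe` are made-up stand-ins for Spark's physical-plan classes, not Spark's API.

```scala
// Toy model of an EnsureRowFormats-style pass (illustrative stand-ins only).
trait Plan {
  def children: Seq[Plan]
  def withChildren(newChildren: Seq[Plan]): Plan
  // Defaults mirror the SparkPlan flags restored later in this commit.
  def outputsUnsafeRows: Boolean = false
  def canProcessUnsafeRows: Boolean = false
  def canProcessSafeRows: Boolean = true
}

// Converter nodes: each consumes one row format and produces the other.
case class ToUnsafe(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def withChildren(c: Seq[Plan]): Plan = copy(child = c.head)
  override def outputsUnsafeRows: Boolean = true
}

case class ToSafe(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def withChildren(c: Seq[Plan]): Plan = copy(child = c.head)
  override def canProcessUnsafeRows: Boolean = true
  override def canProcessSafeRows: Boolean = false
}

// Bottom-up pass: wrap any child whose output format its parent cannot consume.
def ensureRowFormats(plan: Plan): Plan =
  plan.withChildren(plan.children.map(ensureRowFormats).map { child =>
    if (child.outputsUnsafeRows && !plan.canProcessUnsafeRows) ToSafe(child)
    else if (!child.outputsUnsafeRows && !plan.canProcessSafeRows) ToUnsafe(child)
    else child
  })
```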

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 23 additions & 2 deletions

@@ -28,6 +28,7 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors.attachTree
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.util.MutablePair
@@ -49,14 +50,26 @@ case class Exchange(
       case None => ""
     }
 
-    val simpleNodeName = "Exchange"
+    val simpleNodeName = if (tungstenMode) "TungstenExchange" else "Exchange"
     s"$simpleNodeName$extraInfo"
   }
 
+  /**
+   * Returns true iff we can support the data type, and we are not doing range partitioning.
+   */
+  private lazy val tungstenMode: Boolean = !newPartitioning.isInstanceOf[RangePartitioning]
+
   override def outputPartitioning: Partitioning = newPartitioning
 
   override def output: Seq[Attribute] = child.output
 
+  // This setting is somewhat counterintuitive:
+  // If the schema works with UnsafeRow, then we tell the planner that we don't support safe row,
+  // so the planner inserts a converter to convert data into UnsafeRow if needed.
+  override def outputsUnsafeRows: Boolean = tungstenMode
+  override def canProcessSafeRows: Boolean = !tungstenMode
+  override def canProcessUnsafeRows: Boolean = tungstenMode
+
   /**
    * Determines whether records must be defensively copied before being sent to the shuffle.
    * Several of Spark's shuffle components will buffer deserialized Java objects in memory. The
@@ -117,7 +130,15 @@ case class Exchange(
     }
   }
 
-  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
+  @transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf
+
+  private val serializer: Serializer = {
+    if (tungstenMode) {
+      new UnsafeRowSerializer(child.output.size)
+    } else {
+      new SparkSqlSerializer(sparkConf)
+    }
+  }
 
   override protected def doPrepare(): Unit = {
     // If an ExchangeCoordinator is needed, we register this Exchange operator
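
The "counterintuitive" comment above is the crux of the design: by claiming it cannot process safe rows in Tungsten mode, Exchange forces the converter pass to feed it UnsafeRows, which is exactly what its UnsafeRowSerializer requires. In the toy model from the first hunk (again, assumed names, not Spark's API), that plays out like this:

```scala
// A leaf that produces safe rows (hypothetical example node).
case class ToySafeScan() extends Plan {
  def children: Seq[Plan] = Seq.empty
  def withChildren(c: Seq[Plan]): Plan = this
}

// An Exchange-like node with the same three flags as the hunk above.
case class ToyExchange(child: Plan, tungstenMode: Boolean) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def withChildren(c: Seq[Plan]): Plan = copy(child = c.head)
  override def outputsUnsafeRows: Boolean = tungstenMode
  override def canProcessUnsafeRows: Boolean = tungstenMode
  override def canProcessSafeRows: Boolean = !tungstenMode
}

// ensureRowFormats(ToyExchange(ToySafeScan(), tungstenMode = true))
// ==> ToyExchange(ToUnsafe(ToySafeScan()), true): the scan's safe rows are
//     converted before they reach the Tungsten shuffle serializer.
```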

sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

Lines changed: 3 additions & 12 deletions

@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, Attribute, AttributeSet, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, GenericMutableRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
 import org.apache.spark.sql.types.DataType
@@ -99,19 +99,10 @@ private[sql] case class PhysicalRDD(
     rdd: RDD[InternalRow],
     override val nodeName: String,
     override val metadata: Map[String, String] = Map.empty,
-    isUnsafeRow: Boolean = false)
+    override val outputsUnsafeRows: Boolean = false)
   extends LeafNode {
 
-  protected override def doExecute(): RDD[InternalRow] = {
-    if (isUnsafeRow) {
-      rdd
-    } else {
-      rdd.mapPartitionsInternal { iter =>
-        val proj = UnsafeProjection.create(schema)
-        iter.map(proj)
-      }
-    }
-  }
+  protected override def doExecute(): RDD[InternalRow] = rdd
 
   override def simpleString: String = {
     val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"

sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala

Lines changed: 11 additions & 2 deletions

@@ -41,11 +41,20 @@ case class Expand(
   // as UNKNOWN partitioning
   override def outputPartitioning: Partitioning = UnknownPartitioning(0)
 
+  override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
+  override def canProcessUnsafeRows: Boolean = true
+  override def canProcessSafeRows: Boolean = true
+
   override def references: AttributeSet =
     AttributeSet(projections.flatten.flatMap(_.references))
 
-  private[this] val projection =
-    (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)
+  private[this] val projection = {
+    if (outputsUnsafeRows) {
+      (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)
+    } else {
+      (exprs: Seq[Expression]) => newMutableProjection(exprs, child.output)()
+    }
+  }
 
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
     child.execute().mapPartitions { iter =>

sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala

Lines changed: 3 additions & 5 deletions

@@ -64,7 +64,6 @@ case class Generate(
       child.execute().mapPartitionsInternal { iter =>
         val generatorNullRow = InternalRow.fromSeq(Seq.fill[Any](generator.elementTypes.size)(null))
         val joinedRow = new JoinedRow
-        val proj = UnsafeProjection.create(output, output)
 
         iter.flatMap { row =>
           // we should always set the left (child output)
@@ -78,14 +77,13 @@ case class Generate(
         } ++ LazyIterator(() => boundGenerator.terminate()).map { row =>
           // we leave the left side as the last element of its child output
           // keep it the same as Hive does
-          proj(joinedRow.withRight(row))
+          joinedRow.withRight(row)
         }
       }
     } else {
       child.execute().mapPartitionsInternal { iter =>
-        val proj = UnsafeProjection.create(output, output)
-        (iter.flatMap(row => boundGenerator.eval(row)) ++
-          LazyIterator(() => boundGenerator.terminate())).map(proj)
+        iter.flatMap(row => boundGenerator.eval(row)) ++
+          LazyIterator(() => boundGenerator.terminate())
       }
     }
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala

Lines changed: 4 additions & 9 deletions

@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 
 
 /**
@@ -29,20 +29,15 @@ private[sql] case class LocalTableScan(
     output: Seq[Attribute],
     rows: Seq[InternalRow]) extends LeafNode {
 
-  private val unsafeRows: Array[InternalRow] = {
-    val proj = UnsafeProjection.create(output, output)
-    rows.map(r => proj(r).copy()).toArray
-  }
-
-  private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows)
+  private lazy val rdd = sqlContext.sparkContext.parallelize(rows)
 
   protected override def doExecute(): RDD[InternalRow] = rdd
 
   override def executeCollect(): Array[InternalRow] = {
-    unsafeRows
+    rows.toArray
   }
 
   override def executeTake(limit: Int): Array[InternalRow] = {
-    unsafeRows.take(limit)
+    rows.take(limit).toArray
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala

Lines changed: 4 additions & 0 deletions

@@ -39,6 +39,10 @@ case class Sort(
     testSpillFrequency: Int = 0)
   extends UnaryNode {
 
+  override def outputsUnsafeRows: Boolean = true
+  override def canProcessUnsafeRows: Boolean = true
+  override def canProcessSafeRows: Boolean = false
+
   override def output: Seq[Attribute] = child.output
 
   override def outputOrdering: Seq[SortOrder] = sortOrder
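
Sort's flags say it both requires and produces UnsafeRows, so the converter pass must wrap any safe-row child. Continuing the toy model from above (assumed names, not Spark's classes):

```scala
// A Sort-like node: unsafe rows in, unsafe rows out.
case class ToySort(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def withChildren(c: Seq[Plan]): Plan = copy(child = c.head)
  override def outputsUnsafeRows: Boolean = true
  override def canProcessUnsafeRows: Boolean = true
  override def canProcessSafeRows: Boolean = false
}

// ensureRowFormats(ToySort(ToySafeScan())) ==> ToySort(ToUnsafe(ToySafeScan()))
```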

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 23 additions & 0 deletions

@@ -97,13 +97,36 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   /** Specifies sort order for each partition requirements on the input data for this operator. */
   def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)
 
+  /** Specifies whether this operator outputs UnsafeRows */
+  def outputsUnsafeRows: Boolean = false
+
+  /** Specifies whether this operator is capable of processing UnsafeRows */
+  def canProcessUnsafeRows: Boolean = false
+
+  /**
+   * Specifies whether this operator is capable of processing Java-object-based Rows (i.e. rows
+   * that are not UnsafeRows).
+   */
+  def canProcessSafeRows: Boolean = true
 
   /**
    * Returns the result of this query as an RDD[InternalRow] by delegating to doExecute
    * after adding query plan information to created RDDs for visualization.
    * Concrete implementations of SparkPlan should override doExecute instead.
    */
   final def execute(): RDD[InternalRow] = {
+    if (children.nonEmpty) {
+      val hasUnsafeInputs = children.exists(_.outputsUnsafeRows)
+      val hasSafeInputs = children.exists(!_.outputsUnsafeRows)
+      assert(!(hasSafeInputs && hasUnsafeInputs),
+        "Child operators should output rows in the same format")
+      assert(canProcessSafeRows || canProcessUnsafeRows,
+        "Operator must be able to process at least one row format")
+      assert(!hasSafeInputs || canProcessSafeRows,
+        "Operator will receive safe rows as input but cannot process safe rows")
+      assert(!hasUnsafeInputs || canProcessUnsafeRows,
+        "Operator will receive unsafe rows as input but cannot process unsafe rows")
+    }
     RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
       prepare()
       doExecute()
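
The assertions restored in execute() are a runtime check of the invariant that the "Add row converters" batch is supposed to establish at planning time: all children agree on a row format, and the parent can consume it. Restated over the toy model (same caveats as before, this is a sketch, not Spark's code):

```scala
// Recursively assert the row-format contract at every node. After a correct
// ensureRowFormats pass this should never fire.
def checkRowFormatInvariants(plan: Plan): Unit = {
  if (plan.children.nonEmpty) {
    val hasUnsafe = plan.children.exists(_.outputsUnsafeRows)
    val hasSafe = plan.children.exists(!_.outputsUnsafeRows)
    assert(!(hasSafe && hasUnsafe), "children disagree on row format")
    assert(plan.canProcessSafeRows || plan.canProcessUnsafeRows,
      "operator accepts no row format at all")
    assert(!hasSafe || plan.canProcessSafeRows, "safe input to an unsafe-only operator")
    assert(!hasUnsafe || plan.canProcessUnsafeRows, "unsafe input to a safe-only operator")
  }
  plan.children.foreach(checkRowFormatInvariants)
}

// checkRowFormatInvariants(ToySort(ToySafeScan()))                    // throws
// checkRowFormatInvariants(ensureRowFormats(ToySort(ToySafeScan())))  // passes
```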

sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala

Lines changed: 5 additions & 3 deletions

@@ -100,6 +100,8 @@ case class Window(
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
+  override def canProcessUnsafeRows: Boolean = true
+
   /**
    * Create a bound ordering object for a given frame type and offset. A bound ordering object is
    * used to determine which input row lies within the frame boundaries of an output row.
@@ -257,16 +259,16 @@ case class Window(
    * @return the final resulting projection.
    */
   private[this] def createResultProjection(
-      expressions: Seq[Expression]): UnsafeProjection = {
+      expressions: Seq[Expression]): MutableProjection = {
     val references = expressions.zipWithIndex.map{ case (e, i) =>
       // Results of window expressions will be on the right side of child's output
       BoundReference(child.output.size + i, e.dataType, e.nullable)
     }
     val unboundToRefMap = expressions.zip(references).toMap
     val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap))
-    UnsafeProjection.create(
+    newMutableProjection(
       projectList ++ patchedWindowExpression,
-      child.output)
+      child.output)()
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
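
Switching createResultProjection from UnsafeProjection back to MutableProjection matters for callers because a mutable projection reuses one output row across calls, so any result that is buffered must be copied first. A toy illustration of that aliasing hazard (assumed names, not Spark's API):

```scala
// One reused output slot, like a MutableProjection's internal row.
object MutableProjectionDemo extends App {
  final class ToyMutableProjection(f: Int => Int) {
    private val buffer = Array(0)
    def apply(input: Int): Array[Int] = { buffer(0) = f(input); buffer }
  }

  val proj = new ToyMutableProjection(_ * 2)
  val aliased = Seq(1, 2, 3).map(proj(_))         // three references to one buffer
  val copied  = Seq(1, 2, 3).map(proj(_).clone()) // independent snapshots

  println(aliased.map(_(0)))  // List(6, 6, 6): every reference sees the last write
  println(copied.map(_(0)))   // List(2, 4, 6)
}
```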

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala

Lines changed: 4 additions & 0 deletions

@@ -49,6 +49,10 @@ case class SortBasedAggregate(
     "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
+  override def outputsUnsafeRows: Boolean = true
+  override def canProcessUnsafeRows: Boolean = false
+  override def canProcessSafeRows: Boolean = true
+
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
 
   override def requiredChildDistribution: List[Distribution] = {
