
Commit e541f70

liancheng authored and yhuai committed
[SPARK-12012][SQL][BRANCH-1.6] Show more comprehensive PhysicalRDD metadata when visualizing SQL query plan
This PR backports PR apache#10004 to branch-1.6. It adds a private[sql] method, metadata, to SparkPlan, which can be used to describe detailed information about a physical plan during visualization. Specifically, this PR uses this method to provide details of PhysicalRDDs translated from a data source relation.

Author: Cheng Lian <lian@databricks.com>

Closes apache#10250 from liancheng/spark-12012.for-1.6.
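
For orientation, here is a minimal self-contained sketch of the pattern being introduced (this is not code from the patch; the class names, paths, and filter strings are invented): a base plan node exposes a metadata map that defaults to empty, a scan-like node overrides it with data source details, and a consumer simply reads whatever the node reports.

    // Minimal sketch of the metadata hook pattern; all names and values here are illustrative.
    abstract class PlanNode {
      def nodeName: String = getClass.getSimpleName
      // Default: no extra details. Concrete scan nodes override this with data source specifics.
      def metadata: Map[String, String] = Map.empty
    }

    class ScanNode(paths: Seq[String], pushedFilters: Seq[String]) extends PlanNode {
      override def nodeName: String = "Scan"
      override def metadata: Map[String, String] = Map(
        "InputPaths" -> paths.mkString(", "),
        "PushedFilters" -> pushedFilters.mkString("[", ", ", "]"))
    }

    object MetadataHookSketch {
      // A consumer (think: plan string or UI graph builder) renders whatever the node exposes.
      def describe(plan: PlanNode): String =
        (plan.nodeName +: plan.metadata.toSeq.sorted.map { case (k, v) => s"$k: $v" }).mkString("\n")

      def main(args: Array[String]): Unit = {
        println(describe(new ScanNode(Seq("file:/tmp/t"), Seq("EqualTo(key,15)"))))
        // Scan
        // InputPaths: file:/tmp/t
        // PushedFilters: [EqualTo(key,15)]
      }
    }
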
1 parent 93ef246 commit e541f70

File tree

10 files changed: +83 lines, -31 lines

python/pyspark/sql/dataframe.py

Lines changed: 1 addition & 1 deletion
@@ -213,7 +213,7 @@ def explain(self, extended=False):
 
         >>> df.explain()
         == Physical Plan ==
-        Scan PhysicalRDD[age#0,name#1]
+        Scan ExistingRDD[age#0,name#1]
 
         >>> df.explain(True)
         == Parsed Logical Plan ==

sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

Lines changed: 14 additions & 5 deletions
@@ -97,22 +97,31 @@ private[sql] case class LogicalRDD(
 private[sql] case class PhysicalRDD(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
-    extraInformation: String,
+    override val nodeName: String,
+    override val metadata: Map[String, String] = Map.empty,
     override val outputsUnsafeRows: Boolean = false)
   extends LeafNode {
 
   protected override def doExecute(): RDD[InternalRow] = rdd
 
-  override def simpleString: String = "Scan " + extraInformation + output.mkString("[", ",", "]")
+  override def simpleString: String = {
+    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
+    s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
+  }
 }
 
 private[sql] object PhysicalRDD {
+  // Metadata keys
+  val INPUT_PATHS = "InputPaths"
+  val PUSHED_FILTERS = "PushedFilters"
+
   def createFromDataSource(
       output: Seq[Attribute],
       rdd: RDD[InternalRow],
       relation: BaseRelation,
-      extraInformation: String = ""): PhysicalRDD = {
-    PhysicalRDD(output, rdd, relation.toString + extraInformation,
-      relation.isInstanceOf[HadoopFsRelation])
+      metadata: Map[String, String] = Map.empty): PhysicalRDD = {
+    // All HadoopFsRelations output UnsafeRows
+    val outputUnsafeRows = relation.isInstanceOf[HadoopFsRelation]
+    PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows)
   }
 }
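
To see what the new simpleString produces, this standalone sketch mirrors its formatting logic outside of Spark; the node name, attribute names, and metadata values are hypothetical.

    // Mirrors the formatting of the new PhysicalRDD.simpleString; inputs are made-up examples.
    object SimpleStringSketch {
      def render(nodeName: String, output: Seq[String], metadata: Map[String, String]): String = {
        // Sort metadata entries by key so the rendered plan string is deterministic.
        val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
        s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
      }

      def main(args: Array[String]): Unit = {
        println(render(
          "ParquetRelation: default.testPushed",
          Seq("key#0", "value#1"),
          Map(
            "PushedFilters" -> "[EqualTo(key,15)]",
            "InputPaths" -> "file:/tmp/testPushed")))
        // Scan ParquetRelation: default.testPushed[key#0,value#1] InputPaths: file:/tmp/testPushed, PushedFilters: [EqualTo(key,15)]
      }
    }
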

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 5 additions & 0 deletions
@@ -67,6 +67,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     super.makeCopy(newArgs)
   }
 
+  /**
+   * Return all metadata that describes more details of this SparkPlan.
+   */
+  private[sql] def metadata: Map[String, String] = Map.empty
+
   /**
    * Return all metrics containing metrics of this SparkPlan.
    */

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 1 addition & 1 deletion
@@ -363,7 +363,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil
       case e @ EvaluatePython(udf, child, _) =>
         BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
-      case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "PhysicalRDD") :: Nil
+      case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "ExistingRDD") :: Nil
       case BroadcastHint(child) => apply(child)
       case _ => Nil
     }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 19 additions & 3 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
@@ -25,6 +27,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, expressions}
+import org.apache.spark.sql.execution.PhysicalRDD.{INPUT_PATHS, PUSHED_FILTERS}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
@@ -315,7 +318,20 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // `Filter`s or cannot be handled by `relation`.
     val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
 
-    val pushedFiltersString = pushedFilters.mkString(" PushedFilter: [", ",", "] ")
+    val metadata: Map[String, String] = {
+      val pairs = ArrayBuffer.empty[(String, String)]
+
+      if (pushedFilters.nonEmpty) {
+        pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
+      }
+
+      relation.relation match {
+        case r: HadoopFsRelation => pairs += INPUT_PATHS -> r.paths.mkString(", ")
+        case _ =>
+      }
+
+      pairs.toMap
+    }
 
     if (projects.map(_.toAttribute) == projects &&
         projectSet.size == projects.size &&
@@ -334,7 +350,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.PhysicalRDD.createFromDataSource(
         projects.map(_.toAttribute),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, pushedFiltersString)
+        relation.relation, metadata)
       filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
     } else {
       // Don't request columns that are only referenced by pushed filters.
@@ -344,7 +360,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.PhysicalRDD.createFromDataSource(
         requestedColumns,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, pushedFiltersString)
+        relation.relation, metadata)
       execution.Project(
         projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
     }
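
The metadata assembly above can be reduced to a small standalone sketch with stand-in relation types in place of BaseRelation/HadoopFsRelation (the types and values here are illustrative, not Spark API):

    import scala.collection.mutable.ArrayBuffer

    // Stand-ins for the data source relation types; the real code matches on HadoopFsRelation.
    sealed trait Relation
    case class FileBasedRelation(paths: Seq[String]) extends Relation
    case object OtherRelation extends Relation

    object MetadataSketch {
      val INPUT_PATHS = "InputPaths"
      val PUSHED_FILTERS = "PushedFilters"

      // Only record keys that actually apply: pushed filters if any, input paths for file scans.
      def buildMetadata(pushedFilters: Seq[String], relation: Relation): Map[String, String] = {
        val pairs = ArrayBuffer.empty[(String, String)]
        if (pushedFilters.nonEmpty) {
          pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
        }
        relation match {
          case r: FileBasedRelation => pairs += INPUT_PATHS -> r.paths.mkString(", ")
          case _ =>
        }
        pairs.toMap
      }

      def main(args: Array[String]): Unit = {
        println(buildMetadata(Seq("EqualTo(key,15)"), FileBasedRelation(Seq("file:/tmp/testPushed"))))
        // Map(PushedFilters -> [EqualTo(key,15)], InputPaths -> file:/tmp/testPushed)
      }
    }
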

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala

Lines changed: 10 additions & 0 deletions
@@ -146,6 +146,12 @@ private[sql] class ParquetRelation(
     meta
   }
 
+  override def toString: String = {
+    parameters.get(ParquetRelation.METASTORE_TABLE_NAME).map { tableName =>
+      s"${getClass.getSimpleName}: $tableName"
+    }.getOrElse(super.toString)
+  }
+
   override def equals(other: Any): Boolean = other match {
     case that: ParquetRelation =>
       val schemaEquality = if (shouldMergeSchemas) {
@@ -521,6 +527,10 @@ private[sql] object ParquetRelation extends Logging {
   // internally.
   private[sql] val METASTORE_SCHEMA = "metastoreSchema"
 
+  // If a ParquetRelation is converted from a Hive metastore table, this option is set to the
+  // original Hive table name.
+  private[sql] val METASTORE_TABLE_NAME = "metastoreTableName"
+
   /**
    * If parquet's block size (row group size) setting is larger than the min split size,
    * we use parquet's block size setting as the min split size. Otherwise, we will create

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala

Lines changed: 25 additions & 18 deletions
@@ -66,7 +66,7 @@ private[sql] object SparkPlanGraph {
         metric.param.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]])
     }
     val node = SparkPlanGraphNode(
-      nodeIdGenerator.getAndIncrement(), plan.nodeName, plan.simpleString, metrics)
+      nodeIdGenerator.getAndIncrement(), plan.nodeName, plan.simpleString, plan.metadata, metrics)
     nodes += node
     val childrenNodes = plan.children.map(
       child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges))
@@ -85,26 +85,33 @@ private[sql] object SparkPlanGraph {
  * @param metrics metrics that this SparkPlan node will track
  */
 private[ui] case class SparkPlanGraphNode(
-    id: Long, name: String, desc: String, metrics: Seq[SQLPlanMetric]) {
+    id: Long,
+    name: String,
+    desc: String,
+    metadata: Map[String, String],
+    metrics: Seq[SQLPlanMetric]) {
 
   def makeDotNode(metricsValue: Map[Long, String]): String = {
-    val values = {
-      for (metric <- metrics;
-        value <- metricsValue.get(metric.accumulatorId)) yield {
-        metric.name + ": " + value
-      }
+    val builder = new mutable.StringBuilder(name)
+
+    val values = for {
+      metric <- metrics
+      value <- metricsValue.get(metric.accumulatorId)
+    } yield {
+      metric.name + ": " + value
+    }
+
+    if (values.nonEmpty) {
+      // If there are metrics, display each entry in a separate line. We should use an escaped
+      // "\n" here to follow the dot syntax.
+      //
+      // Note: whitespace between two "\n"s is to create an empty line between the name of
+      // SparkPlan and metrics. If removing it, it won't display the empty line in UI.
+      builder ++= "\\n \\n"
+      builder ++= values.mkString("\\n")
     }
-    val label = if (values.isEmpty) {
-      name
-    } else {
-      // If there are metrics, display all metrics in a separate line. We should use an escaped
-      // "\n" here to follow the dot syntax.
-      //
-      // Note: whitespace between two "\n"s is to create an empty line between the name of
-      // SparkPlan and metrics. If removing it, it won't display the empty line in UI.
-      name + "\\n \\n" + values.mkString("\\n")
-    }
-    s""" $id [label="$label"];"""
+
+    s""" $id [label="${builder.toString()}"];"""
   }
 }
 
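
The label construction can be illustrated with a small standalone sketch; the node id, name, and metric string below are hypothetical, and the point is the escaped "\n" sequences that Graphviz (not Scala) interprets as line breaks.

    // Standalone sketch of the dot-node label construction; id, name, and values are made up.
    object DotNodeSketch {
      def makeDotNode(id: Long, name: String, values: Seq[String]): String = {
        val builder = new StringBuilder(name)
        if (values.nonEmpty) {
          // "\\n" keeps a literal backslash-n in the output so Graphviz renders the line break;
          // the lone space between the two breaks leaves an empty line under the node name.
          builder ++= "\\n \\n"
          builder ++= values.mkString("\\n")
        }
        s""" $id [label="${builder.toString()}"];"""
      }

      def main(args: Array[String]): Unit = {
        println(makeDotNode(3, "TungstenAggregate", Seq("number of output rows: 1")))
        //  3 [label="TungstenAggregate\n \nnumber of output rows: 1"];
      }
    }
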

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 1 addition & 1 deletion
@@ -422,7 +422,7 @@ abstract class HadoopFsRelation private[sql](
     parameters: Map[String, String])
   extends BaseRelation with FileRelation with Logging {
 
-  override def toString: String = getClass.getSimpleName + paths.mkString("[", ",", "]")
+  override def toString: String = getClass.getSimpleName
 
   def this() = this(None, Map.empty[String, String])
 

sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -169,7 +169,7 @@ class PlannerSuite extends SharedSQLContext {
 
     withTempTable("testPushed") {
       val exp = sql("select * from testPushed where key = 15").queryExecution.executedPlan
-      assert(exp.toString.contains("PushedFilter: [EqualTo(key,15)]"))
+      assert(exp.toString.contains("PushedFilters: [EqualTo(key,15)]"))
     }
   }
 }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 6 additions & 1 deletion
@@ -411,7 +411,12 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     // evil case insensitivity issue, which is reconciled within `ParquetRelation`.
     val parquetOptions = Map(
       ParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
-      ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
+      ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString,
+      ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier(
+        metastoreRelation.tableName,
+        Some(metastoreRelation.databaseName)
+      ).unquotedString
+    )
     val tableIdentifier =
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
 
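
For context, TableIdentifier.unquotedString is assumed here to join the optional database and the table name with a dot, so the recorded option value looks like default.some_table. A rough standalone equivalent with hypothetical names:

    // Rough equivalent of the assumed unquotedString behavior; names are hypothetical.
    case class TableId(table: String, database: Option[String] = None) {
      def unquotedString: String = (database.toSeq :+ table).mkString(".")
    }

    object TableIdSketch {
      def main(args: Array[String]): Unit = {
        println(TableId("testPushed", Some("default")).unquotedString)  // default.testPushed
      }
    }
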
