Deep Dive: Query Execution of Spark SQL
Maryann Xue, Xingbo Jiang, Kris Mok
Apr. 2019
About Us
Software Engineers
• Maryann Xue - PMC of Apache Calcite & Apache Phoenix, @maryannxue
• Xingbo Jiang - Apache Spark Committer, @jiangxb1987
• Kris Mok - OpenJDK Committer, @rednaxelafx
Databricks Unified Analytics Platform
DATABRICKS WORKSPACE: Notebooks, Jobs, Models, APIs, Dashboards - end-to-end ML lifecycle
DATABRICKS RUNTIME: Databricks Delta, ML Frameworks - reliable & scalable, simple & integrated
DATABRICKS CLOUD SERVICE
Databricks Customers Across Industries
Financial Services, Healthcare & Pharma, Media & Entertainment, Data & Analytics Services, Technology, Public Sector, Retail & CPG, Consumer Services, Marketing & AdTech, Energy & Industrial IoT
Apache Spark 3.x
Spark SQL | Spark Streaming | Spark ML | Graph | 3rd-party Libraries
SparkSession / DataFrame / Dataset APIs
Catalyst Optimization & Tungsten Execution
Data Source Connectors | Spark Core
Spark SQL Engine
Analysis -> Logical Optimization -> Physical Planning -> Code Generation -> Execution
Runtime
Spark SQL Engine - Front End
Analysis -> Logical Optimization -> Physical Planning -> Code Generation -> Execution
Runtime
Reference: A Deep Dive into Spark SQL’s Catalyst Optimizer,
Yin Huai, Spark Summit 2017
Spark SQL Engine - Back End
Analysis -> Logical Optimization -> Physical Planning -> Code Generation -> Execution
Runtime
Agenda: Physical Planning
Physical Planning
• Transform logical operators into physical operators
• Choose between different physical alternatives
- e.g., broadcast-hash-join vs. sort-merge-join
• Includes physical traits of the execution engine
- e.g., partitioning & ordering.
• Some ops may be mapped into multiple physical nodes
- e.g., partial agg -> shuffle -> final agg
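For instance (a minimal sketch, assuming the tables A and B from the example on the next slide are registered as temp views), the chosen physical operators can be inspected with explain():

val df = spark.sql(
  """SELECT a1, sum(b1) FROM A JOIN B ON A.key = B.key
    |WHERE b1 < 1000 GROUP BY a1""".stripMargin)
df.explain()       // prints the physical plan (e.g., BroadcastHashJoin vs. SortMergeJoin)
df.explain(true)   // prints parsed, analyzed, optimized and physical plans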
A Physical Plan Example
SELECT a1, sum(b1) FROM A JOIN B ON A.key = B.key
WHERE b1 < 1000 GROUP BY a1
Logical plan:  Scan A, Scan B -> Filter -> Join -> Aggregate
Physical plan: Scan B -> Filter -> BroadcastExchange
               Scan A -> BroadcastHashJoin -> HashAggregate -> ShuffleExchange -> HashAggregate
Scheduling a Physical Plan
• Scalar subquery / broadcast exchange:
  - Executed as separate jobs
  - e.g., Job 1, Stage 1: Scan B -> Filter -> BroadcastExchange
• Partition-local ops:
  - Executed in the same stage
  - e.g., Job 2, Stage 1: Scan A -> BroadcastHashJoin -> HashAggregate
• Shuffle (ShuffleExchange):
  - The stage boundary
  - A sync barrier across all nodes
  - e.g., Job 2, Stage 2: HashAggregate
Agenda: Code Generation
Execution, Old: Volcano Iterator Model
• Volcano iterator model
  - All ops implement the same interface, e.g., next()
  - next() on the final op pulls input from its child by calling child.next(), which in turn calls its own child's next(), ending up with a propagation of next() calls
• Pros: good abstraction; easy to implement
• Cons: virtual function calls -> less efficient
  Scan <-next()- Filter <-next()- Project <-next()- Result Iterator (iterate)
Execution, New: Whole-Stage Code Generation
• Inspired by Thomas Neumann's paper
• Fuse a string of operators (oftentimes the entire stage) into one WSCG op that runs the generated code
• A general-purpose execution engine just like the Volcano model, but without Volcano's performance downsides:
  - No virtual function calls
  - Data in CPU registers
  - Loop unrolling & SIMD
  Scan -> Filter -> Project -> Aggregate collapses into generated code like:
  long count = 0;
  for (item in sales) {
    if (price < 100) {
      count += 1;
    }
  }
Execution Models: Old vs. New
• Volcano iterator model: pull model; driven by the final operator
  Scan <-next()- Filter <-next()- Project <-next()- Result Iterator (iterate)
• WSCG model: push model; driven by the head/source operator
  Scan -> Filter -> Project -> Result Iterator (data pushed forward inside the generated loop)
A Physical Plan Example - WSCG
Job 1, Stage 1: WSCG [Scan B -> Filter] -> BroadcastExchange
Job 2, Stage 1: WSCG [Scan A -> BroadcastHashJoin -> HashAggregate] -> ShuffleExchange
Job 2, Stage 2: WSCG [HashAggregate]
Implementation
• The top node WholeStageCodegenExec implements the iterator
interface to interop with other code-gen or non-code-gen
physical ops.
• All underlying operators implement a code-generation interface:
doProduce() & doConsume()
• Dump the generated code: df.queryExecution.debug.codegen
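For example (a sketch; the query here is arbitrary - any query with a WSCG node will do):

val df = spark.range(1000).selectExpr("id % 10 AS key").groupBy("key").count()
df.queryExecution.debug.codegen   // prints the Java source generated for each WSCG subtree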
Single dependency
• A WSCG node contains a linear list of physical operators that
support code generation.
• No multi-child dependencies between the enclosed ops.
• A WSCG node may consist of one or more pipelines:
  WSCG: Op1 -> Op2 -> Op3 -> Op4 -> Op5 (grouped into Pipeline 1 and Pipeline 2)
A Single Pipeline in WSCG
• A string of non-blocking operators forms a pipeline in WSCG.
• The head/source:
  - Implements doProduce() - the driving loop producing source data.
• The rest:
  - doProduce() - falls through to the head of the pipeline.
  - Implement doConsume() for their own processing logic.
produce() calls propagate from the WSCG node back to the head (Op1); consume() calls then flow forward from Op1 through Op2 and Op3 into the generated code.
A Single Pipeline Example
SELECT sid FROM emps WHERE age < 36
Plan: Scan -> Filter -> Project, fused into one WholeStageCodegen node; produce() starts at the WholeStageCodegen node and falls through to Scan, then consume() calls flow back through Filter and Project.
Generated code for the RowIterator (annotated with the operator contributing each part):
while (table.hasNext()) {                  // Scan: doProduce() - the driving loop
  InternalRow row = table.next();          //   Scan
  if (row.getInt(2) < 36) {                // Filter: doConsume()
    String sid = row.getString(0);         // Project: doConsume()
    rowWriter.write(0, sid);               //   Project
    ret = rowWriter.getRow();              // Project -> WholeStageCodegen: emit the output row
  }
  if (shouldStop()) return;                // WholeStageCodegen: yield control back to the iterator
}
Multiple Pipelines in WSCG
• Head (source) operator:
  - The source, w/ or w/o input RDDs
  - e.g., Scan, SortMergeJoin
• Non-blocking operators:
  - In the middle of the pipeline
  - e.g., Filter, Project
• Blocking operators:
  - End of the previous pipeline
  - Start of a new pipeline
  - e.g., HashAggregate, Sort
• End (sink): RowIterator
  - Pulls result from the last pipeline
WSCG: Op1 (source) -> Op2 (non-blocking) -> Op3 (blocking) -> Op4 (non-blocking) -> Op5 -> RowIterator (sink)
  Pipeline 1: Op1 -> Op2 -> Op3    Pipeline 2: Op3 -> Op4 -> Op5
Blocking Operators in WSCG
• A blocking operator, e.g., HashAggregateExec or SortExec, breaks pipelines, so there may be multiple pipelines in one WSCG node.
• A blocking operator's doConsume():
  - Implements the callback that builds the intermediate result.
• A blocking operator's doProduce():
  - Consumes the entire output from upstream to finish building the intermediate result.
  - Starts a new loop and produces output for downstream based on the intermediate result.
A Blocking Operator Example - HashAgg
SELECT age, count(*) FROM emps GROUP BY age
Plan: Scan -> HashAggregate, fused into one WholeStageCodegen node; produce() starts at the WholeStageCodegen node and reaches HashAggregate's doProduce().
HashAggregate's doProduce() first calls child.produce() to drain the child and build the hash map (pipeline 1), then starts a new pipeline that iterates over the hash map (pipeline 2):
// Pipeline 1: Scan's driving loop; HashAggregate's doConsume() builds the intermediate hash map
while (table.hasNext()) {
  InternalRow row = table.next();
  int age = row.getInt(2);
  hashMap.insertOrIncrement(age);
}
// Pipeline 2: HashAggregate's doProduce() emits output rows from the intermediate result
while (hashMapIter.hasNext()) {
  Entry e = hashMapIter.next();
  rowWriter.write(0, e.getKey());
  rowWriter.write(1, e.getValue());
  ret = rowWriter.getRow();
  if (shouldStop()) return;
}
WSCG: BHJ vs. SMJ
• BHJ (broadcast-hash-join) is a pipelined operator.
• BHJ executes the build-side job first, the same way as in non-WSCG.
• BHJ is fused together with the probe-side plan (i.e., the streaming plan) in WSCG.
Job 1: WSCG [Scan B -> Filter] -> BroadcastExchange
Job 2: WSCG [Scan A -> BroadcastHashJoin -> HashAggregate] -> ShuffleExchange -> WSCG [HashAggregate]
WSCG: BHJ vs. SMJ
• SMJ (sort-merge-join) is NOT fused with either child plan for WSCG; the child plans are separate WSCG nodes.
• Thus, SMJ must be the head operator of its own WSCG node.
Job 1:
  WSCG [Scan A] -> ShuffleExchange -> WSCG [Sort]
  WSCG [Scan B -> Filter] -> ShuffleExchange -> WSCG [Sort]
  both sorted sides -> WSCG [SortMergeJoin -> HashAggregate] -> ShuffleExchange -> WSCG [HashAggregate]
WSCG Limitations
• Problems:
  - No JIT compilation for methods whose bytecode size exceeds 8000 bytes (HotSpot's huge-method limit).
  - Methods over 64KB are not allowed by the Java class file format.
• Solutions:
  - Fallback: spark.sql.codegen.fallback; spark.sql.codegen.hugeMethodLimit
  - Move blocking loops into separate methods, e.g., hash-map building in HashAgg and sort-buffer building in Sort.
  - Split consume() into individual methods for each operator: spark.sql.codegen.splitConsumeFuncByOperator
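A sketch of setting these knobs at runtime (the values shown are the defaults in recent Spark releases; check your version's docs):

spark.conf.set("spark.sql.codegen.fallback", "true")               // fall back to Volcano-style execution if codegen fails
spark.conf.set("spark.sql.codegen.hugeMethodLimit", "65535")       // skip WSCG when the generated method would exceed this bytecode size
spark.conf.set("spark.sql.codegen.splitConsumeFuncByOperator", "true")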
Agenda: RDDs (DAGs)
A Physical Plan Example
SELECT a1, sum(b1) FROM A JOIN B ON A.key = B.key
WHERE b1 < 1000 GROUP BY a1
Physical plan: Scan B -> Filter -> BroadcastExchange
               Scan A -> BroadcastHashJoin -> HashAggregate -> ShuffleExchange -> HashAggregate
RDD and Partitions
RDD (Resilient Distributed Dataset) represents an immutable, partitioned collection of elements that can be operated on in parallel. Each partition lives on a node in the cluster (Node1, Node2, Node3, ...).
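A minimal sketch of the idea:

// An RDD with 4 partitions; each partition can be processed by a different task in parallel.
val rdd = spark.sparkContext.parallelize(1 to 1000, numSlices = 4)
println(rdd.getNumPartitions)     // 4
val doubled = rdd.map(_ * 2)      // a transformation applied partition by partition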
Physical Operator
Volcano iterator model: an operator (e.g., Filter) processes the rows of each partition of its input RDD and outputs a new RDD:
while (iter.hasNext()) {
  val tmpVal = iter.next()
  if (condition(tmpVal)) {
    return tmpVal
  }
}
A Physical Plan Example - Scheduling
Job 1, Stage 1: Scan B -> Filter -> BroadcastExchange
Job 2, Stage 1: Scan A -> BroadcastHashJoin -> HashAggregate (up to the ShuffleExchange)
Job 2, Stage 2: HashAggregate
Stage Execution
• A stage (e.g., Stage 1: Scan A -> BroadcastHashJoin -> HashAggregate) is submitted as a TaskSet, with one task per partition (Partition0 -> Task0, Partition1 -> Task1, ...).
[Diagram: partitions 0-3 mapped to tasks across successive TaskSets (TaskSet0, TaskSet1, TaskSet2).]
How to run a Task
With spark.executor.cores=5 and spark.task.cpus=1, an executor runs up to 5 tasks concurrently (Task0-Task4 run; Task5-Task7 wait for a free slot).
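A sketch of the corresponding configuration (set at submit time; an executor gets spark.executor.cores / spark.task.cpus concurrent task slots):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.cores", "5")   // CPU cores per executor
  .set("spark.task.cpus", "1")        // cores reserved per task -> 5 / 1 = 5 concurrent tasks per executor
// then: SparkSession.builder().config(conf).getOrCreate()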
Fault Tolerance
● MPP-like analytics engines (e.g., Teradata, Presto, Impala):
○ Coarser-grained recovery model
○ Retry an entire query if any machine fails
○ Short/simple queries
● Spark SQL:
○ Mid-query recovery model
○ RDDs track the series of transformations used to build
them (the lineage) to recompute lost partitions.
○ Long/complex queries [e.g., complex UDFs]
Handling Task Failures
Task Failure:
● Record the failure count of the task
● Retry the task if failure count < maxTaskFailures
● Abort the stage and corresponding jobs if count >= maxTaskFailures
Fetch Failure:
● Don't count the failure into the task failure count
● Retry the stage if stage failure count < maxStageFailures
● Abort the stage and corresponding jobs if stage failure count >= maxStageFailures
● Mark executor/host as lost (optional)
Agenda: Memory Management
Memory Consumption in Executor JVM
Spark uses memory for:
• RDD Storage [e.g., call cache()].
• Execution memory [e.g., Shuffle
and aggregation buffers]
• User code [e.g., allocate large
arrays]
Challenges:
• Tasks run in a shared-memory environment.
• Memory resource is not enough!
Execution Memory
• Buffer intermediate results
• Normally short-lived
[Executor JVM memory layout: Execution Memory | Storage Memory | User Memory | Reserved Memory]
Storage Memory
• Reuse data for future computation
• Cached data can be long-lived
• LRU eviction for spill data
[Executor JVM memory layout: Execution Memory | Storage Memory | User Memory | Reserved Memory]
Unified Memory Manager
• Express execution and storage memory as one single unified region
• Keep acquiring execution memory and evict storage as you need more execution memory
Memory regions:
  Execution Memory = (1.0 - spark.memory.storageFraction) * USABLE_MEMORY
  Storage Memory   = spark.memory.storageFraction * USABLE_MEMORY
  User Memory      = (1.0 - spark.memory.fraction) * (SYSTEM_MEMORY - RESERVED_MEMORY)
  Reserved Memory  = RESERVED_SYSTEM_MEMORY_BYTES (300MB)
  (here USABLE_MEMORY = spark.memory.fraction * (SYSTEM_MEMORY - RESERVED_MEMORY))
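A rough worked example of the sizing above (assuming spark.executor.memory = 4g and the defaults spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5):

val systemMemory  = 4L * 1024 * 1024 * 1024                    // spark.executor.memory = 4g
val reserved      = 300L * 1024 * 1024                         // RESERVED_SYSTEM_MEMORY_BYTES
val usableMemory  = ((systemMemory - reserved) * 0.6).toLong   // unified execution + storage region, ~2.2 GB
val storageRegion = (usableMemory * 0.5).toLong                // soft storage boundary, ~1.1 GB
val executionMin  = usableMemory - storageRegion               // guaranteed execution share, ~1.1 GB
val userMemory    = ((systemMemory - reserved) * 0.4).toLong   // user memory, ~1.5 GB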
Dynamic occupancy mechanism
• The boundary between the two regions is set by spark.memory.storageFraction.
• If one side's space is insufficient but the other's is free, it borrows the other's space.
• If both parties don't have enough space, evict storage memory using the LRU mechanism.
One problem remains...
• The memory resource is not enough!
Executor process memory:
  On-Heap Memory: inside the JVM, managed by GC
  Off-Heap Memory: outside the JVM, not managed by GC
Off-Heap Memory
• Enabled by spark.memory.offHeap.enabled
• Memory size controlled by spark.memory.offHeap.size
• Also split into Execution Memory and Storage Memory regions
Off-Heap Memory
• Pros
• Speed: Off-Heap Memory > Disk
• Not bound by GC
• Cons
• Manually manage memory allocation/release
Tuning Data Structures
In Spark applications:
• Prefer arrays of objects instead of collection classes
(e.g., HashMap)
• Avoid nested structures with a lot of small objects and
pointers when possible
• Use numeric IDs or enumeration objects instead of strings
for keys
Tuning Memory Config
spark.memory.fraction
• More execution and storage memory
• Higher risk of OOM
spark.memory.storageFraction
• Increase storage memory to cache more data
• Less execution memory, which may lead to tasks spilling more often
Tuning Memory Config
spark.memory.offHeap.enabled
spark.memory.offHeap.size
• Off-Heap memory not bound by GC
• On-Heap + Off-Heap memory must fit in total executor
memory (spark.executor.memory)
spark.shuffle.file.buffer
spark.unsafe.sorter.spill.reader.buffer.size
• Buffer shuffle file to amortize disk I/O
• More execution memory consumption
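A sketch of enabling off-heap memory (static configs, set at application startup; the sizes are illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2g")    // must fit on the node alongside spark.executor.memory
  .set("spark.executor.memory", "4g")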
Agenda: Vectorized Reader
Vectorized Readers
Read columnar format data as-is without converting to row
format.
• Apache Parquet
• Apache ORC
• Apache Arrow
• ...
Vectorized Readers
Parquet vectorized reader is 9 times faster than the non-
vectorized one.
See blog post
Vectorized Readers
Supported built-in data sources:
• Parquet
• ORC
Arrow is used for intermediate data in PySpark.
Implement DataSource
The DataSource V2 API provides a way to implement your own vectorized reader.
• PartitionReaderFactory
• supportColumnarReads(...) to return true
• createColumnarReader(...) to return
PartitionReader[ColumnarBatch]
• [SPARK-25186] Stabilize Data Source V2 API
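A skeleton of the factory side (a sketch against the Spark 3.x connector API; the columnar reader body is a placeholder you would fill with real batch-producing logic):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.vectorized.ColumnarBatch

class MyReaderFactory extends PartitionReaderFactory {
  // Row-based path; not used when columnar reads are supported for the partition.
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
    throw new UnsupportedOperationException("row-based reads not supported")

  override def supportColumnarReads(partition: InputPartition): Boolean = true

  // Placeholder reader: a real implementation would fill ColumnarBatches from Parquet/ORC/Arrow.
  override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] =
    new PartitionReader[ColumnarBatch] {
      override def next(): Boolean = false   // empty source, for illustration only
      override def get(): ColumnarBatch = throw new NoSuchElementException
      override def close(): Unit = ()
    }
}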
Delta Lake
• Full ACID transactions
• Schema management
• Scalable metadata handling
• Data versioning and time travel
• Unified batch/streaming support
• Record update and deletion
• Data expectation
Delta Lake: https://delta.io/
Documentation: https://docs.delta.io
For details, refer to the blog: https://tinyurl.com/yxhbe2lg
Agenda: UDF
What’s behind foo(x) in Spark SQL?
What looks like a function call can be a lot of things:
• upper(str): Built-in function
• max(val): Aggregate function
• max(val) over …: Window function
• explode(arr): Generator
• myudf(x): User-defined function
• myudaf(x): User-defined aggregate function
• transform(arr, x -> x + 1): Higher-order function
• range(10): Table-valued function
Functions in Spark SQL
• Builtin Scalar Function - Scope: 1 row; Data feed: scalar expressions; Process: same JVM; Impl. level: Expression; Data type: internal
• Java/Scala UDF - Scope: 1 row; Data feed: scalar expressions; Process: same JVM; Impl. level: Expression; Data type: external
• Python UDF (*) - Scope: 1 row; Data feed: batch of data; Process: Python worker process; Impl. level: Physical Operator; Data type: external
• Aggregate / Window Function - Scope: whole table; Data feed: scalar expressions + aggregate buffer; Process: same JVM; Impl. level: Physical Operator; Data type: internal
• Higher-order Function - Scope: 1 row; Data feed: expression of complex type; Process: same JVM; Impl. level: Expression; Data type: internal
(*): and all other non-Java user-defined functions
UDF execution
User Defined Functions:
• Java/Scala UDFs
• Hive UDFs
• when Hive support is enabled
Also we have:
• Python/Pandas UDFs
• covered later, in the PySpark execution section
Java/Scala UDFs
• UDF: User Defined Function
• Java/Scala lambdas or method references can be used.
• UDAF: User Defined Aggregate Function
• Need to implement UserDefinedAggregateFunction.
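A minimal sketch (df, value and tbl are placeholder names):

import org.apache.spark.sql.functions.{col, udf}

val plusOne = udf((x: Int) => x + 1)
df.select(plusOne(col("value")))                    // use from the DataFrame API

spark.udf.register("plus_one", (x: Int) => x + 1)   // register for SQL
spark.sql("SELECT plus_one(value) FROM tbl")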
UDAF
Implement UserDefinedAggregateFunction
• def initialize(...)
• def update(...)
• def merge(...)
• def evaluate(...)
• ...
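A sketch of a simple sum-of-longs UDAF against this API:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class LongSum extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = new StructType().add("value", LongType)
  override def bufferSchema: StructType = new StructType().add("sum", LongType)
  override def dataType: DataType = LongType
  override def deterministic: Boolean = true
  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0L
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) buffer(0) = buffer.getLong(0) + input.getLong(0)
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
  override def evaluate(buffer: Row): Any = buffer.getLong(0)
}
// spark.udf.register("long_sum", new LongSum)   // then: SELECT long_sum(value) FROM tbl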
Hive UDFs
Available when Hive support is enabled.
• Register using the CREATE FUNCTION command
• Use in HiveQL
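A sketch (requires a SparkSession built with enableHiveSupport(); the class name is a placeholder for your own UDF implementation):

spark.sql("CREATE TEMPORARY FUNCTION my_upper AS 'com.example.hive.MyUpperUDF'")
spark.sql("SELECT my_upper(name) FROM people")   // 'people' is a placeholder table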
Hive UDFs
Provides wrapper expressions for each UDF type:
• HiveSimpleUDF: UDF
• HiveGenericUDF: GenericUDF
• HiveUDAFFunction: UDAF
• HiveGenericUDTF: GenericUDTF
UDF execution
1. Before invoking UDFs, convert arguments from the internal data format to objects suitable for each UDF type:
• Java/Scala UDF: Java/Scala objects
• Hive UDF: ObjectInspector
2. Invoke the UDF.
3. After invocation, convert the returned values back to internal
data format.
Agenda: PySpark
PySpark
PySpark is a set of Python bindings for Spark APIs.
• RDD
• DataFrame
• Other libraries based on RDDs and DataFrames
  • MLlib, Structured Streaming, ...
Also, SparkR: R bindings for Spark APIs
PySpark
RDD vs. DataFrame:
• RDD: invokes Python functions on a Python worker
• DataFrame: just constructs queries and executes them on the JVM
  • except for Python/Pandas UDFs
PySpark execution
Python script drives Spark on the JVM via Py4J.
Executors run Python workers.
[Diagram: Python -> Driver (JVM) -> Executors, each paired with a Python Worker]
PySpark and Pandas
Ease of interop: PySpark can convert data between PySpark
DataFrame and Pandas DataFrame.
• pdf = df.toPandas()
• df = spark.createDataFrame(pdf)
Note: df.toPandas() triggers the execution of the PySpark
DataFrame, similar to df.collect()
PySpark and Pandas (cont'd)
New way of interop: Koalas brings the Pandas API to Apache Spark (https://github.com/databricks/koalas)

import databricks.koalas as ks
import pandas as pd

pdf = pd.DataFrame({'x': range(3), 'y': ['a', 'b', 'b'], 'z': ['a', 'b', 'b']})
# Create a Koalas DataFrame from a pandas DataFrame
df = ks.from_pandas(pdf)
# Rename the columns
df.columns = ['x', 'y', 'z1']
# Do some operations in place
df['x2'] = df.x * df.x
Agenda: Python/Pandas UDF
Python UDF and Pandas UDF

from pyspark.sql.functions import udf, pandas_udf, PandasUDFType

@udf('double')
def plus_one(v):
    return v + 1

@pandas_udf('double', PandasUDFType.SCALAR)
def pandas_plus_one(vs):
    return vs + 1
Python/Pandas UDF execution
Physical Operator -> Serializer -> batch of data -> PythonRunner -> invoke UDF -> batch of data -> Deserializer -> Physical Operator
Python UDF execution
Physical Operator -> Serializer -> batch of Rows -> PythonUDFRunner -> invoke UDF -> batch of Rows -> Deserializer -> Physical Operator
Pandas UDF execution
Physical Operator -> Serializer -> batch of Columns -> ArrowPythonRunner -> invoke UDF -> batch of Columns -> Deserializer -> Physical Operator
Python/Pandas UDFs
Python UDF
• Serialize/Deserialize data with Pickle
• Fetch data in blocks, but invoke UDF row by row
Pandas UDF
• Serialize/Deserialize data with Arrow
• Fetch data in blocks, and invoke UDF block by block
Python/Pandas UDFs
Pandas UDFs perform much better than row-at-a-time Python UDFs.
• 3x to over 100x
See blog post
Further Reading
This Spark+AI Summit:
• Understanding Query Plans and Spark
Previous Spark Summits:
• A Deep Dive into Spark SQL’s Catalyst Optimizer
• Deep Dive into Project Tungsten: Bringing Spark Closer to
Bare Metal
• Improving Python and Spark Performance and
Interoperability with Apache Arrow
Thank you
Maryann Xue (maryann.xue@databricks.com)
Xingbo Jiang (xingbo.jiang@databricks.com)
Kris Mok (kris.mok@databricks.com)