Spark – Lazy Evaluation #

Introduction to Lazy Evaluation #

Lazy Evaluation is one of the core concepts in Apache Spark, including PySpark. This approach delays the
execution of operations until an action is triggered. In Spark, transformations like map, filter, and flatMap
are lazy, meaning they are not immediately executed when called. Instead, Spark builds a logical execution
plan (called a Directed Acyclic Graph or DAG) and only executes transformations when an action like
collect, count, or saveAsTextFile is invoked.

This design is crucial for optimization as it allows Spark to minimize data shuffling, pipeline
transformations, and optimize the execution plan before any actual computation is done.

Why Lazy Evaluation? #

1. Optimization: By delaying execution, Spark has the opportunity to optimize the whole sequence of operations before running it.
2. Reduced Memory Usage: Spark avoids materializing intermediate results until an action is called.
3. Fault Tolerance: Because Spark records the lineage (the sequence of transformations) rather than the intermediate data, it can recompute lost partitions in case of failure (see the sketch after this list).
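
The lineage mentioned in point 3 can be inspected directly. Below is a minimal sketch, assuming a local SparkSession created purely for illustration: it builds a short chain of lazy transformations and prints the lineage Spark has recorded before anything is computed.

from pyspark.sql import SparkSession

# A local session used only for this sketch
spark = SparkSession.builder.appName("Lineage Sketch").getOrCreate()

# Build a small lineage: parallelize -> map -> filter (all lazy, nothing runs yet)
rdd = spark.sparkContext.parallelize(range(10))
pipeline = rdd.map(lambda x: x * 2).filter(lambda x: x > 5)

# toDebugString() returns the recorded lineage (as bytes in PySpark);
# no computation has happened at this point
print(pipeline.toDebugString().decode("utf-8"))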

Example of Lazy Evaluation #

from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("Lazy Evaluation Example") \
    .getOrCreate()

# Sample data
data = [("John", 28), ("Jane", 35), ("Sam", 45), ("Alice", 32)]

# Create a DataFrame from the data
df = spark.createDataFrame(data, ["Name", "Age"])

# Transformation 1: Filter people with age > 30 (lazy transformation)
adults = df.filter(df.Age > 30)

# Transformation 2: Select only the "Name" column (lazy transformation)
names = adults.select("Name")

# At this point, no action has been triggered, so Spark hasn't processed the data yet.
# The above transformations are only "registered" in Spark's execution plan.

# Action: Collect the results (triggers execution)
result = names.collect()

# Print the result
for name in result:
    print(name)

Explanation: #

The filter() and select() calls are transformations. They do not trigger any actual computation but merely register their intent.
The collect() call is an action. When it is invoked, Spark triggers the entire computation, evaluates the registered transformations, and fetches the result.

DAG (Directed Acyclic Graph) in Lazy Evaluation #

Spark optimizes the execution plan by creating a DAG of the transformations. This DAG represents the
operations to be executed and allows Spark to:

1. Avoid Redundant Computation: Spark can reuse intermediate results.
2. Optimize Joins: It can rearrange the execution plan to minimize shuffling.
3. Pipeline Operations: Combine multiple operations into a single stage for better performance.

In the above example:

1. When you call df.filter(), Spark records that a filter operation needs to be performed but does not
execute it.
2. Similarly, the select() transformation is recorded but not executed.
3. Only when collect() is called does Spark optimize both transformations and perform the actual computation; the sketch below shows how to inspect this plan with explain() before any action runs.
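
To see the plan Spark has built before triggering it, you can print it with the DataFrame's explain() method. A minimal sketch reusing df, adults, and names from the example above (the exact plan text varies by Spark version):

# Print the plans Spark has prepared so far; no data is read or processed by this call.
# Passing True prints the parsed, analyzed, optimized, and physical plans.
names.explain(True)

# The Filter on Age > 30 and the pruning down to the "Name" column typically
# appear in the optimized plan, confirming both lazy transformations were recorded.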

Code Example with Multiple Transformations #

Let’s take a more complex example where multiple transformations are applied before an action is invoked.

# Sample data
data = [("John", 28, "Male"), ("Jane", 35, "Female"), ("Sam", 45, "Male"), ("Alice", 32, "Female")]

# Create a DataFrame
df = spark.createDataFrame(data, ["Name", "Age", "Gender"])

# Apply multiple transformations (none of them are executed yet)
transformed_df = df.filter(df.Age > 30).select("Name", "Gender").orderBy("Name")

# The above transformations are still in the "planning" phase. Nothing has been computed yet.

# Trigger the execution by calling an action (e.g., show, collect)
transformed_df.show()

# Now Spark will evaluate the DAG and perform the computations in an optimized manner.

Optimizations Spark Can Apply Using Lazy Evaluation #

1. Predicate Pushdown: Spark can push filters down to the data source to reduce the amount of data read (see the sketch after this list).
2. Join Optimization: If there are joins, Spark can determine the most efficient join strategy (e.g.,
broadcast join).
3. Pipelining: Spark pipelines transformations, reducing the number of stages and improving
performance.

Lazy Evaluation with RDDs #

Lazy evaluation works similarly with RDDs (Resilient Distributed Datasets) in PySpark. The difference is in
syntax, but the underlying mechanism remains the same.

# Creating an RDD
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

# Applying a lazy transformation (no computation happens here)
squared_rdd = rdd.map(lambda x: x * x)

# Applying another lazy transformation
filtered_rdd = squared_rdd.filter(lambda x: x > 10)

# Action: collect() triggers the actual execution
result = filtered_rdd.collect()

# Print the result
print(result)

In this RDD example:

map() and filter() are lazy transformations.
collect() triggers the actual computation.
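
One consequence of this laziness is that every action re-runs the recorded lineage from the source data. Below is a minimal sketch of that behaviour, continuing from filtered_rdd above; cache() is shown only as the usual way to keep a computed RDD around when it will be reused, which is an assumption about your workload rather than something the example requires.

# Each action below re-evaluates map() and filter() over the source data,
# because only the lineage is stored, not the intermediate results.
print(filtered_rdd.collect())   # [16, 25]
print(filtered_rdd.count())     # 2, recomputed from scratch

# If the result will be reused, cache() marks it to be kept in memory
# once the next action materializes it.
filtered_rdd.cache()
print(filtered_rdd.collect())   # computed once more, then kept cached
print(filtered_rdd.count())     # served from the cached data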

Key Actions that Trigger Execution #

collect(): Gathers all the data from the distributed RDD or DataFrame to the driver node.
count(): Counts the number of rows.
take(n): Fetches the first n rows.
saveAsTextFile(): Saves an RDD to text files (DataFrames are written through df.write instead).
reduce(): Aggregates data across all partitions of an RDD (see the sketch below).
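
A minimal sketch exercising several of these actions on a small RDD; the numbers and the output path are purely illustrative.

nums = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

print(nums.count())                      # 5
print(nums.take(3))                      # [1, 2, 3]
print(nums.reduce(lambda a, b: a + b))   # 15
print(nums.collect())                    # [1, 2, 3, 4, 5]

# saveAsTextFile() writes one part file per partition; the path is hypothetical
# and must not already exist.
nums.saveAsTextFile("/tmp/nums_output")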

Conclusion #

Spark’s lazy evaluation model is designed to enhance performance by postponing the actual computation
until necessary. By deferring the execution of transformations, Spark can create an optimized execution plan,
minimizing data shuffling and improving resource utilization.
