Scenario Based Interview
PySpark vs
Spark SQL
Ganesh. R
# Problem Statement: You are a restaurant owner and you want to analyze a possible
expansion (there will be at least one customer every day).
Compute the moving average of how much customers paid in a seven-day window (i.e.,
the current day + the 6 days before). average_amount should be rounded to two decimal places.
Return the result table ordered by visited_on in ascending order.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import avg, col, round, row_number, sum, window
from pyspark.sql.types import DateType
from pyspark.sql.window import Window
# Start (or reuse) the Spark session used by this example
spark = SparkSession.builder.appName("MovingAverage").getOrCreate()
# Column names, listed in the same order as the fields of each sample tuple
columns = ["customer_id", "name", "visited_on", "amount"]
# Sample visits: (customer_id, name, visited_on, amount).
# 2019-01-10 deliberately has two visits so the per-day aggregation matters.
data = [
    (1, "Jhon", "2019-01-01", 100),
    (2, "Daniel", "2019-01-02", 110),
    (3, "Jade", "2019-01-03", 120),
    (4, "Khaled", "2019-01-04", 130),
    (5, "Winston", "2019-01-05", 110),
    (6, "Elvis", "2019-01-06", 140),
    (7, "Anna", "2019-01-07", 150),
    (8, "Maria", "2019-01-08", 80),
    (9, "Jaze", "2019-01-09", 110),
    (1, "Jhon", "2019-01-10", 130),
    (3, "Jade", "2019-01-10", 150),
]
# Build the DataFrame; only column names are supplied, so Spark infers the types
df = spark.createDataFrame(data, columns)
# Render the rows, then print the inferred schema
df.display()
df.printSchema()
root
|-- customer_id: long (nullable = true)
|-- name: string (nullable = true)
|-- visited_on: string (nullable = true)
|-- amount: long (nullable = true)
# Window specifications.
# `day_order` sorts the per-day rows chronologically; visited_on is an ISO
# "YYYY-MM-DD" string, so lexicographic order equals date order.
# `window_spec` is the trailing 7-row frame (current row + 6 preceding rows).
# A row-based frame equals a 7-day window only because the problem guarantees
# at least one customer every day (one aggregated row per calendar day).
day_order = Window.orderBy("visited_on")
window_spec = day_order.rowsBetween(-6, Window.currentRow)
# Collapse visits to one row per day, then compute the rolling sum and the
# rolling average (rounded to two decimals) over the trailing window.
# The `F.` prefix avoids relying on names that shadow the Python builtins
# `sum` and `round`.
result_df = (
    df.groupBy("visited_on")
    .agg(F.sum("amount").alias("daily_amount"))
    .withColumn("amount", F.sum("daily_amount").over(window_spec))
    .withColumn(
        "average_amount",
        F.round(F.avg("daily_amount").over(window_spec), 2),
    )
)
# Keep only days that have a full 7-row window behind them (row_number >= 7),
# drop the helper columns, and sort explicitly: the required ascending
# visited_on order is not guaranteed unless the DataFrame is ordered.
result_df = (
    result_df.withColumn("row_number", F.row_number().over(day_order))
    .filter(F.col("row_number") >= 7)
    .select("visited_on", "amount", "average_amount")
    .orderBy("visited_on")
)
# Show the result
result_df.display()
# Register the raw visits as a temp view so Spark SQL can query it as `Customer`
df.createOrReplaceTempView("Customer")
%sql
-- One row per day: total amount paid by all customers on that day.
WITH daily_totals AS (
  SELECT
    visited_on,
    SUM(amount) AS total_amount
  FROM Customer
  GROUP BY visited_on
),
-- For each day: the sum over the current row and the 6 preceding rows,
-- plus the earliest day in the data (used below to drop incomplete windows).
rolling_totals AS (
  SELECT
    visited_on,
    SUM(total_amount) OVER (
      ORDER BY visited_on
      ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) AS sum_amount_7d,
    MIN(visited_on) OVER () AS first_visited_on
  FROM daily_totals
)
-- Report only days that are at least 6 days after the first recorded day.
SELECT
  visited_on,
  sum_amount_7d AS amount,
  ROUND(sum_amount_7d / 7, 2) AS average_amount
FROM rolling_totals
WHERE DATEDIFF(visited_on, first_visited_on) >= 6
ORDER BY visited_on;
IF YOU FOUND
THIS POST
USEFUL, PLEASE
SAVE IT.
Ganesh. R
+91-9030485102. Hyderabad, Telangana. rganesh0203@gmail.com
https://medium.com/@rganesh0203 https://rganesh203.github.io/Portfolio/
https://github.com/rganesh203. https://www.linkedin.com/in/r-ganesh-a86418155/
https://www.instagram.com/rg_data_talks/ https://topmate.io/ganesh_r0203