# [ PySpark SQL and DataFrames ] [ cheatsheet ]
1. Creating DataFrames
● Create DataFrame from RDD: df = spark.createDataFrame(rdd)
● Create DataFrame from list: df = spark.createDataFrame([(1, "John"), (2,
"Jane"), (3, "Bob")])
● Create DataFrame from CSV: df = spark.read.csv("path/to/file.csv",
header=True, inferSchema=True)
● Create DataFrame from JSON: df = spark.read.json("path/to/file.json")
● Create DataFrame from Parquet: df =
spark.read.parquet("path/to/file.parquet")
● Create DataFrame from Avro: df =
spark.read.format("avro").load("path/to/file.avro")
● Create DataFrame from ORC: df = spark.read.orc("path/to/file.orc")
● Create DataFrame from JDBC: df = spark.read.format("jdbc").option("url",
"jdbc:postgresql:dbserver").option("dbtable",
"schema.tablename").option("user", "username").option("password",
"password").load()
● Create empty DataFrame with schema: schema =
StructType([StructField("id", IntegerType()), StructField("name",
StringType())]); df = spark.createDataFrame([], schema)
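A minimal runnable sketch combining a few of the creation patterns above; the file paths and column names are placeholders, not files that ship with Spark.

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.appName("create-dataframes").getOrCreate()

# From a Python list of tuples, with explicit column names
people = spark.createDataFrame([(1, "John"), (2, "Jane"), (3, "Bob")], ["id", "name"])

# From an RDD of tuples, reusing the same column names
rdd = spark.sparkContext.parallelize([(4, "Ann"), (5, "Lee")])
more_people = spark.createDataFrame(rdd, ["id", "name"])

# Empty DataFrame with an explicit schema
schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
])
empty = spark.createDataFrame([], schema)

# File-based readers all follow the same pattern; "path/to/file.csv" is a placeholder
# csv_df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)

people.show()
empty.printSchema()
```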
2. DataFrame Operations
● Show DataFrame: df.show()
● Show DataFrame without truncating column values: df.show(truncate=False)
● Show DataFrame with limited rows: df.show(n=10)
● Print DataFrame schema: df.printSchema()
● Select columns: df.select("column1", "column2")
● Select columns with aliases: df.select(col("column1").alias("col1"),
col("column2").alias("col2"))
● Filter rows: df.filter(col("age") > 18)
● Filter rows with multiple conditions: df.filter((col("age") > 18) &
(col("gender") == "M"))
● Filter rows with SQL expression: df.filter("age > 18 AND gender = 'M'")
● Filter rows with NULL values: df.filter(col("column").isNull())
● Filter rows with NOT NULL values: df.filter(col("column").isNotNull())
● Filter rows with IN clause: df.filter(col("column").isin(1, 2, 3))
● Filter rows with LIKE clause: df.filter(col("name").like("J%"))
● Filter rows with RLIKE clause: df.filter(col("name").rlike("J.*"))
● Filter rows with BETWEEN clause: df.filter(col("age").between(18, 30))
● Distinct values: df.distinct()
● Distinct values of specific columns: df.dropDuplicates(["column1",
"column2"])
3. DataFrame Joins
● Inner join: df1.join(df2, on="key", how="inner")
● Left outer join: df1.join(df2, on="key", how="left")
● Right outer join: df1.join(df2, on="key", how="right")
● Full outer join: df1.join(df2, on="key", how="full")
● Left semi join: df1.join(df2, on="key", how="leftsemi")
● Left anti join: df1.join(df2, on="key", how="leftanti")
● Cross join: df1.crossJoin(df2)
● Self join: df.alias("t1").join(df.alias("t2"), on="key")
● Join with complex condition: df1.join(df2, (df1.column1 == df2.column2) &
(df1.column3 > df2.column4))
● Join with multiple keys: df1.join(df2, on=["key1", "key2"], how="inner")
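A runnable sketch of the join variants above; the orders/customers tables and their columns are invented for the example.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

orders = spark.createDataFrame([(1, 100), (2, 200), (3, 150)], ["customer_id", "amount"])
customers = spark.createDataFrame([(1, "John"), (2, "Jane"), (4, "Bob")], ["customer_id", "name"])

# Inner join on a shared key column (the key appears once in the result)
orders.join(customers, on="customer_id", how="inner").show()

# Left anti join: orders whose customer_id has no match in customers
orders.join(customers, on="customer_id", how="leftanti").show()

# Join with an explicit, non-equi condition built from column references
cond = (orders.customer_id == customers.customer_id) & (orders.amount > 120)
orders.join(customers, cond, how="inner").show()
```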
4. DataFrame Set Operations
● Union: df1.union(df2)
● Union by name: df1.unionByName(df2)
● Intersect: df1.intersect(df2)
● Except all (keeps duplicates): df1.exceptAll(df2)
● Subtract: df1.subtract(df2)
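A quick sketch showing how the set operations differ in their handling of duplicates; the single-column DataFrames are made up.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([(1,), (2,), (2,), (3,)], ["id"])
df2 = spark.createDataFrame([(2,), (3,), (4,)], ["id"])

df1.union(df2).show()        # positional union, keeps duplicates
df1.intersect(df2).show()    # rows present in both, deduplicated
df1.subtract(df2).show()     # rows in df1 not in df2, deduplicated
df1.exceptAll(df2).show()    # like subtract, but preserves duplicate counts
```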
5. DataFrame Sorting
● Sort by column: df.sort("column")
● Sort by multiple columns: df.sort("column1", "column2")
● Sort by column in ascending order: df.sort(col("column").asc())
● Sort by column in descending order: df.sort(col("column").desc())
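A tiny sketch mixing ascending and descending sort keys in one call; the key/value columns are placeholders.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([("a", 3), ("b", 1), ("a", 2)], ["key", "value"])

# Sort ascending by key, then descending by value within each key
df.sort(col("key").asc(), col("value").desc()).show()
```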
6. DataFrame Grouping and Aggregation
● Group by column: df.groupBy("column")
● Group by multiple columns: df.groupBy("column1", "column2")
● Aggregations (count, sum, avg, min, max):
df.groupBy("column").agg(count("*").alias("count"),
sum("value").alias("sum"), avg("value").alias("avg"),
min("value").alias("min"), max("value").alias("max"))
● Aggregation with filter: df.groupBy("column").agg(sum(when(col("value") >
100, col("value"))).alias("sum_filtered"))
● Aggregation with multiple filters:
df.groupBy("column").agg(sum(when(col("value") > 100,
col("value"))).alias("sum_filtered1"), sum(when(col("value") < 50,
col("value"))).alias("sum_filtered2"))
● Pivot table: df.groupBy("column1").pivot("column2").agg(count("*"))
● Unpivot table: df.select("column1", expr("stack(3, 'column2', column2,
'column3', column3, 'column4', column4) as (key, value)")).where("value
is not null")
7. DataFrame Window Functions
● Window functions (rank, dense_rank, percent_rank, row_number): from
pyspark.sql.window import Window; window =
Window.partitionBy("column1").orderBy("column2"); df.withColumn("rank",
rank().over(window))
● Lag and lead functions: from pyspark.sql.window import Window; window =
Window.partitionBy("column1").orderBy("column2"); df.withColumn("lag",
lag("value", 1).over(window)).withColumn("lead", lead("value",
1).over(window))
● Cumulative sum: from pyspark.sql.window import Window; window =
Window.partitionBy("column1").orderBy("column2");
df.withColumn("cumulative_sum", sum("value").over(window))
● Cumulative max: from pyspark.sql.window import Window; window =
Window.partitionBy("column1").orderBy("column2");
df.withColumn("cumulative_max", max("value").over(window))
● Moving average: from pyspark.sql.window import Window; window =
Window.partitionBy("column1").orderBy("column2").rowsBetween(-2, 0);
df.withColumn("moving_avg", avg("value").over(window))
● Running total: from pyspark.sql.window import Window; window = Window.partitionBy("column1").orderBy("column2").rowsBetween(Window.unboundedPreceding, Window.currentRow); df.withColumn("running_total", sum("value").over(window))
8. DataFrame Explode and Flatten
● Explode array column: df.select(explode("array_column"))
● Explode map column: df.select(explode("map_column"))
● Flatten struct column: df.select("*", col("struct_column.*"))
● Flatten nested struct column: df.select("*",
col("nested_struct_column.level1.*"),
col("nested_struct_column.level2.*"))
9. DataFrame Array Functions
● Array contains: df.filter(array_contains(col("array_column"), "value"))
● Array distinct: df.select(array_distinct(col("array_column")))
● Array except: df.select(array_except(col("array_column1"),
col("array_column2")))
● Array intersect: df.select(array_intersect(col("array_column1"),
col("array_column2")))
● Array join: df.select(array_join(col("array_column"), ","))
● Array max: df.select(array_max(col("array_column")))
● Array min: df.select(array_min(col("array_column")))
● Array position: df.select(array_position(col("array_column"), "value"))
● Array remove: df.select(array_remove(col("array_column"), "value"))
● Array repeat: df.select(array_repeat(col("column"), 3))
● Array size: df.select(size(col("array_column")))
● Array sort: df.select(array_sort(col("array_column")))
● Array union: df.select(array_union(col("array_column1"),
col("array_column2")))
● Array zip: df.select(arrays_zip(col("array_column1"),
col("array_column2")))
10. DataFrame Map Functions
● Map contains key: df.filter(col("map_column").getItem("key").isNotNull())
● Map keys: df.select(map_keys(col("map_column")))
● Map values: df.select(map_values(col("map_column")))
● Map from entries: df.select(map_from_entries(col("array_column")))
● Map concat: df.select(map_concat(col("map_column1"), col("map_column2")))
● Map zip with: df.select(map_zip_with(col("map_column1"), col("map_column2"), lambda k, v1, v2: v1 + v2))
11. DataFrame Date and Timestamp Functions
● Current date: df.select(current_date())
● Current timestamp: df.select(current_timestamp())
● Date add: df.select(date_add(col("date_column"), 7))
● Date format: df.select(date_format(col("date_column"), "yyyy-MM-dd"))
● Date sub: df.select(date_sub(col("date_column"), 7))
● Date diff: df.select(datediff(col("end_date"), col("start_date")))
● To date: df.select(to_date(col("timestamp_column")))
● To timestamp: df.select(to_timestamp(col("string_column"), "yyyy-MM-dd
HH:mm:ss"))
● Trunc: df.select(trunc(col("timestamp_column"), "year"))
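A sketch of parsing strings into dates/timestamps and then doing arithmetic and formatting on them; the column names and sample dates are made up.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, to_timestamp, date_add, datediff, date_format, col

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [("2024-01-15", "2024-01-20 08:30:00")],
    ["start_str", "end_str"],
)

df.select(
    to_date(col("start_str")).alias("start_date"),
    to_timestamp(col("end_str"), "yyyy-MM-dd HH:mm:ss").alias("end_ts"),
).select(
    "start_date",
    "end_ts",
    date_add(col("start_date"), 7).alias("one_week_later"),
    datediff(col("end_ts"), col("start_date")).alias("days_between"),
    date_format(col("start_date"), "yyyy/MM/dd").alias("formatted"),
).show(truncate=False)
```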
12. DataFrame Miscellaneous Functions
● Coalesce: df.select(coalesce(col("column1"), col("column2"),
lit("default_value")))
● When otherwise: df.select(when(col("column") > 10,
"GT10").when(col("column") < 5, "LT5").otherwise("BETWEEN"))
● Case when: df.select(expr("CASE WHEN column1 > 10 THEN 'GT10' WHEN
column1 < 5 THEN 'LT5' ELSE 'BETWEEN' END"))
● Concat: df.select(concat(col("column1"), lit("_"), col("column2")))
● Concat with separator: df.select(concat_ws("_", col("column1"),
col("column2"), col("column3")))
● Substring: df.select(substring(col("column"), 1, 5))
● Substring index: df.select(substring_index(col("column"), ".", 1))
● Instr: df.select(instr(col("column"), "substring"))
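A closing sketch combining several of the miscellaneous functions; the name/score columns and their values are placeholders, with nulls included to show coalesce.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, when, concat_ws, substring, instr, col, lit

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [("John", None, 12), (None, "Smith", 3)],
    "first_name string, last_name string, score int",
)

df.select(
    coalesce(col("first_name"), col("last_name"), lit("unknown")).alias("any_name"),
    when(col("score") > 10, "GT10").when(col("score") < 5, "LT5").otherwise("BETWEEN").alias("bucket"),
    concat_ws("_", col("first_name"), col("last_name")).alias("full_name"),
    substring(col("last_name"), 1, 3).alias("prefix"),
    instr(col("last_name"), "mi").alias("pos"),
).show()
```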
By: Waleed Mousa