
Snowpark For Python

This document demonstrates connecting to Snowflake via Snowpark without using PySpark. It shows how to join and aggregate large tables, write results to a new table, and scale the warehouse size. Key benefits of Snowpark over Spark/PySpark are also summarized, including being quicker to migrate to, cheaper by using serverless compute that scales instantly, faster by eliminating unnecessary data movement, and easier to use with less maintenance required.

https://github.com/NickAkincilar/Sample_Snowpark_Demos/blob/main/Snowpark_Data_Engineering_Public.ipynb

https://h2o.ai/blog/h2o-integrates-with-snowflake-snowpark-java-udfs-how-to-better-leverage-the-snowflake-data-marketplace-and-deploy-in-database/

Install Snowpark
In [ ]:
# !pip install snowflake-snowpark-python

Connect to Snowflake via Snowpark (without PySpark)
In [32]:
import time
# ---> REMOVE PYSPARK REFERENCES

# import pyspark.sql.functions as f
# from pyspark.sql import SparkSession
# from pyspark.sql.functions import udf,col
# from pyspark.sql.types import IntegerType
# spark = SparkSession.builder.appName("DataEngeering1").getOrCreate()

# <--- REPLACE WITH SNOWPARK REFERENCES (rest of the code is almost identical)

import snowflake.snowpark.functions as f
from snowflake.snowpark import Session, DataFrame
from snowflake.snowpark.functions import udf, col
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import call_udf

# <----- Make these changes before running the notebook -------
# Change Connection params to match your environment
# ------------------------------------------------------------------------

Warehouse_Name = 'MY_DEMO_WH'
Warehouse_Size = "LARGE"
DB_name = 'DEMO_SNOWPARK'
Schema_Name = 'Public'

CONNECTION_PARAMETERS= {
'account': '<Snowflake_Account_Locator>',
'user': 'SomeUser',
'password': 'Not4u2Know',
'role': 'SYSADMIN'
}

print("Connecting to Snowflake.....\n")
session = Session.builder.configs(CONNECTION_PARAMETERS).create()
print("Connected Successfully!...\n")

sql_cmd = f"CREATE OR REPLACE WAREHOUSE {Warehouse_Name} WAREHOUSE_SIZE = 'X-Small' AUTO_SUSPEND = 10"
session.sql(sql_cmd).collect()
print("XS Cluster Created & Ready \n")

sql_cmd = f"CREATE OR REPLACE DATABASE {DB_name}"
session.sql(sql_cmd).collect()
print("Database is Created & Ready \n")

session.use_database(DB_name)
session.use_schema(Schema_Name)
session.use_warehouse(Warehouse_Name)
Connecting to Snowflake.....

Connected Successfully!...

XS Cluster Created & Ready

Database is Created & Ready

Start Data Engineering Process


In [30]:
# 2 - READ & JOIN 2 LARGE TABLES (600M & 1M rows)
print("Joining, Aggregating with 2 large tables (600M & 1M rows) & Writing results to new table (80M rows) ..\n")

dfLineItems = session.table("SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.LINEITEM")  # 600 Million Rows
dfSuppliers = session.table("SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.SUPPLIER")  # 1 Million Rows

print('Lineitems Table: %s rows' % dfLineItems.count())
print('Suppliers Table: %s rows' % dfSuppliers.count())

# 3 - JOIN TABLES
dfJoinTables = dfLineItems.join(dfSuppliers,
                                dfLineItems.col("L_SUPPKEY") == dfSuppliers.col("S_SUPPKEY"))

# 4 - SUMMARIZE THE DATA BY SUPPLIER, PART, SUM, MIN & MAX
dfSummary = dfJoinTables.groupBy("S_NAME", "L_PARTKEY").agg([
    f.sum("L_QUANTITY").alias("TOTAL_QTY"),
    f.min("L_QUANTITY").alias("MIN_QTY"),
    f.max("L_QUANTITY").alias("MAX_QTY"),
])
Joining, Aggregating with 2 large tables (600M & 1M rows) & Writing results to new table (80M rows) ..

Lineitems Table: 600037902 rows

Suppliers Table: 1000000 rows

↑ Compute is NOT used up to this point (Lazy Execution Model) !!!

3. Storing the results in a table or showing them triggers the compute for this and all previous steps.
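
As a minimal illustration of this lazy model (a hypothetical snippet reusing the session and f objects created above; it is not part of the original notebook, and the PART columns are taken from the TPC-H sample schema):

# Hypothetical sketch of lazy execution: these transformations only build a
# query plan on the client side, so no warehouse compute is used yet.
dfParts = session.table("SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.PART")
dfExpensive = dfParts.filter(f.col("P_RETAILPRICE") > 1500) \
                     .select("P_PARTKEY", "P_NAME", "P_RETAILPRICE")

# Only an action (show, collect, count, or a table write) compiles the plan
# into SQL and runs it on the warehouse.
dfExpensive.show(5)

The same behavior is why the join and aggregation above cost nothing until the table write in the next cell.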
In [31]:
start_time = time.time()

# 4 - INCREASE COMPUTE SIZE
print(f"Resizing from XS(1 Node) to {Warehouse_Size} ..")

sql_cmd = f"ALTER WAREHOUSE {Warehouse_Name} SET WAREHOUSE_SIZE = '{Warehouse_Size}' WAIT_FOR_COMPLETION = TRUE"
session.sql(sql_cmd).collect()

print("Completed!...\n\n")

# 5 - WRITE THE RESULTS TO A NEW TABLE (80 Million Rows)
# <-- This is when all the previous operations are compiled & executed as a single job
print("Creating the target SALES_SUMMARY table...\n\n")
dfSummary.write.mode("overwrite").saveAsTable("SALES_SUMMARY")
print("Target Table Created!...")

# 6 - QUERY THE RESULTS (80 Million Rows)
print("Querying the results..\n")
dfSales = session.table("SALES_SUMMARY")
dfSales.show()
end_time = time.time()
# 7 - SCALE DOWN COMPUTE TO 1 NODE
print("Reducing the warehouse to XS..\n")
sql_cmd = "ALTER WAREHOUSE {} SET WAREHOUSE_SIZE = 'XSMALL'".format(Warehouse_Name)
session.sql(sql_cmd).collect()

print("Completed!...\n")

print("--- %s seconds to Join, Summarize & Write Results to a new Table --- \n" % int(end_time - start_time))
print("--- %s Rows Written to SALES_SUMMARY table" % dfSales.count())
Resizing from XS(1 Node) to LARGE ..
Completed!...

Creating the target SALES_SUMMARY table...

Target Table Created!...


Querying the results..

---------------------------------------------------------------------------
|"S_NAME"            |"L_PARTKEY"  |"TOTAL_QTY"  |"MIN_QTY"  |"MAX_QTY"  |
---------------------------------------------------------------------------
|Supplier#000941845  |13441818     |163.00       |14.00      |45.00      |
|Supplier#000816569  |1316566      |287.00       |3.00       |50.00      |
|Supplier#000305838  |18555783     |219.00       |3.00       |49.00      |
|Supplier#000030491  |10030490     |203.00       |4.00       |47.00      |
|Supplier#000659231  |1409229      |158.00       |19.00      |50.00      |
|Supplier#000911793  |13911792     |310.00       |2.00       |49.00      |
|Supplier#000560166  |9310156      |108.00       |6.00       |44.00      |
|Supplier#000598113  |7598112      |155.00       |12.00      |47.00      |
|Supplier#000951634  |16701617     |190.00       |9.00       |50.00      |
|Supplier#000460895  |7210887      |268.00       |4.00       |49.00      |
---------------------------------------------------------------------------
Reducing the warehouse to XS..

Completed!...

--- 19 seconds to Join, Summarize & Write Results to a new Table ---

--- 79975543 Rows Written to SALES_SUMMARY table
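
As an optional cleanup step (an illustrative addition, not part of the original notebook): the warehouse was created with AUTO_SUSPEND = 10, so it pauses itself after 10 seconds of inactivity, but it can also be suspended explicitly and the session closed once the job is done.

# Optional cleanup sketch: suspend the warehouse right away instead of
# waiting for AUTO_SUSPEND, then close the Snowpark session.
session.sql(f"ALTER WAREHOUSE {Warehouse_Name} SUSPEND").collect()
session.close()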

Benefits of Snowpark Over Spark & PySpark
- Quick to migrate, as the code is mostly identical and does not require learning a new language (see the sketch after this list).

- Cheaper, as compute is fully serverless. It can scale up/down instantly via code and runs (and costs) only while in use.

- Faster, as all unnecessary data movement is eliminated = less time using compute = less cost.

- Easier to use = less FTE, as little to no maintenance is needed for compute & storage.
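
To make the first point concrete, here is a hedged before/after sketch of a typical migration. The PySpark fragment is a generic example, and the target table name QTY_BY_SUPPLIER is made up for illustration; the Snowpark version reuses the session and f imports from this notebook.

# PySpark (before), shown as comments for comparison:
# result = spark.table("LINEITEM") \
#               .groupBy("L_SUPPKEY") \
#               .agg(f.sum("L_QUANTITY").alias("TOTAL_QTY"))
# result.write.mode("overwrite").saveAsTable("QTY_BY_SUPPLIER")

# Snowpark (after): same shape, only the imports and the session object change.
result = session.table("SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.LINEITEM") \
                .groupBy("L_SUPPKEY") \
                .agg(f.sum("L_QUANTITY").alias("TOTAL_QTY"))
result.write.mode("overwrite").saveAsTable("QTY_BY_SUPPLIER")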

https://github.com/NickAkincilar/Sample_Snowpark_Demos
