Python for Data Engineering
Python plays a crucial role in the world of data engineering, offering versatile
and powerful libraries. It has been adopted in various domains, including data
science, machine learning, AI, data visualization, and data engineering. Python
is widely used in big data processing through frameworks like Apache Spark,
workflow orchestration, web scraping, and more.
Built-in data structures:
Python provides several built-in data structures that are widely used for various
purposes. List, Tuple, Dictionary, and Set are four different types of data
structures, each with its characteristics and use cases.
List:
Mutable: Lists are mutable, meaning you can modify their elements after the
list is created. You can add, remove, or modify elements.
Syntax: Defined using square brackets [].
my_list = [1, 2, 3, 'a', 'b', 'c']
my_list.append(4) # Adds 4 to the end of the list
my_list.remove('a') # Removes the element 'a'
Tuple:
Immutable: Tuples are immutable, meaning once they are created, their
elements cannot be changed or modified. Unlike lists, you can’t add or remove
elements from a tuple.
Syntax: Defined using parentheses ().
my_tuple = (1, 2, 3, 'a', 'b', 'c')
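For instance, attempting to assign to a tuple element raises a TypeError (a quick illustrative sketch):
my_tuple = (1, 2, 3, 'a', 'b', 'c')
try:
    my_tuple[0] = 10  # tuples do not support item assignment
except TypeError as e:
    print(e)
# Output: 'tuple' object does not support item assignment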
Dictionary:
Dictionaries are used to store data values in key-value pairs. A dictionary is a
changeable collection that does not allow duplicate keys.
thisdict = {
    "brand": "Ford",
    "model": "Mustang",
    "year": 1964
}
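Values are read and updated by key, and assigning to a new key simply adds it; for example:
print(thisdict["model"])     # Access a value by key -> Mustang
thisdict["year"] = 2024      # Update an existing value
thisdict["color"] = "red"    # Add a new key-value pair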
Set
Unordered and Unique Elements: Sets are unordered collections of unique
elements. They do not allow duplicate values, and the order of elements is not
guaranteed.
Syntax: Defined using curly braces {} or by using the set() constructor.
my_set = {1, 2, 3}
my_set.add(1)
my_set.add(1)
print(my_set)
As you can see, even after adding two more ‘1’s, the set still contains only one ‘1’.
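Beyond deduplication, sets also support the usual set-algebra operations, which are handy for comparing lists of keys or IDs; a short sketch:
a = {1, 2, 3}
b = {3, 4, 5}
print(a | b)   # union: {1, 2, 3, 4, 5}
print(a & b)   # intersection: {3}
print(a - b)   # difference: {1, 2}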
Operations on Lists
List Comprehension:
List comprehension offers a shorter syntax when you want to create a new list
based on the values of an existing list.
This allows you to simplify code.
# Without list comprehension
squares = []
for i in range(5):
    squares.append(i**2)
print(squares)

# With list comprehension
squares = [i**2 for i in range(5)]
print(squares)
# Output:
# [0, 1, 4, 9, 16]
Filter, Map, and Reduce
Map, Filter, and Reduce allow the programmer to write simpler, shorter code,
without loops.
Filter:
Filter, as the name suggests, helps you filter a list using a condition.
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
# Filter even numbers
even_numbers = list(filter(lambda x: x % 2 == 0, numbers))
# Output: [2, 4, 6, 8]
Map:
The map function applies a function to every element of a list.
def power2(val: int) -> int:
    return val * val
numbers = [1, 2, 3, 4, 5]
power_numbers = list(map(power2, numbers))
# OR
squared_numbers = list(map(lambda x: x**2, numbers))
# Output: [1, 4, 9, 16, 25]
Reduce:
The reduce() function is a powerful tool in Python that operates on a list (or
any iterable), applies a function to its elements, and ‘reduces’ them to a single
output.
from functools import reduce
words = ["apple", "banana", "orange", "apple", "grape", "banana"]
# Count the occurrences of each word
word_counts = reduce(
    lambda counts, word: {**counts, word: counts.get(word, 0) + 1}, words, {}
)
print(word_counts)
# Output: {'apple': 2, 'banana': 2, 'orange': 1, 'grape': 1}
numbers = [1, 2, 3, 4, 5]
product = reduce((lambda x, y: x * y), numbers)
# Output: 120
Enumerate:
The enumerate function adds a counter to an iterable and returns it. If your
background is in C# avoid using this:
lista = ['a','b','c','d','e']
count = 0
for l in lista:
    print('Index:', count, ' Value:', l)
    count += 1
Use Enumerate to achieve the same result:
lista = ['a','b','c','d','e']
for count, l in enumerate(lista):
    print('Index:', count, ' Value:', l)
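enumerate also accepts an optional start argument if the counter should not begin at zero:
lista = ['a','b','c','d','e']
for count, l in enumerate(lista, start=1):
    print('Index:', count, ' Value:', l)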
Generators:
Python generators offer a simple way to create your own iterators. A generator is a
special type of function that does not return a single value; instead, it returns an
iterator object that produces a sequence of values. In a generator function, the yield
keyword is used instead of return. It lets you iterate over data without loading the
entire dataset into memory, making it suitable for processing huge files.
text = """
transaction_id,user\r
1,aaa\r
\r
2,xx\r
3,ccc\r
\r
"""
def process_large_file(text):
    for line in text.split("\r"):
        # Process the line
        processed_line = line.strip().upper()
        if processed_line != "":
            # Yield the processed line
            yield processed_line

# Example usage
for processed_line in process_large_file(text):
    print(processed_line)
# Output:
# TRANSACTION_ID,USER
# 1,AAA
# 2,XX
# 3,CCC
We can use this pattern to clean up a file on the fly, dropping content (such as
empty lines) that does not meet the CSV format requirements.
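The same pattern applies to a real file on disk; a minimal sketch, assuming a hypothetical transactions.csv, where the file object is iterated lazily so only one line is held in memory at a time:
def read_large_file(path):
    with open(path, "r") as f:
        for line in f:
            processed_line = line.strip().upper()
            if processed_line != "":
                yield processed_line

# for processed_line in read_large_file("transactions.csv"):
#     print(processed_line)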
Decorators:
Python decorators are used to attach additional functionality to functions or classes
without writing extra code inside the object itself. For example, we can modify the
returned value and display it from within the decorator function.
def make_upper(function):
    def upper():
        f = function()
        print(f"this is the original value: {f}")
        return f.upper()
    return upper

@make_upper  # decorator
def helloworld():
    return "hello world"

print(helloworld())
# Output:
# this is the original value: hello world
# HELLO WORLD
In practice, you can create a retry decorator that re-executes a function a few
times when it raises an error.
import time

def retry(times, wait):
    def decorator(func):
        def newfn(*args, **kwargs):
            attempt = 0
            while attempt < times:
                try:
                    time.sleep(wait)
                    return func(*args, **kwargs)
                except Exception:
                    print(
                        'Exception thrown when attempting to run %s, '
                        'attempt %d of %d' % (func, attempt, times)
                    )
                    attempt += 1
            # After exhausting the retries, make one final attempt
            # and let any exception propagate to the caller
            time.sleep(wait)
            return func(*args, **kwargs)
        return newfn
    return decorator

@retry(times=3, wait=2)
def get_from_rest():
    print('Try to read data from the REST API')
    raise ConnectionError('Lack of connection')

get_from_rest()
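One detail worth adding in practice (not shown in the snippet above) is functools.wraps, which keeps the decorated function's name and docstring intact; a minimal sketch:
import functools

def log_call(func):
    @functools.wraps(func)  # preserve func.__name__ and func.__doc__ on the wrapper
    def wrapper(*args, **kwargs):
        print(f"calling {func.__name__}")
        return func(*args, **kwargs)
    return wrapper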
Data Class:
A data class in Python is a specially structured class that is optimized for the
storage and representation of data. Data classes come with certain built-in functions
that take care of representing and storing the data. A data class handles things like
displaying values and comparing objects for you: you don't need to write a constructor
to assign values, and you don't need to implement __repr__, __eq__, or __hash__ for
debugging and object comparison.
from dataclasses import dataclass

@dataclass  # dataclass decorator
class CustomerDC:
    name: str  # type hints
    id: int
    surname: str

class Customer:
    def __init__(self, name, aid, surname):
        self.name = name
        self.id = aid
        self.surname = surname

Obj1 = Customer("Erick", 1254, "Nowak")
Obj2 = Customer("Erick", 1254, "Nowak")
Obj3 = CustomerDC("Erick", 1254, "Nowak")
Obj4 = CustomerDC("Erick", 1254, "Nowak")

print("Difference for debugging")
print(Obj1)
print(Obj3)
print("\nDifference in Equality Check")
print(Obj1 == Obj2)
print(Obj3 == Obj4)
# Output:
# Difference for debugging
# <__main__.Customer object at 0x0000021E1108AC80>
# CustomerDC(name='Erick', id=1254, surname='Nowak')
#
# Difference in Equality Check
# False
# True
Data classes are also easy to convert to a dictionary or a tuple, which simplifies the
code when we need to operate on those formats or save the data as JSON output.
import json
from dataclasses import dataclass, astuple, asdict

@dataclass  # dataclass decorator
class Customer:
    name: str  # type hints
    id: int
    surname: str

Obj1 = Customer("Erick", 1254, "Nowak")
print(astuple(Obj1))
print(asdict(Obj1))
# Output:
# ('Erick', 1254, 'Nowak')
# {'name': 'Erick', 'id': 1254, 'surname': 'Nowak'}
json.dumps(asdict(Obj1))
Concurrency vs. parallelism:
Concurrency and parallelism are two different mechanisms for executing tasks in a
script. For a data engineer, these techniques help speed up retrieving data from
APIs or transforming data.
Multithreading Pools:
Multithreading is a way to achieve parallelism by executing multiple threads of
code. A thread is a lightweight unit of execution that shares the same memory space
as its parent process. To use multi-threading pools in Python, you can use the
ThreadPoolExecutor class from the concurrent.futures module. Using threads, we can
issue API calls in parallel to retrieve data.
import calendar
from concurrent.futures import ThreadPoolExecutor
import requests

def generate_dates(year, month):
    _, last_day = calendar.monthrange(year, month)
    dates = [f"{year}-{month:02d}-{day:02d}" for day in range(1, last_day + 1)]
    return dates

year = 2023
month = 10
result = generate_dates(year, month)

urls = []
for x in result:
    urls.append(f"http://api.nbp.pl/api/exchangerates/rates/a/gbp/{x}/")

def download_page(url):
    response = requests.get(url)
    return response.content

with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(download_page, urls))

for result in results:
    print(result)
Concurrency:
Concurrency in Python can be achieved through coroutines or asynchronous
programming, providing an alternative approach to running functions
concurrently. Unlike traditional threading, coroutines use specific programming
constructs and are managed by the Python runtime with significantly reduced
overhead.
For example, consider the usage of asyncio:
import asyncio
import time

async def sql_command1():
    await asyncio.sleep(2)
    print("Query executed 1")
    return {"col1": 1, "col2": 2}

async def sql_command2():
    await asyncio.sleep(3)
    print("Query executed 2")
    return {"col1": 1, "col2": 2}

async def sql_command3():
    await asyncio.sleep(3)
    print("Query executed 3")
    return {"col1": 1, "col2": 2}

async def main():
    start_time = time.time()
    sql1 = asyncio.create_task(sql_command1())
    sql2 = asyncio.create_task(sql_command2())
    sql3 = asyncio.create_task(sql_command3())
    res1 = await sql1
    res2 = await sql2
    res3 = await sql3
    end_time = time.time()
    exec_time = end_time - start_time
    print(f"res1 {res1}")
    print(f"res2 {res2}")
    print(f"res3 {res3}")
    print(f"total time {exec_time:.2f}")

if __name__ == "__main__":
    asyncio.run(main())
Rather than executing code synchronously and waiting for each step, this approach
allows us to import data from different URLs simultaneously, thereby reducing the
overall import time.
For example, here is an illustration of importing data from multiple URLs:
import aiohttp
import asyncio
import calendar

def generate_dates(year, month):
    _, last_day = calendar.monthrange(year, month)
    dates = [f"{year}-{month:02d}-{day:02d}" for day in range(1, last_day + 1)]
    return dates

year = 2023
month = 10
result = generate_dates(year, month)

urls = []
for x in result:
    urls.append(f"http://api.nbp.pl/api/exchangerates/rates/a/gbp/{x}/")

async def download_page(session, url):
    async with session.get(url) as r:
        return await r.text()

async def main():
    async with aiohttp.ClientSession() as session:
        datas = await asyncio.gather(*[download_page(session, u) for u in urls])
        for x in datas:
            print(x)

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())
Integration with Cloud Storage
When integrating our application with cloud storage, the typical approach
involves using SDKs and writing methods to handle tasks such as reading,
writing, and listing files. However, this often requires different methods for
each cloud provider.
Alternatively, Python libraries like gcsfs, adlfs, and s3fs offer a streamlined way
to integrate with the cloud storage services of GCP, Azure, and AWS
respectively.
gcsfs — GCP Cloud storage integration:
import gcsfs

fs = gcsfs.GCSFileSystem()
dest = "gs://dxxx/clients/clients.csv"
with fs.open(dest, "w") as file:
    file.write("hello;csv;file")
adlfs — Azure Blob Storage integration:
import os
import adlfs

fs = adlfs.AzureBlobFileSystem(account_name=os.environ["AZURE_STORAGE_ACCOUNT_NAME"])
local_filename = "landing/clients.csv"
with fs.open(local_filename, "w") as f:
    f.write("hello;csv;file")
s3fs — AWS S3 storage integration:
import s3fs

fs = s3fs.S3FileSystem(key=mykey, secret=mysecretkey)
bucket = "my-bucket"
files = fs.ls(bucket)
with fs.open('my-bucket/my-file.txt', 'rb') as f:
    print(f.read())
As evident, these libraries offer a straightforward and unified method for
communication with cloud storage. The consistency in file read/write
operations across gcsfs, adlfs, and s3fs proves immensely helpful, especially
when working with multiple cloud providers. This uniformity not only simplifies
code implementation but also contributes to making your code cloud-agnostic,
allowing for seamless transitions between different cloud storage services.
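As a sketch of that cloud-agnostic idea, the same helper function can write a file through any of these filesystem objects; the bucket paths below are hypothetical:
def write_text(fs, path, payload):
    # fs can be a gcsfs.GCSFileSystem, adlfs.AzureBlobFileSystem, or s3fs.S3FileSystem
    with fs.open(path, "w") as f:
        f.write(payload)

# write_text(gcsfs.GCSFileSystem(), "gs://my-bucket/clients/clients.csv", "hello;csv;file")
# write_text(s3fs.S3FileSystem(), "my-bucket/clients/clients.csv", "hello;csv;file")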
Useful libraries
As a data engineer, a set of essential libraries is crucial to efficiently handle
various aspects of data processing. These include libraries for data
manipulation, filtering, adjusting values, aggregations, as well as tools for
seamlessly working with diverse file formats such as CSV, Parquet, JSON, Avro,
and more. Additionally, having robust libraries for interfacing with databases
further streamlines data management tasks, contributing to the overall
effectiveness of data engineering workflows.
Pandas:
Pandas stands as a widely-used Python library in the realm of data
manipulation and analysis. Its versatility makes it an invaluable tool for
handling datasets of various sizes and complexities. Below is an example
demonstrating how Pandas can efficiently read large CSV files, showcasing its
capability to manage substantial datasets with ease:
import pandas as pd

chunksize = 10 ** 6
for chunk in pd.read_csv(filename, chunksize=chunksize):
    for index, row in chunk.iterrows():
        print(row)
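Since each chunk is itself a DataFrame, it is usually faster to aggregate per chunk rather than iterate row by row; a small sketch assuming a hypothetical amount column:
total = 0
for chunk in pd.read_csv(filename, chunksize=chunksize):
    total += chunk['amount'].sum()  # vectorized per-chunk aggregation
print(total)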
PySpark
PySpark is the Python library for Apache Spark, an open-source, distributed
computing system. Apache Spark offers a fast and versatile cluster computing
framework designed for extensive big data processing and analytics. One of its
key strengths is the ability to efficiently perform processing tasks on massive
datasets while also enabling the distribution of these tasks across multiple
computers for enhanced scalability.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("example").getOrCreate()
data = [('Alice', 25, 'New York'), ('Bob', 30, 'San Francisco'),
        ('Charlie', 35, 'Los Angeles'), ('David', 40, 'Chicago')]
df = spark.createDataFrame(data, schema=['Name', 'Age', 'City'])
df.printSchema()
Polars
Polars is a high-performance DataFrame library implemented in Rust and
designed for seamless data manipulation in Python. While similar to Pandas in
its DataFrame functionality, Polars distinguishes itself by prioritizing
performance. Below is a brief example showcasing the efficiency and ease of
use offered by Polars:
import polars as pl

# Creating a DataFrame
data = {'Name': ['Alice', 'Bob', 'Charlie', 'David'],
        'Age': [25, 30, 35, 40],
        'City': ['New York', 'San Francisco', 'Los Angeles', 'Chicago']}
df = pl.DataFrame(data)

# Filtering data
filtered_df = df.filter(pl.col('Age') > 30)

# Adding a new column
df = df.with_columns(salary=pl.lit(100))

# Grouping data and calculating statistics
grouped_city = df.group_by('City').agg(pl.col('Age').mean().alias('Avg_Age'))
DuckDB:
DuckDB stands out as an in-memory analytical database management system
tailored for OLAP (Online Analytical Processing) workloads. Renowned for its
exceptional performance on analytical queries and efficient resource utilization,
DuckDB offers a seamless experience for data analysis. One notable feature is its
execution on a local machine, eliminating the need for server installations.
Moreover, DuckDB seamlessly integrates with Python, enabling the reading of
various formats such as CSV and Parquet. Its versatility extends to integration with
popular data frame libraries like Polars and Pandas, providing users with flexible
options for data manipulation and analysis.
import duckdb
import pandas
# Create a Pandas dataframe
my_df = pandas.DataFrame.from_dict({'a': [42]})
# query the Pandas DataFrame "my_df"
# Note: duckdb.sql connects to the default in-memory database connection
results = duckdb.sql("SELECT * FROM my_df").df()
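DuckDB can also query CSV and Parquet files directly by path in SQL; a brief sketch (the file names are hypothetical):
duckdb.sql("SELECT count(*) FROM 'clients.csv'").show()
duckdb.sql("SELECT * FROM 'clients.parquet' LIMIT 5").show()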
Faker — Synthetic data generator
Faker is a powerful Python package designed for generating synthetic data. It
proves invaluable for testing pipelines, databases, and creating stress tests by
providing a diverse set of fake data, including names, addresses, card numbers,
and more.
Noteworthy is Faker’s versatility, allowing users to customize generated data
using locales. This feature enhances the tool’s adaptability to various scenarios,
making it a go-to choice for those in need of realistic yet synthetic data for
testing and development.
from faker import Faker

fake = Faker()

# Generate synthetic client data
def generate_client_data(num_clients=100000):
    clients = []
    for client_num in range(1, num_clients + 1):
        client = {
            "client_number": client_num,
            "name": fake.name(),
            "email": fake.email(),
            "phone_number": fake.phone_number(),
            "building_number": fake.building_number(),
            "street_name": fake.street_name(),
            "postcode": fake.postcode(),
            "city": fake.city(),
            "state": fake.state(),
            "birth_date": fake.date_of_birth(minimum_age=18, maximum_age=90).strftime('%Y-%m-%d'),
            "credit_card_number": fake.credit_card_number(card_type='mastercard'),
        }
        clients.append(client)
    return clients
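To illustrate the locale customization mentioned above, Faker can be instantiated with a locale code, for example for Polish data:
fake_pl = Faker('pl_PL')
print(fake_pl.name())     # e.g. a Polish-style name
print(fake_pl.address())  # e.g. a Polish-style address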
SQLAlchemy — Connection to Database
Data engineering often involves working with databases, encompassing tasks
like querying tables, and inserting, updating, and deleting rows. SQLAlchemy is
a versatile Python library that supports a range of popular SQL databases such
as PostgreSQL, Oracle, MS SQL, Snowflake, BigQuery, and more. Notably, its
seamless integration with Pandas streamlines and automates many data-related
tasks. To establish a connection to a database, the `create_engine()`
function is employed with a URL. For instance, the URL for connecting to a
PostgreSQL database would look like this:
“dialect+driver://username:password@host:port/database”.
import psycopg2  # PostgreSQL driver used by SQLAlchemy
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('postgresql+psycopg2://test:@127.0.0.1', pool_recycle=3600)
with engine.connect() as conn:
    dataFrame = pd.read_sql("select * from tab1", conn)
If we need to load data into an SQL table, we can easily achieve this using:
df = pd.read_excel('sample.xlsx')
df.to_sql('table_name', con=engine, if_exists='append', index= False)
Unit Tests:
Testing is a crucial component of data engineering development, particularly when
dealing with large and intricate systems for data processing. To ensure the
robustness of methods and prevent bugs, Python offers two popular testing
frameworks: unittest and pytest. While a detailed discussion of their differences is
beyond the scope here, ample resources are available online for those interested.
In the context of data engineering, let’s explore a few examples of how unittest
and pytest can be employed to fortify our testing strategies:
Unittest
Below you can see a few examples that test whether we get the expected result.
import unittest

class ExampleTestSuite(unittest.TestCase):
    def test_import(self):
        self.assertTrue(True)

    def test_addition(self):
        self.assertEqual(1 + 2, 3)

    def test_subtraction(self):
        self.assertNotEqual(1 - 2, 0)

if __name__ == '__main__':
    unittest.main()
Pytest
Below you can see the same examples written for pytest:
def test_import():
    assert True

def test_addition():
    assert 1 + 2 == 3

def test_subtraction():
    assert 1 - 2 != 0
We can run these tests using the pytest command.
Mocking:
Mocking is a powerful technique, especially when dealing with external APIs in
data engineering. It enables us to simulate responses from APIs in dependent
functions, allowing for thorough testing without making actual API calls. Below,
you’ll find examples showcasing how to use mocking effectively in a data
engineering context:
# app.py
import requests
import json

def get_currency(url):
    response = requests.get(url)
    return response.text

def get_currency_rate():
    url = "http://api.nbp.pl/api/exchangerates/rates/a/gbp/2023-11-02/"
    currency = json.loads(get_currency(url))
    return currency["rates"][0]["mid"]

if __name__ == "__main__":
    print(get_currency_rate())
Mocking Unit Test:
import unittest
from unittest.mock import patch, MagicMock
from app import get_currency_rate, get_currency

class TestImports(unittest.TestCase):
    text = ('{"table": "A", "currency": "funt szterling", "code": "GBP", '
            '"rates": [{"no": "212/A/NBP/2023", "effectiveDate": "2023-11-02", "mid": 5.1135}]}')

    @patch("app.get_currency")
    def test_import_currency(self, mock_get_currency):
        mock_get_currency.return_value = self.text
        self.assertEqual(get_currency_rate(), 5.1135)

    @patch("app.requests")
    def test_get_currency(self, mock_requests):
        mock_response = MagicMock()
        mock_response.status_code = 404
        mock_response.text = '{"rates": [{"mid": ""}]}'  # simulated response body
        mock_requests.get.return_value = mock_response
        self.assertEqual(get_currency("test"), '{"rates": [{"mid": ""}]}')

if __name__ == "__main__":
    unittest.main()
Module Fixture — Data Frame testing
When working with data frames in libraries like Spark, Pandas, Polars, and
others, having input data for testing is essential. In Pytest, fixtures provide a
convenient way to prepare such data. A fixture is a function marked with the
@pytest.fixture decorator, which automatically executes and delivers test data
for a corresponding test function. Here’s an example demonstrating how to use
a fixture for testing data frames:
# app.py
import polars as pl

def filter_dataframe(df: pl.DataFrame) -> pl.DataFrame:
    return df.filter(pl.col("col2") == "a")

def add_col_to_df(df: pl.DataFrame) -> pl.DataFrame:
    return df.with_columns((pl.col("col1") ** 2).alias("col1_power"))

if __name__ == "__main__":
    df = pl.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
    print(add_col_to_df(df))
Test File:
# test_app.py
import pytest
import polars as pl
from app import filter_dataframe, add_col_to_df

@pytest.fixture
def test_dataframe() -> pl.DataFrame:
    return pl.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})

def test_filter_dataframe(test_dataframe):
    df = filter_dataframe(test_dataframe)
    rows_count_b = df.filter(pl.col("col2") == "b").select(pl.count()).item()
    rows_count_c = df.filter(pl.col("col2") == "c").select(pl.count()).item()
    assert rows_count_b == 0
    assert rows_count_c == 0

def test_add_col_to_df(test_dataframe):
    actual_df = add_col_to_df(test_dataframe)
    assert actual_df.columns == ["col1", "col2", "col1_power"]
    assert actual_df.rows() == [(1, "a", 1), (2, "b", 4), (3, "c", 9)]