Data Cleaning Python code:
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
glueContext = GlueContext(SparkContext.getOrCreate())
# Data Catalog: database and table name
db_name = "Redshift"
tbl_name = "Sales_report"
# S3 location for output
output_dir = "s3://glue-sample-target/output-dir/sales_report"
# Read data into a DynamicFrame using the Data Catalog metadata
sales_dyf = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = tbl_name)
# The `provider id` field is a choice type between long and string
# Cast the choice to long; values that cannot be cast become null
sales_res = sales_dyf.resolveChoice(specs = [('provider id','cast:long')])
# Remove erroneous records
sales_df = sales_res.toDF()
sales_df = sales_df.where("`provider id` is NOT NULL")
# Apply a UDF that strips the leading '$' from the payment columns
chop_f = udf(lambda x: x[1:], StringType())
sales_df = sales_df.withColumn("ACC", chop_f(sales_df["average covered charges"])).withColumn("ATP",
chop_f(sales_df["average total payments"])).withColumn("AMP", chop_f(sales_df["average sales payments"]))
# Turn it back to a dynamic frame
sales_tmp = DynamicFrame.fromDF(sales_df, glueContext, "nested")
# Rename, cast, and nest with apply_mapping
sales_nest = sales_tmp.apply_mapping([('drg definition', 'string', 'drg', 'string'),
('provider id', 'long', 'provider.id', 'long'),
('name', 'string', 'provider.name', 'string'),
('city', 'string', 'provider.city', 'string'),
('state', 'string', 'provider.state', 'string'),
('zip code', 'long', 'provider.zip', 'long'),
('sales referral region description', 'string','rr', 'string'),
('ACC', 'string', 'charges.covered', 'double'),
('ATP', 'string', 'charges.total_pay', 'double'),
('AMP', 'string', 'charges.sales_pay', 'double')])
# Write it out in Parquet
glueContext.write_dynamic_frame.from_options(frame = sales_nest, connection_type = "s3", connection_options =
{"path": output_dir}, format = "parquet")
Joining and relationalizing data:
# Copyright 2016-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
import sys
from awsglue.transforms import Join
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
glueContext = GlueContext(SparkContext.getOrCreate())
# catalog: database and table names
db_name = "Redshift"
tbl_persons = "persons_json"
tbl_membership = "memberships_json"
tbl_organization = "organizations_json"
# output s3 and temp directories
output_history_dir = "s3://glue-sample-target/output-dir/sales_history"
output_lg_partitioned_dir = "s3://glue-sample-target/output-dir/legislator_part"
redshift_temp_dir = "s3://glue-sample-target/temp-dir/"
# Create dynamic frames from the source tables
persons = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_persons)
memberships = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_membership)
orgs = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_organization)
# Keep the fields we need and rename some.
orgs = orgs.drop_fields(['other_names', 'identifiers']).rename_field('id', 'org_id').rename_field('name', 'org_name')
# Join the frames to create history
l_history = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id',
'organization_id').drop_fields(['person_id', 'org_id'])
# ---- Write out the history ----
# Write out the dynamic frame as Parquet to the "sales_history" directory
print("Writing to /sales_history ...")
glueContext.write_dynamic_frame.from_options(frame = l_history, connection_type = "s3", connection_options =
{"path": output_history_dir}, format = "parquet")
# Convert to a DataFrame and write to the "legislator_part" directory, partitioned by org_name (Senate vs. House).
print("Writing to /legislator_part, partitioned by Senate and House ...")
l_history.toDF().write.parquet(output_lg_partitioned_dir, partitionBy=['org_name'])
# ---- Write out to relational databases ----
# Convert the data to flat tables
print("Converting to flat tables ...")
dfc = l_history.relationalize("hist_root", redshift_temp_dir)
# Cycle through and write to Redshift.
for df_name in dfc.keys():
    m_df = dfc.select(df_name)
    print("Writing to Redshift table: ", df_name, " ...")
    glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, catalog_connection = "redshift3",
        connection_options = {"dbtable": df_name, "database": "testdb"}, redshift_tmp_dir = redshift_temp_dir)
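relationalize returns a DynamicFrameCollection: one root table plus one table for each nested array, which is why the loop above iterates over dfc.keys(). A minimal sketch (same session assumed) to inspect the collection before writing to Redshift:
# List each flattened table and its row count
for df_name in dfc.keys():
    print(df_name, dfc.select(df_name).count())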
Data visualization using Python scripts:
1) Histogram:
import pandas as pd
import matplotlib.pyplot as plt
data = [['E001', 'M', 34, 123, 'Normal', 350],
['E002', 'F', 40, 114, 'Overweight', 450],
['E003', 'F', 37, 135, 'Obesity', 169],
['E004', 'M', 30, 139, 'Underweight', 189],
['E005', 'F', 44, 117, 'Underweight', 183],
['E006', 'M', 36, 121, 'Normal', 80],
['E007', 'M', 32, 133, 'Obesity', 166],
['E008', 'F', 26, 140, 'Normal', 120],
['E009', 'M', 32, 133, 'Normal', 75],
['E010', 'M', 36, 133, 'Underweight', 40] ]
df = pd.DataFrame(data, columns = ['EMPID', 'Gender',
'Age', 'Sales',
'BMI', 'Income'] )
df.hist()
plt.show()
Output: one histogram per numeric column (Age, Sales, Income).
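df.hist() draws one histogram per numeric column using matplotlib defaults. A hedged variant, restricting the plot to specific columns and choosing the bin count explicitly (standard pandas keyword arguments):
# Plot only Age and Income, with 5 bins each
df.hist(column=['Age', 'Income'], bins=5)
plt.show()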
2) Column Chart:
import pandas as pd
import matplotlib.pyplot as plt
data = [['E001', 'M', 34, 123, 'Normal', 350],
['E002', 'F', 40, 114, 'Overweight', 450],
['E003', 'F', 37, 135, 'Obesity', 169],
['E004', 'M', 30, 139, 'Underweight', 189],
['E005', 'F', 44, 117, 'Underweight', 183],
['E006', 'M', 36, 121, 'Normal', 80],
['E007', 'M', 32, 133, 'Obesity', 166],
['E008', 'F', 26, 140, 'Normal', 120],
['E009', 'M', 32, 133, 'Normal', 75],
['E010', 'M', 36, 133, 'Underweight', 40] ]
df = pd.DataFrame(data, columns = ['EMPID', 'Gender',
'Age', 'Sales',
'BMI', 'Income'] )
# Plot Sales against Age as a column chart
plt.bar(df['Age'], df['Sales'])
plt.xlabel("Age")
plt.ylabel("Sales")
plt.show()
Output: a column chart of Sales by Age.
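Column charts over raw rows can be hard to read; aggregating first often helps. A minimal sketch (reusing the same df) that plots the mean Sales for each BMI category:
# Group by BMI category and plot the mean Sales per group
df.groupby('BMI')['Sales'].mean().plot.bar()
plt.ylabel("Mean Sales")
plt.show()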
3) Scatter Plot:
import matplotlib.pyplot as plt
x_axis = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
y_axis = [5, 16, 34, 56, 32, 56, 32, 12, 76, 89]
plt.title("Prices over 10 years")
plt.scatter(x_axis, y_axis, color='darkblue', marker='x', label="item 1")
plt.xlabel("Time (years)")
plt.ylabel("Price (dollars)")
plt.grid(True)
plt.legend()
plt.show()
Output: a scatter plot of item prices over ten years.
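Each "Output" figure above comes from plt.show(), which only displays the chart interactively. To capture a chart as an image file for a report, matplotlib's savefig can be called before show(); the filename below is illustrative:
# Save the current figure to disk (filename is an example)
plt.savefig("scatter_prices.png", dpi=150, bbox_inches="tight")
plt.show()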