StrataScratch PySpark coding questions
EASY
1. Salaries Differences
# Salaries Differences: absolute difference between the highest salaries
# in the marketing and engineering departments.
# Import your libraries
# Alias max/abs so the builtins are not shadowed at module level.
from pyspark.sql.functions import col, max as max_, abs as abs_
# Start writing code
df = db_employee.join(db_dept, db_employee['department_id'] == db_dept['id'], 'left')
# One-row frames holding each department's top salary.
df_mkt = df.filter(col('department') == 'marketing').select(max_('salary').alias('mkt'))
df_eng = df.filter(col('department') == 'engineering').select(max_('salary').alias('eng'))
# Cross join is safe here: both inputs are single-row frames.
df2 = df_mkt.join(df_eng).withColumn('salary_diff', abs_(col('mkt') - col('eng'))).select('salary_diff')
# To validate your solution, convert your final pySpark df to a pandas df
df2.toPandas()
2. Finding Updated Records
# Finding Updated Records: keep only the most recent (highest-salary) row
# per (first_name, last_name) pair, ordered by id.
# Import your libraries
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, desc, col
# Start writing code
latest_salary_win = Window.partitionBy('first_name', 'last_name').orderBy(desc('salary'))
ms_employee_salary = (
    ms_employee_salary
    .withColumn('ranks', rank().over(latest_salary_win))
    .filter(col('ranks') == 1)
    .orderBy('id')
    .drop('ranks')
    .select('id', 'first_name', 'last_name', 'department_id', 'salary')
)
# To validate your solution, convert your final pySpark df to a pandas df
ms_employee_salary.toPandas()
3. Bikes Last Used
# Bikes Last Used: latest end_time per bike, most recently used first.
# Import your libraries
from pyspark.sql.functions import col, desc, rank
from pyspark.sql.window import Window
# Start writing code
last_use_win = Window.partitionBy('bike_number').orderBy(desc('end_time'))
dc_bikeshare_q1_2012 = (
    dc_bikeshare_q1_2012
    .withColumn('rn', rank().over(last_use_win))
    .filter(col('rn') == 1)
    .select('bike_number', 'end_time')
    .sort(desc('end_time'))
)
# To validate your solution, convert your final pySpark df to a pandas df
dc_bikeshare_q1_2012.toPandas()
4. Reviews of Hotel Arena
# Reviews of Hotel Arena: number of reviews per reviewer score.
# Import your libraries
from pyspark.sql.functions import col, count
# Start writing code
# The hotel-name literal was wrapped mid-string in the extracted text; it is 'Hotel Arena'.
hotel_reviews = (
    hotel_reviews
    .filter(col('hotel_name') == 'Hotel Arena')
    .groupBy('hotel_name', 'reviewer_score')
    .agg(count('reviewer_score').alias('n_reviews'))
    .select('reviewer_score', 'hotel_name', 'n_reviews')
)
# To validate your solution, convert your final pySpark df to a pandas df
hotel_reviews.toPandas()
5. Count the number of movies for which Abigail Breslin was nominated for an Oscar
# Count distinct movies for which Abigail Breslin was Oscar-nominated.
# Import your libraries
from pyspark.sql.functions import col, countDistinct
# Start writing code
# The nominee literal was wrapped mid-string in the extracted text; it is 'Abigail Breslin'.
oscar_nominees = (
    oscar_nominees
    .filter(col('nominee') == 'Abigail Breslin')
    .groupBy('nominee')
    .agg(countDistinct('movie').alias('movie_cnt'))
    .select('movie_cnt')
)
# To validate your solution, convert your final pySpark df to a pandas df
oscar_nominees.toPandas()
6. Find all posts which were reacted to with a heart
# Find all posts which were reacted to with a heart (each post once).
# Import your libraries
from pyspark.sql.functions import col
# Start writing code
facebook_reactions = facebook_reactions.filter(col('reaction') == 'heart')
facebook_posts = (
    facebook_posts
    .join(facebook_reactions, facebook_posts['post_id'] == facebook_reactions['post_id'], 'inner')
    .select(facebook_posts['*'])
    .distinct()
)
# To validate your solution, convert your final pySpark df to a pandas df
facebook_posts.toPandas()
7. Popularity of Hack
# Popularity of Hack: mean survey popularity per office location.
# Import your libraries
import pyspark
# Start writing code
df = facebook_employees.join(
    facebook_hack_survey,
    facebook_employees['id'] == facebook_hack_survey['employee_id'],
    'left',
)
df = df.groupBy('location').avg('popularity')
# To validate your solution, convert your final pySpark df to a pandas df
df.toPandas()
8. Lyft Driver Wages
# Lyft Driver Wages: drivers earning at most 30k or at least 70k a year.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
low_pay = col('yearly_salary') <= 30000
high_pay = col('yearly_salary') >= 70000
lyft_drivers = lyft_drivers.filter(low_pay | high_pay)
# To validate your solution, convert your final pySpark df to a pandas df
lyft_drivers.toPandas()
9. Find how many times each artist appeared on the Spotify ranking list
# Appearances of each artist on the Spotify ranking list, most frequent first.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
# The `.sort` call was split mid-identifier in the extracted text; rejoined here.
# Alias spelling 'n_occurences' kept as-is: it is the output column name.
spotify_worldwide_daily_song_ranking = (
    spotify_worldwide_daily_song_ranking
    .groupBy('artist')
    .agg(count('*').alias('n_occurences'))
    .sort(desc('n_occurences'))
)
# To validate your solution, convert your final pySpark df to a pandas df
spotify_worldwide_daily_song_ranking.toPandas()
10. Find the base pay for Police Captains
# Base pay for Police Captains (CAPTAIN III).
# Import your libraries
from pyspark.sql.functions import col
# Start writing code
# The job-title literal was wrapped mid-string in the extracted text; it is one string.
sf_public_salaries = (
    sf_public_salaries
    .filter(col('jobtitle') == "CAPTAIN III (POLICE DEPARTMENT)")
    .select('employeename', 'basepay')
)
# To validate your solution, convert your final pySpark df to a pandas df
sf_public_salaries.toPandas()
11. Find libraries who haven't provided the email address in circulation year 2016 but their notice
preference definition is set to email
# Libraries with email notice preference but no email on file,
# active in circulation year 2016.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
wants_email = col('notice_preference_definition') == 'email'
no_email = col('provided_email_address') == False
active_2016 = col('circulation_active_year') == '2016'
library_usage = (
    library_usage
    .filter(wants_email & no_email & active_2016)
    .select('home_library_code')
    .dropDuplicates()
)
# To validate your solution, convert your final pySpark df to a pandas df
library_usage.toPandas()
12. Average Salaries
# Average Salaries: each employee's salary next to their department average.
# Import your libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# Start writing code
dept_win = Window.partitionBy('department')
employee = (
    employee
    .withColumn('avg_sal', avg('salary').over(dept_win))
    .select('department', 'first_name', 'salary', 'avg_sal')
)
# To validate your solution, convert your final pySpark df to a pandas df
employee.toPandas()
13. Order Details
# Order Details for customers named Jill or Eva, sorted by customer id.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
customers = customers.filter(col('first_name').isin(['Jill', 'Eva']))
customers = customers.join(orders, customers['id'] == orders['cust_id'], 'left')
customers = (
    customers
    .orderBy('cust_id')
    .select('order_date', 'order_details', 'total_order_cost', 'first_name')
)
# To validate your solution, convert your final pySpark df to a pandas df
customers.toPandas()
14. Customer Details
# Customer Details: customers joined with their orders, sorted by name and order.
# Import your libraries
import pyspark
# Start writing code
# The 'order_details' literal was wrapped mid-string in the extracted text.
customers = customers.join(orders, customers['id'] == orders['cust_id'], 'left')
customers = (
    customers
    .select('first_name', 'last_name', 'city', 'order_details')
    .orderBy(['first_name', 'order_details'])
)
# To validate your solution, convert your final pySpark df to a pandas df
customers.toPandas()
15. Number of Workers by Department Starting in April or Later
# Distinct workers per department who joined in April or later.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
worker = (
    worker
    .filter(month('joining_date') >= 4)
    .groupBy('department')
    .agg(countDistinct('worker_id'))
)
# To validate your solution, convert your final pySpark df to a pandas df
worker.toPandas()
16. Admin Department Employees Beginning in April or Later
# Count of Admin-department employees who joined in April or later.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
joined_apr_or_later = month('joining_date') >= 4
worker = (
    worker
    .filter(joined_apr_or_later)
    .filter(col('department') == 'Admin')
    .agg(count('*'))
)
# To validate your solution, convert your final pySpark df to a pandas df
worker.toPandas()
17. Churro Activity Date
# Churro Activity Date: STREET CHURROS inspections scoring below 95.
# Import your libraries
from pyspark.sql.functions import col
# Start writing code
los_angeles_restaurant_health_inspections = (
    los_angeles_restaurant_health_inspections
    .filter((col('facility_name') == 'STREET CHURROS') & (col('score') < 95))
    .select('activity_date', 'pe_description')
)
# To validate your solution, convert your final pySpark df to a pandas df
los_angeles_restaurant_health_inspections.toPandas()
18. Find the most profitable company in the financial sector of the entire world along with its
continent
# Most profitable Financials-sector company worldwide, with its continent.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
forbes_global_2010_2014 = (
    forbes_global_2010_2014
    .filter(col('sector') == 'Financials')
    .orderBy(desc('profits'))
    .select('company', 'continent')
    .limit(1)
)
# To validate your solution, convert your final pySpark df to a pandas df
forbes_global_2010_2014.toPandas()
19. Count the number of user events performed by MacBookPro users
# Events performed by MacBook Pro users, counted and sorted most frequent first.
# Import your libraries
from pyspark.sql.functions import col, desc
# Start writing code
playbook_events = playbook_events.filter(col('device') == 'macbook pro')
playbook_events = (
    playbook_events
    .groupBy(col('event_name'))
    .count()
    .sort(desc('count'))
)
# To validate your solution, convert your final pySpark df to a pandas df
playbook_events.toPandas()
20. Number Of Bathrooms And Bedrooms
# Average number of bedrooms and bathrooms per city and property type.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
# The alias literals were wrapped mid-string in the extracted text; rejoined here.
airbnb_search_details = (
    airbnb_search_details
    .groupBy('city', 'property_type')
    .agg(
        avg('bedrooms').alias('n_bedrooms_avg'),
        avg('bathrooms').alias('n_bathrooms_avg'),
    )
)
# To validate your solution, convert your final pySpark df to a pandas df
airbnb_search_details.toPandas()
21. Most Lucrative Products
# Most Lucrative Products: top-5 products by H1-2022 revenue (ties kept).
# Import your libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# Start writing code
online_orders = online_orders.withColumn('Revenue', col('cost_in_dollars') * col('units_sold'))
online_orders = (
    online_orders
    .filter(year('date') == '2022')
    .filter(month('date').between(1, 6))
)
online_orders = online_orders.groupBy('product_id').agg(sum('Revenue').alias('Total'))
revenue_win = Window.orderBy(desc('Total'))
online_orders = (
    online_orders
    .withColumn('rnk', dense_rank().over(revenue_win))
    .filter(col('rnk') < 6)
    .drop('rnk')
)
# To validate your solution, convert your final pySpark df to a pandas df
online_orders.toPandas()
22. Number of Shipments Per Month
# Number of shipments per month; a shipment is shipment_id + sub_id.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
# The 'yyyy-MM' format string was wrapped mid-literal in the extracted text.
amazon_shipment = (
    amazon_shipment
    .withColumn('Shipment', concat('shipment_id', 'sub_id'))
    .withColumn('year_month', date_format('shipment_date', 'yyyy-MM'))
    .groupBy('year_month')
    .agg(count('Shipment').alias('n_ship'))
    .select('year_month', 'n_ship')
)
# To validate your solution, convert your final pySpark df to a pandas df
amazon_shipment.toPandas()
23. Unique Users Per Client Per Month
# Unique users per client per month.
# NOTE(review): grouping by month() alone merges the same month across
# different years — confirm the data covers a single year.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
fact_events = (
    fact_events
    .groupBy('client_id', month('time_id'))
    .agg(countDistinct('user_id'))
)
# To validate your solution, convert your final pySpark df to a pandas df
fact_events.toPandas()
MEDIUM
1. Most Profitable Companies
# Most Profitable Companies: top 3 by profits (ties kept via dense_rank).
# Import your libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# Start writing code
profit_win = Window.orderBy(desc('profits'))
df = (
    forbes_global_2010_2014
    .withColumn('rnk', dense_rank().over(profit_win))
    .filter('rnk<4')
    .select('company', 'profits')
)
# To validate your solution, convert your final pySpark df to a pandas df
df.toPandas()
2. Activity Rank
# Activity Rank: emails sent per user, dense-ranked, ties broken by user id.
# Import your libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window
sent_win = Window.orderBy(desc('sent'), 'from_user')
# Start writing code
google_gmail_emails = (
    google_gmail_emails
    .groupBy('from_user')
    .agg(count('*').alias('sent'))
)
google_gmail_emails = google_gmail_emails.withColumn('rn', dense_rank().over(sent_win))
# To validate your solution, convert your final pySpark df to a pandas df
google_gmail_emails.toPandas()
3. Finding User Purchases
# Finding User Purchases: users with two purchases at most 7 days apart.
# Import your libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window
per_user_win = Window.partitionBy('user_id').orderBy('created_at')
# Start writing code
amazon_transactions = amazon_transactions.withColumn(
    'next_purchase', lead('created_at').over(per_user_win)
)
df = (
    amazon_transactions
    .withColumn('diff', datediff('next_purchase', 'created_at'))
    .filter('diff<=7')
    .select('user_id')
    .distinct()
)
# To validate your solution, convert your final pySpark df to a pandas df
df.toPandas()
4. New Products
# New Products: net change in product launches per company, 2020 minus 2019.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
df1 = (
    car_launches
    .filter(col('year') == '2019')
    .groupBy('company_name')
    .agg(count('product_name').alias('p_19'))
)
df2 = (
    car_launches
    .filter(col('year') == '2020')
    .groupBy('company_name')
    .agg(count('product_name').alias('p_20'))
)
# NOTE(review): inner join drops companies that launched in only one of the
# two years — confirm that is the intended behavior.
df3 = (
    df1.join(df2, ['company_name'], 'inner')
    .withColumn('net_p', col('p_20') - col('p_19'))
    .select('company_name', 'net_p')
)
# To validate your solution, convert your final pySpark df to a pandas df
df3.toPandas()
5. Top Percentile Fraud
# Top Percentile Fraud: claims in the top 5% of fraud scores per state.
# Import your libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window
score_win = Window.partitionBy('state').orderBy(desc('fraud_score'))
state_win = Window.partitionBy('state')
# Start writing code
df = (
    fraud_score
    .withColumn('cnt', count('fraud_score').over(state_win))
    .withColumn('rnk', rank().over(score_win))
    .withColumn('pct', 1.0 * col('rnk') / col('cnt'))
    .filter('pct<0.06')
    .select('policy_num', 'state', 'claim_cost', 'fraud_score')
    .distinct()
)
# To validate your solution, convert your final pySpark df to a pandas df
df.toPandas()
6. Acceptance Rate By Date
# Acceptance Rate By Date: share of sent friend requests eventually accepted,
# grouped by the date the request was sent.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
request_sent = fb_friend_requests.filter(col('action') == 'sent')
df = (
    fb_friend_requests
    .filter(col('action') == 'accepted')
    .withColumnRenamed('date', 'accepted_date')
    .withColumnRenamed('action', 'accepted')
)
# Right join keeps every sent request, matched or not.
df2 = df.join(request_sent, ['user_id_sender', 'user_id_receiver'], 'right').orderBy('date')
# count() skips nulls, so 'accep' counts only matched acceptances.
df2 = df2.groupBy('date').agg(count('accepted_date').alias('accep'), count('action').alias('total'))
df2 = df2.withColumn('rate', 1.0 * col('accep') / col('total')).select('date', 'rate')
# To validate your solution, convert your final pySpark df to a pandas df
df2.toPandas()
7. Popularity Percentage
# Popularity Percentage: each user's friend count as a % of all users.
# Import your libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# Start writing code
df = facebook_friends.select('*')
# Friendship is mutual: materialize both directions of every edge.
df_u = (
    df.select('user1', 'user2')
    .union(df.select('user2', 'user1'))
    .select('user1', 'user2')
    .toDF('user', 'frnd')
)
total = df_u.select('user').distinct().count()
df2 = df_u.groupBy('user').agg(countDistinct('frnd').alias('frnd'))
df2 = df2.withColumn('pct', round((100 * col('frnd') / total), 3)).sort('user').drop('frnd')
# To validate your solution, convert your final pySpark df to a pandas df
df2.toPandas()
8. Ranking Most Active Guests
# Ranking Most Active Guests by total messages sent (ties share a rank).
# Import your libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window
msg_win = Window.orderBy(desc('n_messages'))
# Start writing code
df = airbnb_contacts.groupBy('id_guest').agg(sum('n_messages').alias('n_messages'))
df = df.withColumn('rnk', dense_rank().over(msg_win))
# To validate your solution, convert your final pySpark df to a pandas df
df.toPandas()
9. Spam Posts
# Spam Posts: daily percentage of viewed posts whose keywords contain 'spam'.
# Import your libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# Start writing code
facebook_posts = facebook_posts.join(facebook_post_views, ['post_id'], 'inner')
facebook_posts = facebook_posts.withColumn(
    'spam', when(col('post_keywords').contains('spam'), 1).otherwise(0)
)
facebook_posts = facebook_posts.groupBy('post_date').agg(
    sum('spam').alias('spam_cnt'), count('spam').alias('total')
)
facebook_posts = facebook_posts.withColumn(
    'spam_share', 100 * col('spam_cnt') / col('total')
).select('post_date', 'spam_share')
# To validate your solution, convert your final pySpark df to a pandas df
facebook_posts.toPandas()
10. Find the percentage of shipable orders
# Percentage of shippable orders: orders whose customer has an address on file.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
orders = orders.join(customers, orders['cust_id'] == customers['id'], 'left')
tot = orders.count()
o2 = orders.filter(col('address').isNotNull()).count()
res = 100 * o2 / tot
# Result is a plain Python number, not a DataFrame; display it directly.
res
11. Income By Title and Gender
# Income By Title and Gender: average total compensation (salary + bonus).
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
sf_bonus = sf_bonus.groupBy('worker_ref_id').agg(sum('bonus').alias('bonus'))
sf_employee = sf_employee.join(
    sf_bonus, sf_employee['id'] == sf_bonus['worker_ref_id'], 'right'
)
sf_employee = sf_employee.fillna({'bonus': 0})
sf_employee = sf_employee.withColumn('total', col('salary') + col('bonus'))
sf_employee = sf_employee.groupBy('employee_title', 'sex').agg(avg('total'))
# To validate your solution, convert your final pySpark df to a pandas df
sf_employee.toPandas()
12. Highest Energy Consumption
# Highest Energy Consumption: date(s) with the greatest combined consumption
# across all three regional tables.
# Import your libraries
from pyspark.sql.functions import *
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
# Start writing code
regions = [fb_asia_energy, fb_eu_energy, fb_na_energy]
df = reduce(DataFrame.unionAll, regions)
df = df.groupBy('date').agg(sum('consumption').alias('consumption'))
top_win = Window.orderBy(desc('consumption'))
df = (
    df.withColumn('rnk', dense_rank().over(top_win))
    .filter(col('rnk') == 1)
    .drop('rnk')
)
# To validate your solution, convert your final pySpark df to a pandas df
df.toPandas()
13. Reviews of Categories
# Reviews of Categories: total review_count per individual category.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
df = yelp_business.withColumn('categories', explode(split('categories', ';')))
df = (
    df.groupBy('categories')
    .agg(sum('review_count').alias('total_reviews'))
    .sort(desc('total_reviews'))
)
# To validate your solution, convert your final pySpark df to a pandas df
df.toPandas()
14. Top 5 States With 5 Star Businesses
# Top 5 states by number of 5-star businesses (ties kept via rank).
# Import your libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window
count_win = Window.orderBy(desc('start_bus'))
# Start writing code
df = yelp_business.filter('stars = 5')
df = df.groupBy('state').agg(count('business_id').alias('start_bus'))
df = (
    df.withColumn('rnk', rank().over(count_win))
    .filter('rnk<6')
    .drop('rnk')
    .orderBy(desc('start_bus'), 'state')
)
# To validate your solution, convert your final pySpark df to a pandas df
df.toPandas()
15. Find all wineries which produce wines by possessing aromas of plum, cherry, rose, or hazelnut
# Wineries producing wines with plum, cherry, rose, or hazelnut aromas.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
aroma_pattern = r'\bplum\b|\bcherry\b|\brose\b|\bhazelnut\b'
df = (
    winemag_p1
    .filter(lower(col('description')).rlike(aroma_pattern))
    .select('winery')
    .distinct()
)
# To validate your solution, convert your final pySpark df to a pandas df
df.toPandas()
16. Highest Cost Orders
# Highest Cost Orders: customer(s) with the highest single-day order total
# between 2019-02-01 and 2019-05-01.
# Import your libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# Start writing code
cost_win = Window.orderBy(desc('toc'))
df = customers.join(orders, customers['id'] == orders['cust_id'], 'left')
df = df.filter('order_date BETWEEN "2019-02-01" AND "2019-05-01"')
df = df.groupBy('first_name', 'order_date').agg(sum('total_order_cost').alias('toc'))
df = df.withColumn('rnk', dense_rank().over(cost_win)).filter('rnk=1').drop('rnk')
# To validate your solution, convert your final pySpark df to a pandas df
df.toPandas()
17. Highest Target Under Manager
# Highest Target Under Manager 13 (ties kept via rank).
# Import your libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window
target_win = Window.orderBy(desc('target'))
# Start writing code
salesforce_employees = salesforce_employees.filter(col('manager_id') == 13)
salesforce_employees = (
    salesforce_employees
    .withColumn('rnk', rank().over(target_win))
    .filter(col('rnk') == 1)
    .select('first_name', 'target')
)
# To validate your solution, convert your final pySpark df to a pandas df
salesforce_employees.toPandas()
18. Highest Salary In Department
# Highest Salary In Department: top earner(s) per department.
# Import your libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# Start writing code
salary_win = Window.partitionBy('department').orderBy(desc('salary'))
employee = (
    employee
    .withColumn('rnk', dense_rank().over(salary_win))
    .filter('rnk=1')
    .select('department', 'first_name', 'salary')
)
# To validate your solution, convert your final pySpark df to a pandas df
employee.toPandas()
19. Employee and Manager Salaries
# Employees who earn more than their managers (self-join on manager_id).
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
emp = employee.alias('emp')
mng = employee.alias('mng')
df = (
    emp.join(mng, col('emp.manager_id') == col('mng.id'), 'left')
    .filter('emp.salary>mng.salary')
    .select('emp.first_name', 'emp.salary')
)
# To validate your solution, convert your final pySpark df to a pandas df
df.toPandas()
20. Second Highest Salary
# Second Highest Salary: the distinct salary at dense rank 2.
# Import your libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# Start writing code
salary_win = Window.orderBy(desc('salary'))
employee = (
    employee
    .withColumn('rnk', dense_rank().over(salary_win))
    .filter(col('rnk') == 2)
    .select('salary')
)
# To validate your solution, convert your final pySpark df to a pandas df
employee.toPandas()
21. Find the number of times each word appears in drafts
# Word frequencies across all files whose name contains 'draft'.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
df = (
    google_file_store
    .filter(col('filename').like('%draft%'))
    .agg(collect_list('contents').alias('contents'))
)
df = df.withColumn('contents', concat_ws(' ', 'contents'))
df = df.withColumn('contents', regexp_replace('contents', r'[.|,]', ''))
df = df.withColumn('words', explode(split('contents', ' ')))
# Alias the count distinctly: the original alias 'words' duplicated the
# grouping column's name, producing two columns both called 'words'.
df = df.groupBy('words').agg(count('words').alias('word_cnt'))
# To validate your solution, convert your final pySpark df to a pandas df
df.toPandas()
22. Counting Instances in Text
# Counting Instances in Text: occurrences of the words 'bull' and 'bear'.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
all_text = google_file_store.agg(collect_list('contents').alias('contents'))
df = (
    all_text
    .withColumn('contents', concat_ws(' ', 'contents'))
    .withColumn('contents', regexp_replace('contents', r'[.|,]', ''))
    .withColumn('words', explode(split('contents', ' ')))
    .groupBy('words')
    .agg(count('words').alias('words_cnt'))
    .filter(col('words').isin('bull', 'bear'))
)
# To validate your solution, convert your final pySpark df to a pandas df
df.toPandas()
23. Customer Revenue In March
# Customer Revenue In March 2019, highest revenue first.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
orders = orders.filter(date_format('order_date', 'yyyy-MM') == '2019-03')
orders = (
    orders.groupBy('cust_id')
    .agg(sum('total_order_cost').alias('Revenue'))
    .sort(desc('Revenue'))
)
# To validate your solution, convert your final pySpark df to a pandas df
orders.toPandas()
24. Find the rate of processed tickets for each type
# Rate of processed tickets per complaint type.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
df = facebook_complaints.withColumn('processed', col('processed').cast('integer'))
df = (
    df.groupBy('type')
    .agg(sum('processed').alias('process'), count('processed').alias('total'))
    .withColumn('rate', 1.0 * col('process') / col('total'))
    .select('type', 'rate')
)
# To validate your solution, convert your final pySpark df to a pandas df
df.toPandas()
25. Number of violations
# Number of health violations per year at Roxanne Cafe.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
# The business-name literal was wrapped mid-string in the extracted text.
df = (
    sf_restaurant_health_violations
    .filter(col('business_name') == 'Roxanne Cafe')
    .withColumn('Year', year('inspection_date'))
)
df = df.groupBy('Year').agg(count('violation_id').alias('cnt')).sort('Year')
# To validate your solution, convert your final pySpark df to a pandas df
df.toPandas()
26. Classify Business Type
# Classify Business Type from keywords in business_name.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
name_lc = lower(col('business_name'))
is_cafe = (
    name_lc.contains('cafe') | name_lc.contains('café') | name_lc.contains('coffee')
)
df = sf_restaurant_health_violations.withColumn(
    'Business_type',
    when(name_lc.contains('restaurant'), 'restaurant')
    .when(is_cafe, 'cafe')
    .when(name_lc.contains('school'), 'school')
    .otherwise('other'),
)
df = df.select('business_name', 'Business_type').distinct()
# To validate your solution, convert your final pySpark df to a pandas df
df.toPandas()
27. Find students with a median writing score
# Students whose sat_writing score equals the median score.
# Import your libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window
order_win = Window.orderBy('sat_writing')
# Compute the middle row position instead of hard-coding row 68
# (the original assumed the table has exactly 135 rows).
n_rows = sat_scores.count()
mid = (n_rows + 1) // 2
median_score = (
    sat_scores
    .withColumn('rn', row_number().over(order_win))
    .filter(col('rn') == mid)
    .select('sat_writing')
    .first()[0]
)
# Start writing code
sat_scores = sat_scores.filter(col('sat_writing') == median_score).select(col('student_id'))
# To validate your solution, convert your final pySpark df to a pandas df
sat_scores.toPandas()
28. User with Most Approved Flags
# User(s) whose flags got the most APPROVED outcomes (distinct videos).
# Import your libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# Start writing code
user_flags = (
    user_flags
    .join(flag_review, ['flag_id'], 'left')
    .filter(col('reviewed_outcome') == 'APPROVED')
)
# The lit(' ') separator literal was wrapped mid-string in the extracted text.
user_flags = (
    user_flags
    .withColumn('names', concat(col('user_firstname'), lit(' '), col('user_lastname')))
    .groupBy('names')
    .agg(countDistinct('video_id').alias('fid'))
    .withColumn('rnk', dense_rank().over(Window.orderBy(desc('fid'))))
    .filter(col('rnk') == 1)
    .select('names')
)
# To validate your solution, convert your final pySpark df to a pandas df
user_flags.toPandas()
29. Flags per Video
# Flags per Video: number of distinct users who flagged each video.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
df = user_flags.withColumn('name', concat_ws(' ', col('user_firstname'), col('user_lastname')))
# Keep only rows that actually carry a flag.
df2 = df.withColumn('flg', when(col('flag_id').isNotNull(), 1).otherwise(0)).filter(col('flg') == 1)
df2 = df2.groupBy('video_id').agg(countDistinct('name').alias('n_users'))
# To validate your solution, convert your final pySpark df to a pandas df
df2.toPandas()
30. Election Results
# Election Results: each voter's vote is split evenly across their candidates;
# the winner has the highest fractional-vote total.
# Import your libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window
voter_win = Window.partitionBy('voter').orderBy('voter')
# Start writing code
voting_results = voting_results.filter(col('candidate').isNotNull())
# Number each voter's ballots; the max row number is how many votes they cast.
df = voting_results.withColumn('rnk', row_number().over(voter_win)).select('voter', 'rnk')
df = (
    df.groupBy('voter')
    .agg(max('rnk').alias('n_vote'))
    .withColumn('votes', 1.0 * 1 / col('n_vote'))
)
df2 = (
    df.join(voting_results, ['voter'], 'right')
    .groupBy('candidate')
    .agg(sum('votes').alias('total_votes'))
    .orderBy(desc('total_votes'))
)
win2 = Window.orderBy(desc('total_votes'))
df2 = df2.withColumn('rnk', dense_rank().over(win2)).filter('rnk<2').select('candidate')
# To validate your solution, convert your final pySpark df to a pandas df
df2.toPandas()
HARD
1. Monthly Percentage Difference
# Monthly Percentage Difference in revenue vs the previous month.
# Import your libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window
month_win = Window.orderBy('month_col')
# Start writing code
df = sf_transactions.withColumn('month_col', date_format('created_at', 'yyyy-MM'))
df = df.groupBy('month_col').agg(sum('value').alias('revenue')).sort('month_col')
df = df.withColumn('prev_rev', lag('revenue', 1).over(month_win))
pct_change = 100 * (col('revenue') - col('prev_rev')) / col('prev_rev')
df = df.withColumn('pct', round(pct_change, 2)).drop('revenue', 'prev_rev')
# To validate your solution, convert your final pySpark df to a pandas df
df.toPandas()
2. Premium vs Freemium
# Premium vs Freemium: dates where non-paying downloads exceed paying ones.
# Import your libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# Start writing code
df = (
    ms_user_dimension
    .join(ms_acc_dimension, ['acc_id'], 'inner')
    .join(ms_download_facts, ['user_id'], 'inner')
)
# Pivot puts paying ('yes') and non-paying ('no') downloads side by side.
df = df.groupBy('date').pivot('paying_customer').sum('downloads')
df = df.filter(col('no') > col('yes'))
# To validate your solution, convert your final pySpark df to a pandas df
df.toPandas()
3. City With Most Amenities
# City With Most Amenities across all of its listings.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
df = airbnb_search_details.withColumn('amenities', regexp_replace('amenities', r'[{|}]', ''))
df = df.withColumn('amenities', explode(split('amenities', ',')))
df = df.groupBy('city').agg(count('amenities').alias('city_cnt')).sort(desc('city_cnt'))
top_cnt = df.select(max('city_cnt')).collect()[0][0]
df3 = df.filter(col('city_cnt') == top_cnt).select('city')
# To validate your solution, convert your final pySpark df to a pandas df
df3.toPandas()
4. Host Popularity Rental Prices
# Host Popularity Rental Prices: min/avg/max price per popularity bucket.
# Import your libraries
from pyspark.sql.functions import *
# Start writing code
df = airbnb_host_searches.fillna(0, ['review_scores_rating'])
# Synthesize a host id: a host is identified by this column combination.
df = df.withColumn('host_id', concat('price', 'room_type', 'host_since', 'number_of_reviews'))
df = df.select('host_id', 'number_of_reviews', 'price').dropDuplicates()
df = df.withColumn('number_of_reviews', col('number_of_reviews').cast('int'))
# The 'Trending Up' literal was wrapped mid-string in the extracted text.
df = df.withColumn(
    'host_review_pop',
    when(col('number_of_reviews') == 0, 'New')
    .when(col('number_of_reviews').between(1, 5), 'Rising')
    .when(col('number_of_reviews').between(6, 15), 'Trending Up')
    .when(col('number_of_reviews').between(16, 40), 'Popular')
    .otherwise('Hot'),
)
df2 = df.groupBy('host_review_pop').agg(
    min('price').alias('min_price'),
    avg('price').alias('avg_price'),
    max('price').alias('max_price'),
)
# To validate your solution, convert your final pySpark df to a pandas df
df2.toPandas()
5. Retention Rate
# Retention Rate: Jan-2021 retention relative to Dec-2020 retention, per account.
# Import your libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# Start writing code
df = sf_events.withColumn('date_month', date_format('date', 'yyyy-MM'))
df_dec = df.filter(col('date_month').like('%2020-12%')).select('account_id', 'user_id').dropDuplicates()
df_jan = df.filter(col('date_month').like('%2021-01%')).select('account_id', 'user_id').dropDuplicates()
# A user is "retained" if they have any activity after the anchor month.
max_date = df.groupBy('user_id').agg(max('date_month').alias('max_date'))
dec_ret = (
    df_dec.join(max_date, ['user_id'], 'inner')
    .withColumn('retention', when(col('max_date') > '2020-12', 1).otherwise(0))
    .groupBy('account_id')
    .agg(mean('retention').alias('dec_retention'))
)
jan_ret = (
    df_jan.join(max_date, ['user_id'], 'inner')
    .withColumn('retention', when(col('max_date') > '2021-01', 1).otherwise(0))
    .groupBy('account_id')
    .agg(mean('retention').alias('jan_retention'))
)
df_retention = dec_ret.join(jan_ret, ['account_id'], 'inner')
df_retention2 = df_retention.withColumn(
    'retention', 1.0 * col('jan_retention') / col('dec_retention')
).select('account_id', 'retention')
# To validate your solution, convert your final pySpark df to a pandas df
df_retention2.toPandas()
6. The Most Popular Client_Id Among Users Using Video and Voice Calls
# Most popular client_id among users whose events are >= 50% video/voice calls.
# Import your libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window
event_type_msg = ['video call received', 'video call sent', 'voice call received', 'voice call sent']
# Start writing code
df = fact_events.withColumn(
    'flag', when(col('event_type').isin(event_type_msg), 1).otherwise(0)
)
# Per user: total events and call events.
df2 = df.groupBy('user_id').agg(count('flag').alias('cnt_us'), sum('flag').alias('sum_us'))
df2 = df2.withColumn('pct', 100 * (1.0 * col('sum_us') / col('cnt_us'))).filter('pct>=50')
# Keep only events from qualifying users, then count events per client.
final_df = fact_events.join(df2, ['user_id'], 'inner').select(df2['*'], fact_events.client_id)
final_df = final_df.groupBy('client_id').agg(count('*').alias('cnt'))
top_cnt = final_df.select(max('cnt')).collect()[0][0]
finaldf = final_df.filter(col('cnt') == top_cnt).select('client_id')
# To validate your solution, convert your final pySpark df to a pandas df
finaldf.toPandas()
THANK YOU