Commit 89e67bc

Data cleaning region tags (GoogleCloudPlatform#4785)
* add region tags
* remove whitespace
* add missing region tag, fix whitespace
* remove gender
* remove collect
* update gender column removal, switch args position
* fix lint
* add region tag, address brad comments, fix test
* remove timeout
* address brad comment
1 parent 6a5fc80 commit 89e67bc
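
Region tags are the `# [START …]` / `# [END …]` comment pairs added throughout the diff below: Google's sample-snippet tooling copies everything between a matching pair into the published documentation, so each cleaning step becomes an independently embeddable snippet. As a minimal sketch of that extraction idea (the `extract_region` helper below is hypothetical, not the actual tooling):

```python
import re


def extract_region(source: str, tag: str) -> str:
    """Return the code between # [START tag] and # [END tag] markers."""
    # Hypothetical extractor: DOTALL lets the capture span multiple
    # lines; MULTILINE anchors the END marker at a line start.
    pattern = re.compile(
        rf"#\s*\[START {re.escape(tag)}\]\n(.*?)^\s*#\s*\[END {re.escape(tag)}\]",
        re.DOTALL | re.MULTILINE,
    )
    match = pattern.search(source)
    return match.group(1) if match else ""


# Usage sketch: pull one tagged snippet out of the sample.
with open("clean.py") as f:
    print(extract_region(f.read(), "datascienceonramp_stationnameudf"))
```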

2 files changed: 44 additions & 39 deletions

data-science-onramp/data-cleaning/clean.py

Lines changed: 39 additions & 17 deletions
```diff
@@ -10,6 +10,7 @@
 from pyspark.sql.types import FloatType, IntegerType, StringType
 
 
+# [START datascienceonramp_tripdurationudf]
 def trip_duration_udf(duration):
     """Convert trip duration to seconds. Return None if negative."""
     if not duration:
@@ -33,11 +34,17 @@ def trip_duration_udf(duration):
     return int(time)
 
 
+# [END datascienceonramp_tripdurationudf]
+
+
+# [START datascienceonramp_stationnameudf]
 def station_name_udf(name):
     """Replaces '/' with '&'."""
     return name.replace("/", "&") if name else None
 
 
+# [END datascienceonramp_stationnameudf]
+
+
+# [START datascienceonramp_usertypeudf]
 def user_type_udf(user):
     """Converts user type to 'Subscriber' or 'Customer'."""
     if not user:
@@ -49,17 +56,10 @@ def user_type_udf(user):
     return "Customer"
 
 
-def gender_udf(gender):
-    """Converts gender to 'Male' or 'Female'."""
-    if not gender:
-        return None
-
-    if gender.lower().startswith("m"):
-        return "Male"
-    elif gender.lower().startswith("f"):
-        return "Female"
+# [END datascienceonramp_usertypeudf]
 
 
+# [START datascienceonramp_stationlocationudf]
 def angle_udf(angle):
     """Converts DMS notation to degrees. Return None if not in DMS or degrees notation."""
     if not angle:
@@ -74,6 +74,9 @@ def angle_udf(angle):
     return float(degrees[0])
 
 
+# [END datascienceonramp_stationlocationudf]
+
+
+# [START datascienceonramp_timeconvertudf]
 def compute_time(duration, start, end):
     """Calculates duration, start time, and end time from each other if one value is null."""
     time_format = "%Y-%m-%dT%H:%M:%S"
@@ -94,15 +97,17 @@ def compute_time(duration, start, end):
     if duration:
         # Convert to timedelta
        duration = datetime.timedelta(seconds=duration)
-
+    # [END datascienceonramp_timeconvertudf]
+    # [START datascienceonramp_timemissingvalueudf]
     # Calculate missing value
     if start and end and not duration:
         duration = end - start
     elif duration and end and not start:
         start = end - duration
     elif duration and start and not end:
         end = start + duration
-
+    # [END datascienceonramp_timemissingvalueudf]
+    # [START datascienceonramp_timereturnudf]
     # Transform to primitive types
     if duration:
         duration = int(duration.total_seconds())
@@ -114,6 +119,9 @@ def compute_time(duration, start, end):
     return (duration, start, end)
 
 
+# [END datascienceonramp_timereturnudf]
+
+
+# [START datascienceonramp_timehelperudf]
 def compute_duration_udf(duration, start, end):
     """Calculates duration from start and end time if null."""
     return compute_time(duration, start, end)[0]
@@ -129,9 +137,12 @@ def compute_end_udf(duration, start, end):
     return compute_time(duration, start, end)[2]
 
 
+# [END datascienceonramp_timehelperudf]
+
+
+# [START datascienceonramp_sparksession]
 if __name__ == "__main__":
-    TABLE = sys.argv[1]
-    BUCKET_NAME = sys.argv[2]
+    BUCKET_NAME = sys.argv[1]
+    TABLE = sys.argv[2]
 
     # Create a SparkSession, viewable via the Spark UI
     spark = SparkSession.builder.appName("data_cleaning").getOrCreate()
@@ -142,13 +153,20 @@ def compute_end_udf(duration, start, end):
     except Py4JJavaError as e:
         raise Exception(f"Error reading {TABLE}") from e
 
+    # [END datascienceonramp_sparksession]
+
+    # [START datascienceonramp_removecolumn]
+    # remove unused column
+    df = df.drop("gender")
+    # [END datascienceonramp_removecolumn]
+
+    # [START datascienceonramp_sparksingleudfs]
     # Single-parameter udfs
     udfs = {
         "start_station_name": UserDefinedFunction(station_name_udf, StringType()),
         "end_station_name": UserDefinedFunction(station_name_udf, StringType()),
         "tripduration": UserDefinedFunction(trip_duration_udf, IntegerType()),
         "usertype": UserDefinedFunction(user_type_udf, StringType()),
-        "gender": UserDefinedFunction(gender_udf, StringType()),
         "start_station_latitude": UserDefinedFunction(angle_udf, FloatType()),
         "start_station_longitude": UserDefinedFunction(angle_udf, FloatType()),
         "end_station_latitude": UserDefinedFunction(angle_udf, FloatType()),
@@ -157,7 +175,8 @@ def compute_end_udf(duration, start, end):
 
     for name, udf in udfs.items():
         df = df.withColumn(name, udf(name))
-
+    # [END datascienceonramp_sparksingleudfs]
+    # [START datascienceonramp_sparkmultiudfs]
     # Multi-parameter udfs
     multi_udfs = {
         "tripduration": {
@@ -176,10 +195,12 @@ def compute_end_udf(duration, start, end):
 
     for name, obj in multi_udfs.items():
         df = df.withColumn(name, obj["udf"](*obj["params"]))
-
+    # [END datascienceonramp_sparkmultiudfs]
+    # [START datascienceonramp_displaysamplerows]
     # Display sample of rows
     df.show(n=20)
-
+    # [END datascienceonramp_displaysamplerows]
+    # [START datascienceonramp_writetogcs]
     # Write results to GCS
     if "--dry-run" in sys.argv:
         print("Data will not be uploaded to GCS")
@@ -222,3 +243,4 @@ def compute_end_udf(duration, start, end):
     print(
         "Data successfully uploaded to " + "gs://" + BUCKET_NAME + "/" + final_path
     )
+    # [END datascienceonramp_writetogcs]
```
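
Two changes in this file matter beyond the tags themselves: the `gender` column is now dropped outright via `df.drop("gender")` instead of being cleaned through the deleted `gender_udf`, and the positional arguments are swapped so the script is invoked as `clean.py BUCKET_NAME TABLE [--dry-run]`, which the test's updated `args` list mirrors. The dictionary-of-UDFs loop that the region tags isolate is worth seeing on its own; below is a minimal runnable sketch of that pattern, using a toy one-column DataFrame rather than the sample's real Citi Bike schema:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType


def station_name_udf(name):
    """Replaces '/' with '&'."""
    return name.replace("/", "&") if name else None


spark = SparkSession.builder.appName("udf_pattern_demo").getOrCreate()
df = spark.createDataFrame(
    [("8 Ave / W 31 St",), (None,)], ["start_station_name"]
)

# Same pattern as clean.py: map column name -> UDF, then rewrite each
# column in place. Passing the name string to the UDF call makes Spark
# resolve it as a column reference.
udfs = {"start_station_name": UserDefinedFunction(station_name_udf, StringType())}
for name, udf in udfs.items():
    df = df.withColumn(name, udf(name))

df.show()  # first row becomes "8 Ave & W 31 St"; the null stays null
spark.stop()
```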

data-science-onramp/data-cleaning/clean_test.py

Lines changed: 5 additions & 22 deletions
```diff
@@ -49,11 +49,11 @@
         },
     },
 }
-DATAPROC_JOB = { # Dataproc job configuration
+DATAPROC_JOB = {  # Dataproc job configuration
     "placement": {"cluster_name": DATAPROC_CLUSTER},
     "pyspark_job": {
         "main_python_file_uri": f"gs://{BUCKET_NAME}/{BUCKET_BLOB}",
-        "args": [BQ_TABLE, BUCKET_NAME, "--dry-run"],
+        "args": [BUCKET_NAME, BQ_TABLE, "--dry-run"],
         "jar_file_uris": ["gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"],
     },
 }
@@ -70,12 +70,9 @@ def setup_and_teardown_table():
     # Load table from dataframe
     df = pd.read_csv(CSV_FILE)
     job_config = bigquery.LoadJobConfig(
-        autodetect=True,
-        write_disposition="WRITE_TRUNCATE"
-    )
-    operation = bq_client.load_table_from_dataframe(
-        df, BQ_TABLE, job_config=job_config
+        autodetect=True, write_disposition="WRITE_TRUNCATE"
     )
+    operation = bq_client.load_table_from_dataframe(df, BQ_TABLE, job_config=job_config)
 
     # Wait for job to complete
     operation.result()
@@ -108,8 +105,7 @@ def setup_and_teardown_cluster():
     operation = cluster_client.delete_cluster(
         project_id=PROJECT_ID,
         region=CLUSTER_REGION,
-        cluster_name=DATAPROC_CLUSTER,
-        timeout=300
+        cluster_name=DATAPROC_CLUSTER
     )
     operation.result()
 
@@ -172,20 +168,7 @@ def test_clean():
 
     # gender
     assert not is_in_table("M", out)
-    assert not is_in_table("m", out)
-    assert not is_in_table("male", out)
-    assert not is_in_table("MALE", out)
     assert not is_in_table("F", out)
-    assert not is_in_table("f", out)
-    assert not is_in_table("female", out)
-    assert not is_in_table("FEMALE", out)
-    assert not is_in_table("U", out)
-    assert not is_in_table("u", out)
-    assert not is_in_table("unknown", out)
-    assert not is_in_table("UNKNOWN", out)
-
-    assert is_in_table("Male", out)
-    assert is_in_table("Female", out)
 
     # customer plan
     assert not is_in_table("subscriber", out)
```
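
The trimmed assertions still lean on an `is_in_table` helper whose definition falls outside this diff. A plausible minimal reading, assuming `out` holds the Dataproc job's captured `df.show()` output, is a cell-boundary check along these lines (hypothetical, not the repo's actual implementation):

```python
import re


def is_in_table(value: str, out: str) -> bool:
    """Hypothetical stand-in for the test's is_in_table helper."""
    # df.show() pads each cell between '|' separators, so require a
    # cell-ish boundary on both sides rather than a bare substring hit.
    return re.search(rf"[|\s]{re.escape(value)}[|\s]", out) is not None


# Reading the assertions above with this helper: since the commit drops
# the gender column entirely, neither the raw codes ("M", "F") nor the
# previously cleaned values ("Male", "Female") should appear, which is
# why the positive assertions were removed along with the negatives.
```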
