10
10
from pyspark .sql .types import FloatType , IntegerType , StringType
11
11
12
12
13
+ # [START datascienceonramp_tripdurationudf]
13
14
def trip_duration_udf (duration ):
14
15
"""Convert trip duration to seconds. Return None if negative."""
15
16
if not duration :
@@ -33,11 +34,17 @@ def trip_duration_udf(duration):
33
34
return int (time )
34
35
35
36
37
+ # [END datascienceonramp_tripdurationudf]
38
+
39
+ # [START datascienceonramp_stationnameudf]
36
40
def station_name_udf(name):
    """Return the station name with every '/' swapped for '&'.

    Empty or null names yield None so the Spark column stays nullable.
    """
    if not name:
        return None
    return name.replace("/", "&")
39
43
40
44
45
+ # [END datascienceonramp_stationnameudf]
46
+
47
+ # [START datascienceonramp_usertypeudf]
41
48
def user_type_udf (user ):
42
49
"""Converts user type to 'Subscriber' or 'Customer'."""
43
50
if not user :
@@ -49,17 +56,10 @@ def user_type_udf(user):
49
56
return "Customer"
50
57
51
58
52
- def gender_udf (gender ):
53
- """Converts gender to 'Male' or 'Female'."""
54
- if not gender :
55
- return None
56
-
57
- if gender .lower ().startswith ("m" ):
58
- return "Male"
59
- elif gender .lower ().startswith ("f" ):
60
- return "Female"
59
+ # [END datascienceonramp_usertypeudf]
61
60
62
61
62
+ # [START datascienceonramp_stationlocationudf]
63
63
def angle_udf (angle ):
64
64
"""Converts DMS notation to degrees. Return None if not in DMS or degrees notation."""
65
65
if not angle :
@@ -74,6 +74,9 @@ def angle_udf(angle):
74
74
return float (degrees [0 ])
75
75
76
76
77
+ # [END datascienceonramp_stationlocationudf]
78
+
79
+ # [START datascienceonramp_timeconvertudf]
77
80
def compute_time (duration , start , end ):
78
81
"""Calculates duration, start time, and end time from each other if one value is null."""
79
82
time_format = "%Y-%m-%dT%H:%M:%S"
@@ -94,15 +97,17 @@ def compute_time(duration, start, end):
94
97
if duration :
95
98
# Convert to timedelta
96
99
duration = datetime .timedelta (seconds = duration )
97
-
100
+ # [END datascienceonramp_timeconvertudf]
101
+ # [START datascienceonramp_timemissingvalueudf]
98
102
# Calculate missing value
99
103
if start and end and not duration :
100
104
duration = end - start
101
105
elif duration and end and not start :
102
106
start = end - duration
103
107
elif duration and start and not end :
104
108
end = start + duration
105
-
109
+ # [END datascienceonramp_timemissingvalueudf]
110
+ # [START datascienceonramp_timereturnudf]
106
111
# Transform to primitive types
107
112
if duration :
108
113
duration = int (duration .total_seconds ())
@@ -114,6 +119,9 @@ def compute_time(duration, start, end):
114
119
return (duration , start , end )
115
120
116
121
122
+ # [END datascienceonramp_timereturnudf]
123
+
124
+ # [START datascienceonramp_timehelperudf]
117
125
def compute_duration_udf(duration, start, end):
    """Return the trip duration, backfilling it from start/end when null.

    Delegates to compute_time, which reconciles the three values; only the
    duration component of the resulting tuple is returned.
    """
    filled = compute_time(duration, start, end)
    return filled[0]
@@ -129,9 +137,12 @@ def compute_end_udf(duration, start, end):
129
137
return compute_time (duration , start , end )[2 ]
130
138
131
139
140
+ # [END datascienceonramp_timehelperudf]
141
+
142
+ # [START datascienceonramp_sparksession]
132
143
if __name__ == "__main__" :
133
- TABLE = sys .argv [1 ]
134
- BUCKET_NAME = sys .argv [2 ]
144
+ BUCKET_NAME = sys .argv [1 ]
145
+ TABLE = sys .argv [2 ]
135
146
136
147
# Create a SparkSession, viewable via the Spark UI
137
148
spark = SparkSession .builder .appName ("data_cleaning" ).getOrCreate ()
@@ -142,13 +153,20 @@ def compute_end_udf(duration, start, end):
142
153
except Py4JJavaError as e :
143
154
raise Exception (f"Error reading { TABLE } " ) from e
144
155
156
+ # [END datascienceonramp_sparksession]
157
+
158
+ # [START datascienceonramp_removecolumn]
159
+ # remove unused column
160
+ df = df .drop ("gender" )
161
+ # [END datascienceonramp_removecolumn]
162
+
163
+ # [START datascienceonramp_sparksingleudfs]
145
164
# Single-parameter udfs
146
165
udfs = {
147
166
"start_station_name" : UserDefinedFunction (station_name_udf , StringType ()),
148
167
"end_station_name" : UserDefinedFunction (station_name_udf , StringType ()),
149
168
"tripduration" : UserDefinedFunction (trip_duration_udf , IntegerType ()),
150
169
"usertype" : UserDefinedFunction (user_type_udf , StringType ()),
151
- "gender" : UserDefinedFunction (gender_udf , StringType ()),
152
170
"start_station_latitude" : UserDefinedFunction (angle_udf , FloatType ()),
153
171
"start_station_longitude" : UserDefinedFunction (angle_udf , FloatType ()),
154
172
"end_station_latitude" : UserDefinedFunction (angle_udf , FloatType ()),
@@ -157,7 +175,8 @@ def compute_end_udf(duration, start, end):
157
175
158
176
for name , udf in udfs .items ():
159
177
df = df .withColumn (name , udf (name ))
160
-
178
+ # [END datascienceonramp_sparksingleudfs]
179
+ # [START datascienceonramp_sparkmultiudfs]
161
180
# Multi-parameter udfs
162
181
multi_udfs = {
163
182
"tripduration" : {
@@ -176,10 +195,12 @@ def compute_end_udf(duration, start, end):
176
195
177
196
for name , obj in multi_udfs .items ():
178
197
df = df .withColumn (name , obj ["udf" ](* obj ["params" ]))
179
-
198
+ # [END datascienceonramp_sparkmultiudfs]
199
+ # [START datascienceonramp_displaysamplerows]
180
200
# Display sample of rows
181
201
df .show (n = 20 )
182
-
202
+ # [END datascienceonramp_displaysamplerows]
203
+ # [START datascienceonramp_writetogcs]
183
204
# Write results to GCS
184
205
if "--dry-run" in sys .argv :
185
206
print ("Data will not be uploaded to GCS" )
@@ -222,3 +243,4 @@ def compute_end_udf(duration, start, end):
222
243
print (
223
244
"Data successfully uploaded to " + "gs://" + BUCKET_NAME + "/" + final_path
224
245
)
246
+ # [END datascienceonramp_writetogcs]
0 commit comments