@@ -29,11 +29,11 @@ def _set_app_properties():
29
29
"""
30
30
from pyspark import SparkContext
31
31
32
- sparkContext = SparkContext ._active_spark_context
33
- if sparkContext :
34
- sparkContext .setLocalProperty ("sentry_app_name" , sparkContext .appName )
35
- sparkContext .setLocalProperty (
36
- "sentry_application_id" , sparkContext .applicationId
32
+ spark_context = SparkContext ._active_spark_context
33
+ if spark_context :
34
+ spark_context .setLocalProperty ("sentry_app_name" , spark_context .appName )
35
+ spark_context .setLocalProperty (
36
+ "sentry_application_id" , spark_context .applicationId
37
37
)
38
38
39
39
@@ -106,99 +106,101 @@ def process_event(event, hint):
106
106
107
107
108
108
class SparkListener (object ):
109
- def onApplicationEnd (self , applicationEnd ):
109
+ def onApplicationEnd (self , applicationEnd ): # noqa: N802,N803
110
110
# type: (Any) -> None
111
111
pass
112
112
113
- def onApplicationStart (self , applicationStart ):
113
+ def onApplicationStart (self , applicationStart ): # noqa: N802,N803
114
114
# type: (Any) -> None
115
115
pass
116
116
117
- def onBlockManagerAdded (self , blockManagerAdded ):
117
+ def onBlockManagerAdded (self , blockManagerAdded ): # noqa: N802,N803
118
118
# type: (Any) -> None
119
119
pass
120
120
121
- def onBlockManagerRemoved (self , blockManagerRemoved ):
121
+ def onBlockManagerRemoved (self , blockManagerRemoved ): # noqa: N802,N803
122
122
# type: (Any) -> None
123
123
pass
124
124
125
- def onBlockUpdated (self , blockUpdated ):
125
+ def onBlockUpdated (self , blockUpdated ): # noqa: N802,N803
126
126
# type: (Any) -> None
127
127
pass
128
128
129
- def onEnvironmentUpdate (self , environmentUpdate ):
129
+ def onEnvironmentUpdate (self , environmentUpdate ): # noqa: N802,N803
130
130
# type: (Any) -> None
131
131
pass
132
132
133
- def onExecutorAdded (self , executorAdded ):
133
+ def onExecutorAdded (self , executorAdded ): # noqa: N802,N803
134
134
# type: (Any) -> None
135
135
pass
136
136
137
- def onExecutorBlacklisted (self , executorBlacklisted ):
137
+ def onExecutorBlacklisted (self , executorBlacklisted ): # noqa: N802,N803
138
138
# type: (Any) -> None
139
139
pass
140
140
141
- def onExecutorBlacklistedForStage (self , executorBlacklistedForStage ):
141
+ def onExecutorBlacklistedForStage ( # noqa: N802
142
+ self , executorBlacklistedForStage # noqa: N803
143
+ ):
142
144
# type: (Any) -> None
143
145
pass
144
146
145
- def onExecutorMetricsUpdate (self , executorMetricsUpdate ):
147
+ def onExecutorMetricsUpdate (self , executorMetricsUpdate ): # noqa: N802,N803
146
148
# type: (Any) -> None
147
149
pass
148
150
149
- def onExecutorRemoved (self , executorRemoved ):
151
+ def onExecutorRemoved (self , executorRemoved ): # noqa: N802,N803
150
152
# type: (Any) -> None
151
153
pass
152
154
153
- def onJobEnd (self , jobEnd ):
155
+ def onJobEnd (self , jobEnd ): # noqa: N802,N803
154
156
# type: (Any) -> None
155
157
pass
156
158
157
- def onJobStart (self , jobStart ):
159
+ def onJobStart (self , jobStart ): # noqa: N802,N803
158
160
# type: (Any) -> None
159
161
pass
160
162
161
- def onNodeBlacklisted (self , nodeBlacklisted ):
163
+ def onNodeBlacklisted (self , nodeBlacklisted ): # noqa: N802,N803
162
164
# type: (Any) -> None
163
165
pass
164
166
165
- def onNodeBlacklistedForStage (self , nodeBlacklistedForStage ):
167
+ def onNodeBlacklistedForStage (self , nodeBlacklistedForStage ): # noqa: N802,N803
166
168
# type: (Any) -> None
167
169
pass
168
170
169
- def onNodeUnblacklisted (self , nodeUnblacklisted ):
171
+ def onNodeUnblacklisted (self , nodeUnblacklisted ): # noqa: N802,N803
170
172
# type: (Any) -> None
171
173
pass
172
174
173
- def onOtherEvent (self , event ):
175
+ def onOtherEvent (self , event ): # noqa: N802,N803
174
176
# type: (Any) -> None
175
177
pass
176
178
177
- def onSpeculativeTaskSubmitted (self , speculativeTask ):
179
+ def onSpeculativeTaskSubmitted (self , speculativeTask ): # noqa: N802,N803
178
180
# type: (Any) -> None
179
181
pass
180
182
181
- def onStageCompleted (self , stageCompleted ):
183
+ def onStageCompleted (self , stageCompleted ): # noqa: N802,N803
182
184
# type: (Any) -> None
183
185
pass
184
186
185
- def onStageSubmitted (self , stageSubmitted ):
187
+ def onStageSubmitted (self , stageSubmitted ): # noqa: N802,N803
186
188
# type: (Any) -> None
187
189
pass
188
190
189
- def onTaskEnd (self , taskEnd ):
191
+ def onTaskEnd (self , taskEnd ): # noqa: N802,N803
190
192
# type: (Any) -> None
191
193
pass
192
194
193
- def onTaskGettingResult (self , taskGettingResult ):
195
+ def onTaskGettingResult (self , taskGettingResult ): # noqa: N802,N803
194
196
# type: (Any) -> None
195
197
pass
196
198
197
- def onTaskStart (self , taskStart ):
199
+ def onTaskStart (self , taskStart ): # noqa: N802,N803
198
200
# type: (Any) -> None
199
201
pass
200
202
201
- def onUnpersistRDD (self , unpersistRDD ):
203
+ def onUnpersistRDD (self , unpersistRDD ): # noqa: N802,N803
202
204
# type: (Any) -> None
203
205
pass
204
206
@@ -211,13 +213,13 @@ def __init__(self):
211
213
# type: () -> None
212
214
self .hub = Hub .current
213
215
214
- def onJobStart (self , jobStart ):
216
+ def onJobStart (self , jobStart ): # noqa: N802,N803
215
217
# type: (Any) -> None
216
218
message = "Job {} Started" .format (jobStart .jobId ())
217
219
self .hub .add_breadcrumb (level = "info" , message = message )
218
220
_set_app_properties ()
219
221
220
- def onJobEnd (self , jobEnd ):
222
+ def onJobEnd (self , jobEnd ): # noqa: N802,N803
221
223
# type: (Any) -> None
222
224
level = ""
223
225
message = ""
@@ -232,30 +234,30 @@ def onJobEnd(self, jobEnd):
232
234
233
235
self .hub .add_breadcrumb (level = level , message = message , data = data )
234
236
235
- def onStageSubmitted (self , stageSubmitted ):
237
+ def onStageSubmitted (self , stageSubmitted ): # noqa: N802,N803
236
238
# type: (Any) -> None
237
- stageInfo = stageSubmitted .stageInfo ()
238
- message = "Stage {} Submitted" .format (stageInfo .stageId ())
239
- data = {"attemptId" : stageInfo .attemptId (), "name" : stageInfo .name ()}
239
+ stage_info = stageSubmitted .stageInfo ()
240
+ message = "Stage {} Submitted" .format (stage_info .stageId ())
241
+ data = {"attemptId" : stage_info .attemptId (), "name" : stage_info .name ()}
240
242
self .hub .add_breadcrumb (level = "info" , message = message , data = data )
241
243
_set_app_properties ()
242
244
243
- def onStageCompleted (self , stageCompleted ):
245
+ def onStageCompleted (self , stageCompleted ): # noqa: N802,N803
244
246
# type: (Any) -> None
245
247
from py4j .protocol import Py4JJavaError # type: ignore
246
248
247
- stageInfo = stageCompleted .stageInfo ()
249
+ stage_info = stageCompleted .stageInfo ()
248
250
message = ""
249
251
level = ""
250
- data = {"attemptId" : stageInfo .attemptId (), "name" : stageInfo .name ()}
252
+ data = {"attemptId" : stage_info .attemptId (), "name" : stage_info .name ()}
251
253
252
254
# Have to Try Except because stageInfo.failureReason() is typed with Scala Option
253
255
try :
254
- data ["reason" ] = stageInfo .failureReason ().get ()
255
- message = "Stage {} Failed" .format (stageInfo .stageId ())
256
+ data ["reason" ] = stage_info .failureReason ().get ()
257
+ message = "Stage {} Failed" .format (stage_info .stageId ())
256
258
level = "warning"
257
259
except Py4JJavaError :
258
- message = "Stage {} Completed" .format (stageInfo .stageId ())
260
+ message = "Stage {} Completed" .format (stage_info .stageId ())
259
261
level = "info"
260
262
261
263
self .hub .add_breadcrumb (level = level , message = message , data = data )
0 commit comments