
Commit dff06d8

feat: Add SentryStreamingQueryListener
1 parent 4b4b1c2 commit dff06d8

File tree

5 files changed: +344 -210 lines changed


.gitignore

Lines changed: 1 addition & 0 deletions

@@ -22,3 +22,4 @@ venv
 semaphore
 pip-wheel-metadata
 .mypy_cache
+.vscode/
Lines changed: 247 additions & 0 deletions

@@ -0,0 +1,247 @@
from sentry_sdk import capture_message, push_scope
from sentry_sdk.hub import Hub
from sentry_sdk.utils import capture_internal_exceptions

from sentry_sdk._types import MYPY

if MYPY:
    from typing import Any


def _set_app_properties():
    # type: () -> None
    """
    Set properties on the driver that propagate to worker processes, so that
    the worker integration has access to app_name and application_id.
    """
    from pyspark import SparkContext

    sparkContext = SparkContext._active_spark_context
    if sparkContext:
        sparkContext.setLocalProperty("sentry_app_name", sparkContext.appName)
        sparkContext.setLocalProperty(
            "sentry_application_id", sparkContext.applicationId
        )


class SparkListener(object):
    # No-op implementation of every SparkListenerInterface callback;
    # subclasses override only the events they care about.

    def onApplicationEnd(self, applicationEnd):
        # type: (Any) -> None
        pass

    def onApplicationStart(self, applicationStart):
        # type: (Any) -> None
        pass

    def onBlockManagerAdded(self, blockManagerAdded):
        # type: (Any) -> None
        pass

    def onBlockManagerRemoved(self, blockManagerRemoved):
        # type: (Any) -> None
        pass

    def onBlockUpdated(self, blockUpdated):
        # type: (Any) -> None
        pass

    def onEnvironmentUpdate(self, environmentUpdate):
        # type: (Any) -> None
        pass

    def onExecutorAdded(self, executorAdded):
        # type: (Any) -> None
        pass

    def onExecutorBlacklisted(self, executorBlacklisted):
        # type: (Any) -> None
        pass

    def onExecutorBlacklistedForStage(self, executorBlacklistedForStage):
        # type: (Any) -> None
        pass

    def onExecutorMetricsUpdate(self, executorMetricsUpdate):
        # type: (Any) -> None
        pass

    def onExecutorRemoved(self, executorRemoved):
        # type: (Any) -> None
        pass

    def onJobEnd(self, jobEnd):
        # type: (Any) -> None
        pass

    def onJobStart(self, jobStart):
        # type: (Any) -> None
        pass

    def onNodeBlacklisted(self, nodeBlacklisted):
        # type: (Any) -> None
        pass

    def onNodeBlacklistedForStage(self, nodeBlacklistedForStage):
        # type: (Any) -> None
        pass

    def onNodeUnblacklisted(self, nodeUnblacklisted):
        # type: (Any) -> None
        pass

    def onOtherEvent(self, event):
        # type: (Any) -> None
        pass

    def onSpeculativeTaskSubmitted(self, speculativeTask):
        # type: (Any) -> None
        pass

    def onStageCompleted(self, stageCompleted):
        # type: (Any) -> None
        pass

    def onStageSubmitted(self, stageSubmitted):
        # type: (Any) -> None
        pass

    def onTaskEnd(self, taskEnd):
        # type: (Any) -> None
        pass

    def onTaskGettingResult(self, taskGettingResult):
        # type: (Any) -> None
        pass

    def onTaskStart(self, taskStart):
        # type: (Any) -> None
        pass

    def onUnpersistRDD(self, unpersistRDD):
        # type: (Any) -> None
        pass

    class Java:
        # py4j uses this declaration to present the Python object to the JVM
        # as an implementation of the listener interface.
        implements = ["org.apache.spark.scheduler.SparkListenerInterface"]


class SentryListener(SparkListener):
    def __init__(self):
        # type: () -> None
        self.hub = Hub.current
        _set_app_properties()

    def onJobStart(self, jobStart):
        # type: (Any) -> None
        message = "Job {} Started".format(jobStart.jobId())
        self.hub.add_breadcrumb(level="info", message=message)
        _set_app_properties()

    def onJobEnd(self, jobEnd):
        # type: (Any) -> None
        level = ""
        message = ""
        data = {"result": jobEnd.jobResult().toString()}

        if jobEnd.jobResult().toString() == "JobSucceeded":
            level = "info"
            message = "Job {} Ended".format(jobEnd.jobId())
        else:
            level = "warning"
            message = "Job {} Failed".format(jobEnd.jobId())

        self.hub.add_breadcrumb(level=level, message=message, data=data)

    def onStageSubmitted(self, stageSubmitted):
        # type: (Any) -> None
        stageInfo = stageSubmitted.stageInfo()
        message = "Stage {} Submitted".format(stageInfo.stageId())
        data = {"attemptId": stageInfo.attemptId(), "name": stageInfo.name()}
        self.hub.add_breadcrumb(level="info", message=message, data=data)
        _set_app_properties()

    def onStageCompleted(self, stageCompleted):
        # type: (Any) -> None
        from py4j.protocol import Py4JJavaError  # type: ignore

        stageInfo = stageCompleted.stageInfo()
        message = ""
        level = ""
        data = {"attemptId": stageInfo.attemptId(), "name": stageInfo.name()}

        # Have to try/except because stageInfo.failureReason() is typed with
        # Scala Option; .get() raises a Py4JJavaError when the stage succeeded.
        try:
            data["reason"] = stageInfo.failureReason().get()
            message = "Stage {} Failed".format(stageInfo.stageId())
            level = "warning"
        except Py4JJavaError:
            message = "Stage {} Completed".format(stageInfo.stageId())
            level = "info"

        self.hub.add_breadcrumb(level=level, message=message, data=data)


class SparkStreamingQueryListener(object):
    # No-op implementation of the streaming query listener callbacks.

    def onQueryProgress(self, event):
        # type: (Any) -> None
        pass

    def onQueryStarted(self, event):
        # type: (Any) -> None
        pass

    def onQueryTerminated(self, event):
        # type: (Any) -> None
        pass

    class Java:
        implements = [
            "org.apache.spark.sql.hive.thriftserver.DummyStreamingQueryListener"
        ]


class SentryStreamingQueryListener(SparkStreamingQueryListener):
    def __init__(self):
        # type: () -> None
        self.hub = Hub.current
        self.name = None

    def onQueryStarted(self, event):
        # type: (Any) -> None
        self.name = event.name()

        message = "Query {} started".format(self.name)
        data = {"runId": event.runId()}
        self.hub.add_breadcrumb(message=message, data=data)

    def onQueryProgress(self, event):
        # type: (Any) -> None
        progress = event.progress()

        self.name = progress.name()

        message = "Query {} progressed".format(self.name)
        data = {
            "runId": progress.runId(),
            "timestamp": progress.timestamp(),
            "json": progress.json(),
        }
        self.hub.add_breadcrumb(message=message, data=data)

    def onQueryTerminated(self, event):
        # type: (Any) -> None
        from py4j.protocol import Py4JJavaError

        # Have to try/except because event.exception() is typed with Scala
        # Option; .get() raises a Py4JJavaError when the query terminated
        # without an error.
        try:
            message = event.exception().get()

            with push_scope() as scope:
                with capture_internal_exceptions():
                    scope.set_tag("name", self.name)
                    scope.set_tag("runId", event.runId())
                    scope.set_tag("id", event.id())

                    capture_message(message=message, level="error")
        except Py4JJavaError:
            message = "Query {} terminated".format(self.name)
            data = {"runId": event.runId()}
            self.hub.add_breadcrumb(message=message, data=data)
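
Neither listener registers itself; that wiring lives in the integration, outside this diff. Below is a minimal driver-side sketch of what registration might look like, assuming Spark 2.4+ (for pyspark.java_gateway.ensure_callback_server_started) and leaning on the internal _jsc / _jsparkSession handles. The _start_sentry_listeners helper and the streaming addListener call are illustrative assumptions, not part of this commit.

# Hypothetical wiring sketch -- not part of this commit.
from pyspark.java_gateway import ensure_callback_server_started
from pyspark.sql import SparkSession


def _start_sentry_listeners(spark):
    # type: (SparkSession) -> None
    sc = spark.sparkContext

    # The JVM calls back into these Python objects through py4j's callback
    # server, so it must be running before a listener is attached.
    ensure_callback_server_started(sc._gateway)

    sc._jsc.sc().addSparkListener(SentryListener())

    # Assumption: the JVM side accepts the py4j proxy produced by the
    # DummyStreamingQueryListener interface declared above.
    spark._jsparkSession.streams().addListener(SentryStreamingQueryListener())


spark = SparkSession.builder.appName("sentry-spark-demo").getOrCreate()
_start_sentry_listeners(spark)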
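
On the executor side, the point of _set_app_properties is that Spark forwards local properties to workers with every task. A small sketch of reading them back (TaskContext.getLocalProperty is standard PySpark; the key names match the ones set above):

# Executor-side sketch: read the properties the driver propagated.
from pyspark import SparkContext, TaskContext


def _tag_partition(rows):
    # TaskContext.get() is only valid inside a running task.
    ctx = TaskContext.get()
    app_name = ctx.getLocalProperty("sentry_app_name")
    application_id = ctx.getLocalProperty("sentry_application_id")
    # A worker-side integration could attach these as Sentry tags here.
    for row in rows:
        yield (app_name, application_id, row)


sc = SparkContext.getOrCreate()
print(sc.parallelize(range(4)).mapPartitions(_tag_partition).collect())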
