
Commit 958d078

feat: Add PySpark Integration (getsentry#519)
* feat: Add SparkDriver and SparkWorker Integrations
1 parent 9dc161b commit 958d078

File tree

6 files changed: +634 -0 lines changed

mypy.ini (+2)

@@ -44,3 +44,5 @@ ignore_missing_imports = True
 ignore_missing_imports = True
 [mypy-rq.*]
 ignore_missing_imports = True
+[mypy-pyspark.*]
+ignore_missing_imports = True
sentry_sdk/integrations/spark/__init__.py

@@ -0,0 +1,4 @@
from sentry_sdk.integrations.spark.spark_driver import SparkIntegration
from sentry_sdk.integrations.spark.spark_worker import SparkWorkerIntegration

__all__ = ["SparkIntegration", "SparkWorkerIntegration"]
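Since the package re-exports both classes, applications can import the integrations from sentry_sdk.integrations.spark rather than from the individual modules. A minimal sketch of the import surface exposed by __all__ above:

from sentry_sdk.integrations.spark import SparkIntegration        # used on the driver
from sentry_sdk.integrations.spark import SparkWorkerIntegration  # used in worker processes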
sentry_sdk/integrations/spark/spark_driver.py

@@ -0,0 +1,261 @@
from sentry_sdk import configure_scope
from sentry_sdk.hub import Hub
from sentry_sdk.integrations import Integration
from sentry_sdk.utils import capture_internal_exceptions

from sentry_sdk._types import MYPY

if MYPY:
    from typing import Any
    from typing import Optional

    from sentry_sdk._types import Event, Hint


class SparkIntegration(Integration):
    identifier = "spark"

    @staticmethod
    def setup_once():
        # type: () -> None
        patch_spark_context_init()


def _set_app_properties():
    # type: () -> None
"""
27+
Set properties in driver that propagate to worker processes, allowing for workers to have access to those properties.
28+
This allows worker integration to have access to app_name and application_id.
29+
"""
    from pyspark import SparkContext

    sparkContext = SparkContext._active_spark_context
    if sparkContext:
        sparkContext.setLocalProperty("sentry_app_name", sparkContext.appName)
        sparkContext.setLocalProperty(
            "sentry_application_id", sparkContext.applicationId
        )


def _start_sentry_listener(sc):
    # type: (Any) -> None
"""
43+
Start java gateway server to add custom `SparkListener`
44+
"""
    from pyspark.java_gateway import ensure_callback_server_started

    gw = sc._gateway
    ensure_callback_server_started(gw)
    listener = SentryListener()
    sc._jsc.sc().addSparkListener(listener)


def patch_spark_context_init():
    # type: () -> None
    from pyspark import SparkContext

    spark_context_init = SparkContext._do_init

    def _sentry_patched_spark_context_init(self, *args, **kwargs):
        # type: (SparkContext, *Any, **Any) -> Optional[Any]
        init = spark_context_init(self, *args, **kwargs)

        if Hub.current.get_integration(SparkIntegration) is None:
            return init

        _start_sentry_listener(self)
        _set_app_properties()

        with configure_scope() as scope:

            @scope.add_event_processor
            def process_event(event, hint):
                # type: (Event, Hint) -> Optional[Event]
                with capture_internal_exceptions():
                    if Hub.current.get_integration(SparkIntegration) is None:
                        return event

                    event.setdefault("user", {}).setdefault("id", self.sparkUser())

                    event.setdefault("tags", {}).setdefault(
                        "executor.id", self._conf.get("spark.executor.id")
                    )
                    event["tags"].setdefault(
                        "spark-submit.deployMode",
                        self._conf.get("spark.submit.deployMode"),
                    )
                    event["tags"].setdefault(
                        "driver.host", self._conf.get("spark.driver.host")
                    )
                    event["tags"].setdefault(
                        "driver.port", self._conf.get("spark.driver.port")
                    )
                    event["tags"].setdefault("spark_version", self.version)
                    event["tags"].setdefault("app_name", self.appName)
                    event["tags"].setdefault("application_id", self.applicationId)
                    event["tags"].setdefault("master", self.master)
                    event["tags"].setdefault("spark_home", self.sparkHome)

                    event.setdefault("extra", {}).setdefault("web_url", self.uiWebUrl)

                return event

        return init

    SparkContext._do_init = _sentry_patched_spark_context_init


class SparkListener(object):
    def onApplicationEnd(self, applicationEnd):
        # type: (Any) -> None
        pass

    def onApplicationStart(self, applicationStart):
        # type: (Any) -> None
        pass

    def onBlockManagerAdded(self, blockManagerAdded):
        # type: (Any) -> None
        pass

    def onBlockManagerRemoved(self, blockManagerRemoved):
        # type: (Any) -> None
        pass

    def onBlockUpdated(self, blockUpdated):
        # type: (Any) -> None
        pass

    def onEnvironmentUpdate(self, environmentUpdate):
        # type: (Any) -> None
        pass

    def onExecutorAdded(self, executorAdded):
        # type: (Any) -> None
        pass

    def onExecutorBlacklisted(self, executorBlacklisted):
        # type: (Any) -> None
        pass

    def onExecutorBlacklistedForStage(self, executorBlacklistedForStage):
        # type: (Any) -> None
        pass

    def onExecutorMetricsUpdate(self, executorMetricsUpdate):
        # type: (Any) -> None
        pass

    def onExecutorRemoved(self, executorRemoved):
        # type: (Any) -> None
        pass

    def onJobEnd(self, jobEnd):
        # type: (Any) -> None
        pass

    def onJobStart(self, jobStart):
        # type: (Any) -> None
        pass

    def onNodeBlacklisted(self, nodeBlacklisted):
        # type: (Any) -> None
        pass

    def onNodeBlacklistedForStage(self, nodeBlacklistedForStage):
        # type: (Any) -> None
        pass

    def onNodeUnblacklisted(self, nodeUnblacklisted):
        # type: (Any) -> None
        pass

    def onOtherEvent(self, event):
        # type: (Any) -> None
        pass

    def onSpeculativeTaskSubmitted(self, speculativeTask):
        # type: (Any) -> None
        pass

    def onStageCompleted(self, stageCompleted):
        # type: (Any) -> None
        pass

    def onStageSubmitted(self, stageSubmitted):
        # type: (Any) -> None
        pass

    def onTaskEnd(self, taskEnd):
        # type: (Any) -> None
        pass

    def onTaskGettingResult(self, taskGettingResult):
        # type: (Any) -> None
        pass

    def onTaskStart(self, taskStart):
        # type: (Any) -> None
        pass

    def onUnpersistRDD(self, unpersistRDD):
        # type: (Any) -> None
        pass

    class Java:
        implements = ["org.apache.spark.scheduler.SparkListenerInterface"]


class SentryListener(SparkListener):
    def __init__(self):
        # type: () -> None
        self.hub = Hub.current

    def onJobStart(self, jobStart):
        # type: (Any) -> None
        message = "Job {} Started".format(jobStart.jobId())
        self.hub.add_breadcrumb(level="info", message=message)
        _set_app_properties()

    def onJobEnd(self, jobEnd):
        # type: (Any) -> None
        level = ""
        message = ""
        data = {"result": jobEnd.jobResult().toString()}

        if jobEnd.jobResult().toString() == "JobSucceeded":
            level = "info"
            message = "Job {} Ended".format(jobEnd.jobId())
        else:
            level = "warning"
            message = "Job {} Failed".format(jobEnd.jobId())

        self.hub.add_breadcrumb(level=level, message=message, data=data)

    def onStageSubmitted(self, stageSubmitted):
        # type: (Any) -> None
        stageInfo = stageSubmitted.stageInfo()
        message = "Stage {} Submitted".format(stageInfo.stageId())
        data = {"attemptId": stageInfo.attemptId(), "name": stageInfo.name()}
        self.hub.add_breadcrumb(level="info", message=message, data=data)
        _set_app_properties()

    def onStageCompleted(self, stageCompleted):
        # type: (Any) -> None
        from py4j.protocol import Py4JJavaError  # type: ignore

        stageInfo = stageCompleted.stageInfo()
        message = ""
        level = ""
        data = {"attemptId": stageInfo.attemptId(), "name": stageInfo.name()}

        # Use try/except because stageInfo.failureReason() returns a Scala Option
        try:
            data["reason"] = stageInfo.failureReason().get()
            message = "Stage {} Failed".format(stageInfo.stageId())
            level = "warning"
        except Py4JJavaError:
            message = "Stage {} Completed".format(stageInfo.stageId())
            level = "info"

        self.hub.add_breadcrumb(level=level, message=message, data=data)
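Because setup_once() patches SparkContext._do_init, the driver integration only has an effect when sentry_sdk.init() runs before the SparkContext (or SparkSession) is created. A minimal driver-side sketch under that assumption; the DSN, master and app name below are placeholders:

import sentry_sdk
from pyspark import SparkContext
from sentry_sdk.integrations.spark import SparkIntegration

# Initialize the SDK first so the SparkContext constructor below goes through
# _sentry_patched_spark_context_init (placeholder DSN).
sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",
    integrations=[SparkIntegration()],
)

# Creating the context registers SentryListener and the tagging event processor.
sc = SparkContext(master="local[*]", appName="example-app")

# Running a job emits breadcrumbs from SentryListener, e.g. "Job 0 Started",
# "Stage 0 Submitted", "Stage 0 Completed", "Job 0 Ended".
sc.parallelize(range(100)).count()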
sentry_sdk/integrations/spark/spark_worker.py

@@ -0,0 +1,120 @@
from __future__ import absolute_import

import sys

from sentry_sdk import configure_scope
from sentry_sdk.hub import Hub
from sentry_sdk.integrations import Integration
from sentry_sdk.utils import (
    capture_internal_exceptions,
    exc_info_from_error,
    single_exception_from_error_tuple,
    walk_exception_chain,
    event_hint_with_exc_info,
)

from sentry_sdk._types import MYPY

if MYPY:
    from typing import Any
    from typing import Optional

    from sentry_sdk._types import ExcInfo, Event, Hint


class SparkWorkerIntegration(Integration):
    identifier = "spark_worker"

    @staticmethod
    def setup_once():
        # type: () -> None
        import pyspark.daemon as original_daemon

        original_daemon.worker_main = _sentry_worker_main


def _capture_exception(exc_info, hub):
    # type: (ExcInfo, Hub) -> None
    client = hub.client

    client_options = client.options  # type: ignore

    mechanism = {"type": "spark", "handled": False}

    exc_info = exc_info_from_error(exc_info)

    exc_type, exc_value, tb = exc_info
    rv = []

    # On an exception the worker calls sys.exit(-1), so we can ignore SystemExit and similar errors
    for exc_type, exc_value, tb in walk_exception_chain(exc_info):
        if exc_type not in (SystemExit, EOFError, ConnectionResetError):
            rv.append(
                single_exception_from_error_tuple(
                    exc_type, exc_value, tb, client_options, mechanism
                )
            )

    if rv:
        rv.reverse()
        hint = event_hint_with_exc_info(exc_info)
        event = {"level": "error", "exception": {"values": rv}}

        _tag_task_context()

        hub.capture_event(event, hint=hint)


def _tag_task_context():
    # type: () -> None
    from pyspark.taskcontext import TaskContext

    with configure_scope() as scope:

        @scope.add_event_processor
        def process_event(event, hint):
            # type: (Event, Hint) -> Optional[Event]
            with capture_internal_exceptions():
                integration = Hub.current.get_integration(SparkWorkerIntegration)
                taskContext = TaskContext.get()

                if integration is None or taskContext is None:
                    return event

                event.setdefault("tags", {}).setdefault(
                    "stageId", taskContext.stageId()
                )
                event["tags"].setdefault("partitionId", taskContext.partitionId())
                event["tags"].setdefault("attemptNumber", taskContext.attemptNumber())
                event["tags"].setdefault("taskAttemptId", taskContext.taskAttemptId())

                if taskContext._localProperties:
                    if "sentry_app_name" in taskContext._localProperties:
                        event["tags"].setdefault(
                            "app_name", taskContext._localProperties["sentry_app_name"]
                        )
                        event["tags"].setdefault(
                            "application_id",
                            taskContext._localProperties["sentry_application_id"],
                        )

                    if "callSite.short" in taskContext._localProperties:
                        event.setdefault("extra", {}).setdefault(
                            "callSite", taskContext._localProperties["callSite.short"]
                        )

            return event


def _sentry_worker_main(*args, **kwargs):
    # type: (*Optional[Any], **Optional[Any]) -> None
    import pyspark.worker as original_worker

    try:
        original_worker.main(*args, **kwargs)
    except SystemExit:
        if Hub.current.get_integration(SparkWorkerIntegration) is not None:
            hub = Hub.current
            exc_info = sys.exc_info()
            with capture_internal_exceptions():
                _capture_exception(exc_info, hub)
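Because setup_once() replaces pyspark.daemon.worker_main, the worker integration has to be initialized inside the Python daemon process that Spark forks workers from, not only in the driver script. One hedged way to wire that up is a small daemon wrapper module; the module name sentry_daemon, the DSN, and the spark-submit configuration keys below are assumptions for illustration, not part of this commit:

# sentry_daemon.py -- hypothetical wrapper shipped to the workers (e.g. via --py-files)
import sentry_sdk
from sentry_sdk.integrations.spark import SparkWorkerIntegration

import pyspark.daemon as original_daemon

# Placeholder DSN; setup_once() swaps in _sentry_worker_main before workers start.
sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",
    integrations=[SparkWorkerIntegration()],
)

if __name__ == "__main__":
    original_daemon.manager()

Such a wrapper would then be selected with spark-submit settings along the lines of --conf spark.python.use.daemon=true --conf spark.python.daemon.module=sentry_daemon (assumed configuration keys).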
