Skip to content

Commit 1e56a2c

Browse files
committed
Add capture_internal_exceptions to event processor
1 parent 55549af commit 1e56a2c

File tree

2 files changed

+62
-56
lines changed

2 files changed

+62
-56
lines changed

sentry_sdk/integrations/spark/spark_driver.py

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from sentry_sdk import configure_scope
22
from sentry_sdk.hub import Hub
33
from sentry_sdk.integrations import Integration
4+
from sentry_sdk.utils import capture_internal_exceptions
45

56

67
class SparkIntegration(Integration):
@@ -54,37 +55,39 @@ def _sentry_patched_spark_context_init(self, *args, **kwargs):
5455
_start_sentry_listener(self)
5556
_set_app_properties()
5657

57-
with configure_scope() as scope:
58+
with capture_internal_exceptions():
59+
with configure_scope() as scope:
60+
61+
@scope.add_event_processor
62+
def process_event(event, hint):
63+
if Hub.current.get_integration(SparkIntegration) is None:
64+
return event
65+
66+
event.setdefault("user", {}).setdefault("id", self.sparkUser())
67+
68+
event.setdefault("tags", {}).setdefault(
69+
"executor.id", self._conf.get("spark.executor.id")
70+
)
71+
event["tags"].setdefault(
72+
"spark.submit.deployMode",
73+
self._conf.get("spark.submit.deployMode"),
74+
)
75+
event["tags"].setdefault(
76+
"driver.host", self._conf.get("spark.driver.host")
77+
)
78+
event["tags"].setdefault(
79+
"driver.port", self._conf.get("spark.driver.port")
80+
)
81+
event["tags"].setdefault("spark_version", self.version)
82+
event["tags"].setdefault("app_name", self.appName)
83+
event["tags"].setdefault("application_id", self.applicationId)
84+
event["tags"].setdefault("master", self.master)
85+
event["tags"].setdefault("spark.home", self.sparkHome)
86+
87+
event.setdefault("extra", {}).setdefault("web_url", self.uiWebUrl)
5888

59-
@scope.add_event_processor
60-
def process_event(event, hint):
61-
if Hub.current.get_integration(SparkIntegration) is None:
6289
return event
6390

64-
event.setdefault("user", {}).setdefault("id", self.sparkUser())
65-
66-
event.setdefault("tags", {}).setdefault(
67-
"executor.id", self._conf.get("spark.executor.id")
68-
)
69-
event["tags"].setdefault(
70-
"spark.submit.deployMode", self._conf.get("spark.submit.deployMode")
71-
)
72-
event["tags"].setdefault(
73-
"driver.host", self._conf.get("spark.driver.host")
74-
)
75-
event["tags"].setdefault(
76-
"driver.port", self._conf.get("spark.driver.port")
77-
)
78-
event["tags"].setdefault("spark_version", self.version)
79-
event["tags"].setdefault("app_name", self.appName)
80-
event["tags"].setdefault("application_id", self.applicationId)
81-
event["tags"].setdefault("master", self.master)
82-
event["tags"].setdefault("spark.home", self.sparkHome)
83-
84-
event.setdefault("extra", {}).setdefault("web_url", self.uiWebUrl)
85-
86-
return event
87-
8891
return init
8992

9093
SparkContext._do_init = _sentry_patched_spark_context_init

sentry_sdk/integrations/spark/spark_worker.py

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -58,37 +58,40 @@ def _capture_exception(exc_info, hub):
5858
def _tag_task_context():
5959
from pyspark.taskcontext import TaskContext
6060

61-
with configure_scope() as scope:
61+
with capture_internal_exceptions():
62+
with configure_scope() as scope:
6263

63-
@scope.add_event_processor
64-
def process_event(event, hint):
65-
integration = Hub.current.get_integration(SparkWorkerIntegration)
66-
taskContext = TaskContext.get()
64+
@scope.add_event_processor
65+
def process_event(event, hint):
66+
integration = Hub.current.get_integration(SparkWorkerIntegration)
67+
taskContext = TaskContext.get()
6768

68-
if integration is None or taskContext is None:
69-
return event
69+
if integration is None or taskContext is None:
70+
return event
71+
72+
event.setdefault("tags", {}).setdefault(
73+
"stageId", taskContext.stageId()
74+
)
75+
event["tags"].setdefault("partitionId", taskContext.partitionId())
76+
event["tags"].setdefault("attemptNumber", taskContext.attemptNumber())
77+
event["tags"].setdefault("taskAttemptId", taskContext.taskAttemptId())
78+
79+
if taskContext._localProperties:
80+
if "sentry_app_name" in taskContext._localProperties:
81+
event["tags"].setdefault(
82+
"app_name", taskContext._localProperties["sentry_app_name"]
83+
)
84+
event["tags"].setdefault(
85+
"application_id",
86+
taskContext._localProperties["sentry_application_id"],
87+
)
88+
89+
if "callSite.short" in taskContext._localProperties:
90+
event.setdefault("extra", {}).setdefault(
91+
"callSite", taskContext._localProperties["callSite.short"]
92+
)
7093

71-
event.setdefault("tags", {}).setdefault("stageId", taskContext.stageId())
72-
event["tags"].setdefault("partitionId", taskContext.partitionId())
73-
event["tags"].setdefault("attemptNumber", taskContext.attemptNumber())
74-
event["tags"].setdefault("taskAttemptId", taskContext.taskAttemptId())
75-
76-
if taskContext._localProperties:
77-
if "sentry_app_name" in taskContext._localProperties:
78-
event["tags"].setdefault(
79-
"app_name", taskContext._localProperties["sentry_app_name"]
80-
)
81-
event["tags"].setdefault(
82-
"application_id",
83-
taskContext._localProperties["sentry_application_id"],
84-
)
85-
86-
if "callSite.short" in taskContext._localProperties:
87-
event.setdefault("extra", {}).setdefault(
88-
"callSite", taskContext._localProperties["callSite.short"]
89-
)
90-
91-
return event
94+
return event
9295

9396

9497
def _sentry_worker_main(*args, **kwargs):

0 commit comments

Comments
 (0)