
Commit 16cbe99

Continue work

1 parent 4b4b1c2 commit 16cbe99

File tree: 4 files changed, +101 -3 lines changed

sentry_sdk/integrations/spark/spark_driver.py

Lines changed: 71 additions & 3 deletions
@@ -1,3 +1,5 @@
+from functools import wraps
+
 from sentry_sdk import configure_scope
 from sentry_sdk.hub import Hub
 from sentry_sdk.integrations import Integration
@@ -11,15 +13,79 @@
 
     from sentry_sdk._types import Event, Hint
 
+class SparkTransactionContainer(object):
+    span = None
 
 class SparkIntegration(Integration):
     identifier = "spark"
 
     @staticmethod
     def setup_once():
         # type: () -> None
-        patch_spark_context_init()
+        spark_transaction_container = SparkTransactionContainer()
+        patch_spark_context_init(spark_transaction_container)
+        patch_spark_context_stop(spark_transaction_container)
+        patch_spark_for_spans()
+
+
+def wrap_span_around_func(obj, func_list, op):
+    # type: (Any, Any, str) -> None
+    from pyspark import SparkContext
+
+    for name in func_list:
+        if not hasattr(obj, name):
+            continue
+
+        rdd_func = getattr(obj, name)
+
+        def wrap_rdd_func_call(func, name=name):  # bind the loop variable per iteration
+            @wraps(func)
+            def _sentry_patched_func(self, *args, **kwargs):
+                # type: (RDD, *Any, **Any) -> Optional[Any]
+                sparkContext = SparkContext._active_spark_context
+
+                with Hub.current.start_span(op=op, description=name) as span:
+                    if sparkContext:
+                        span.set_tag("callsite", sparkContext.getLocalProperty("callSite.short"))
+                    return func(self, *args, **kwargs)
+
+            return _sentry_patched_func
+
+        setattr(obj, name, wrap_rdd_func_call(rdd_func))
+
+def patch_spark_for_spans():
+    # type: () -> None
+    from pyspark import SparkContext
+    from pyspark import RDD
 
+    SPARK_FUNCS = frozenset(["emptyRDD", "parallelize", "wholeTextFiles", "binaryFiles", "sequenceFile", "newAPIHadoopFile"])
+    RDD_ACTIONS = frozenset(["reduce", "collect", "first", "take", "takeSample", "takeOrdered", "saveAsTextFile", "saveAsSequenceFile", "saveAsObjectFile", "countByKey", "foreach"])
+    RDD_TRANSFORMATIONS = frozenset(["map", "filter", "flatMap", "sample", "union", "intersection", "distinct", "groupByKey", "reduceByKey", "aggregateByKey", "sortByKey", "join", "cogroup", "cartesian", "pipe", "coalesce", "repartition", "repartitionAndSortWithinPartitions"])
+
+    wrap_span_around_func(SparkContext, SPARK_FUNCS, "spark.createRDD")
+    wrap_span_around_func(RDD, RDD_ACTIONS, "spark.action")
+    wrap_span_around_func(RDD, RDD_TRANSFORMATIONS, "spark.transformation")
+
+
+def patch_spark_context_stop(spark_transaction_container):
+    # type: (SparkTransactionContainer) -> None
+    from pyspark import SparkContext
+
+    spark_context_stop = SparkContext.stop
+
+    def _sentry_patched_spark_context_stop(self, *args, **kwargs):
+        if spark_transaction_container.span:
+            spark_transaction_container.span._tags.setdefault("app_name", self.appName)
+            spark_transaction_container.span._tags.setdefault("application_id", self.applicationId)
+
+        stop = spark_context_stop(self, *args, **kwargs)
+
+        if spark_transaction_container.span:
+            spark_transaction_container.span.__exit__(None, None, None)
+
+        return stop
+
+    SparkContext.stop = _sentry_patched_spark_context_stop
 
 def _set_app_properties():
     # type: () -> None
@@ -50,7 +116,7 @@ def _start_sentry_listener(sc):
     sc._jsc.sc().addSparkListener(listener)
 
 
-def patch_spark_context_init():
+def patch_spark_context_init(spark_transaction_container):
     # type: () -> None
     from pyspark import SparkContext
 
@@ -63,6 +129,8 @@ def _sentry_patched_spark_context_init(self, *args, **kwargs):
         if Hub.current.get_integration(SparkIntegration) is None:
             return init
 
+        spark_transaction_container.span = Hub.current.start_span(op="spark.job", transaction="{} - {}".format(self.appName, self.applicationId)).__enter__()
+
         _start_sentry_listener(self)
         _set_app_properties()
 
@@ -72,7 +140,7 @@ def _sentry_patched_spark_context_init(self, *args, **kwargs):
            def process_event(event, hint):
                # type: (Event, Hint) -> Optional[Event]
                with capture_internal_exceptions():
-                   if Hub.current.get_integration(SparkIntegration) is None:
+                   if Hub.current.get_integration(SparkIntegration) is None or not self._jsc:
                        return event
 
                    event.setdefault("user", {}).setdefault("id", self.sparkUser())
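
The heart of `patch_spark_for_spans` is plain method monkey-patching: look up each listed attribute, wrap it so every call runs inside a span, and reassign it on the class. Below is a minimal, self-contained sketch of that pattern with a print-based stand-in for `Hub.current.start_span` — the `fake_span`, `wrap_methods_with_spans`, and `FakeRDD` names are invented for illustration and are not part of the SDK or of this commit. Note the `name=name` default argument, which binds the loop variable at definition time so each wrapper reports its own method name.

```python
import contextlib
from functools import wraps


@contextlib.contextmanager
def fake_span(op, description):
    # Stand-in for Hub.current.start_span(...): prints instead of recording a span.
    print("start {}: {}".format(op, description))
    try:
        yield
    finally:
        print("end   {}: {}".format(op, description))


def wrap_methods_with_spans(cls, method_names, op):
    # Replace each named method on `cls` with a version that runs inside a span,
    # mirroring what wrap_span_around_func does for SparkContext and RDD.
    for name in method_names:
        if not hasattr(cls, name):
            continue

        original = getattr(cls, name)

        def make_wrapper(func, name=name):  # bind the loop variable per iteration
            @wraps(func)
            def wrapper(self, *args, **kwargs):
                with fake_span(op=op, description=name):
                    return func(self, *args, **kwargs)

            return wrapper

        setattr(cls, name, make_wrapper(original))


class FakeRDD(object):
    def collect(self):
        return [1, 2, 3]


wrap_methods_with_spans(FakeRDD, ["collect", "not_a_method"], op="demo.action")
print(FakeRDD().collect())  # runs inside a "demo.action: collect" span
```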

sentry_sdk/integrations/spark/spark_worker.py

Lines changed: 1 addition & 0 deletions
@@ -1,6 +1,7 @@
 from __future__ import absolute_import
 
 import sys
+from datetime import datetime
 
 from sentry_sdk import configure_scope
 from sentry_sdk.hub import Hub

sentry_sdk/tracing.py

Lines changed: 7 additions & 0 deletions
@@ -314,20 +314,27 @@ def finish(self, hub=None):
 
         if self._span_recorder is None:
             return None
+
+        print("LETS _span_recorder")
+        print(self)
 
         self._span_recorder.finish_span(self)
 
         if self.transaction is None:
             # If this has no transaction set we assume there's a parent
             # transaction for this span that would be flushed out eventually.
             return None
+
+        print("")
 
         client = hub.client
 
         if client is None:
             # We have no client and therefore nowhere to send this transaction
             # event.
             return None
+
+        print("LETS client")
 
         if not self.sampled:
             # At this point a `sampled = None` should have already been
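
For orientation, the driver patch above reaches this `finish()` path through the span context-manager protocol: the `spark.job` span is entered manually when the `SparkContext` is created and exited manually in the patched `stop()`, which is what eventually calls `Span.finish()` and hands the transaction to the client. A rough sketch of that lifecycle, assuming the 0.x `Hub`/`Span` API used elsewhere in this diff (the app name and application id are placeholders):

```python
from sentry_sdk.hub import Hub

# Entered when the SparkContext is initialized (see patch_spark_context_init)...
span = Hub.current.start_span(
    op="spark.job", transaction="PythonPi - local-1234"
).__enter__()

# ...the Spark job runs, creating child spans for RDD calls...

# ...and exited in the patched SparkContext.stop(), which triggers Span.finish()
# and, if sampled and a client is configured, sends the transaction event.
span.__exit__(None, None, None)
```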

tests/integrations/spark/test_spark.py

Lines changed: 22 additions & 0 deletions
@@ -4,6 +4,7 @@
     _set_app_properties,
     _start_sentry_listener,
     SentryListener,
+    SparkIntegration,
 )
 
 from sentry_sdk.integrations.spark.spark_worker import SparkWorkerIntegration
@@ -32,6 +33,7 @@ def test_set_app_properties():
         == sparkContext.applicationId
     )
 
+# SENTRY LISTENER
 
 def test_start_sentry_listener():
     sparkContext = SparkContext.getOrCreate()
@@ -195,6 +197,26 @@ def test_sentry_listener_on_stage_completed_failure(
     assert mockHub.kwargs["data"]["name"] == "run-job"
     assert mockHub.kwargs["data"]["reason"] == "failure-reason"
 
+# SPARK RDD
+
+def test_sentry_patch_rdd(sentry_init, capture_events, capture_exceptions):
+    from pyspark.sql import SparkSession
+    from operator import add
+
+    sentry_init(integrations=[SparkIntegration()], traces_sample_rate=1)
+
+    events = capture_events()
+    exceptions = capture_exceptions()
+
+    spark = SparkSession\
+        .builder\
+        .appName("PythonPi")\
+        .getOrCreate()
+
+    spark.sparkContext.parallelize(range(1, 10), 2).map(lambda x: x + 2).reduce(add)
+
+    import pdb; pdb.set_trace()
+
 
 ################
 # WORKER TESTS #
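
The new test currently stops at a `pdb` breakpoint instead of asserting anything. One plausible way to finish it — sketched here on the assumption that stopping the context closes the `spark.job` transaction and that the RDD calls produce `spark.transformation`/`spark.action` child spans as wired up in `spark_driver.py`; the exact event shape is not confirmed by this commit — would be to replace the breakpoint with something like:

```python
    spark.sparkContext.stop()  # closes the "spark.job" span opened at context init

    (event,) = events
    assert event["transaction"].startswith("PythonPi")

    span_ops = {span["op"] for span in event.get("spans", [])}
    assert "spark.transformation" in span_ops  # from .map(...)
    assert "spark.action" in span_ops  # from .reduce(add)

    assert exceptions == []
```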
