1
+ from functools import wraps
2
+
1
3
from sentry_sdk import configure_scope
2
4
from sentry_sdk .hub import Hub
3
5
from sentry_sdk .integrations import Integration
11
13
12
14
from sentry_sdk ._types import Event , Hint
13
15
16
class SparkTransactionContainer:
    """Mutable holder handing the job-level span between monkeypatches.

    ``patch_spark_context_init`` stores the started span on ``span`` and
    ``patch_spark_context_stop`` reads and closes it.  ``span`` defaults to
    ``None`` so the stop patch is safe even when the init patch returned
    early (integration disabled) and never assigned it.
    """

    # Span opened at SparkContext init; closed in the patched stop().
    span = None
    # Kept for backward compatibility; not read by the current patches.
    sentry_spark_transaction = None
14
18
15
19
class SparkIntegration(Integration):
    """Sentry integration for Apache Spark driver processes."""

    identifier = "spark"

    @staticmethod
    def setup_once():
        # type: () -> None
        # A single shared container lets the init patch hand the job-level
        # span over to the stop patch.
        container = SparkTransactionContainer()
        patch_spark_context_init(container)
        patch_spark_context_stop(container)
        patch_spark_for_spans()
29
+
30
+
31
def wrap_span_around_func(obj, func_list, op):
    # type: (Any, Iterable[str], str) -> None
    """Monkeypatch each named callable on ``obj`` to run inside a Sentry span.

    :param obj: class whose attributes get patched (``SparkContext`` or ``RDD``).
    :param func_list: attribute names to wrap; names missing on ``obj`` are skipped.
    :param op: span ``op`` value recorded for every wrapped call.
    """
    from pyspark import SparkContext

    for name in func_list:
        if not hasattr(obj, name):
            continue

        def wrap_rdd_func_call(func, func_name):
            # Bind the wrapped function AND its name per iteration: closing
            # over the loop variable ``name`` directly would late-bind, making
            # every patched function report the last name in ``func_list``.
            @wraps(func)
            def _sentry_patched_func(self, *args, **kwargs):
                # type: (Any, *Any, **Any) -> Optional[Any]
                spark_context = SparkContext._active_spark_context

                with Hub.current.start_span(op=op, description=func_name) as span:
                    if spark_context:
                        # callSite.short identifies the user code that
                        # triggered this Spark call.
                        span.set_tag(
                            "callsite",
                            spark_context.getLocalProperty("callSite.short"),
                        )
                    return func(self, *args, **kwargs)

            return _sentry_patched_func

        setattr(obj, name, wrap_rdd_func_call(getattr(obj, name), name))
55
+
56
def patch_spark_for_spans():
    # type: () -> None
    """Instrument well-known SparkContext/RDD entry points with spans."""
    from pyspark import SparkContext
    from pyspark import RDD

    # RDD-producing functions on SparkContext.  (Original list repeated
    # "binaryFiles"; the duplicate is removed here.)
    SPARK_FUNCS = frozenset(
        [
            "emptyRDD",
            "parallelize",
            "wholeTextFiles",
            "binaryFiles",
            "sequenceFile",
            "newAPIHadoopFile",
        ]
    )
    # Actions: trigger evaluation of an RDD.
    RDD_ACTIONS = frozenset(
        [
            "reduce",
            "collect",
            "first",
            "take",
            "takeSample",
            "takeOrdered",
            "saveAsTextFile",
            "saveAsSequenceFile",
            "saveAsObjectFile",
            "countByKey",
            "foreach",
        ]
    )
    # Transformations: produce a new (lazy) RDD.
    RDD_TRANSFORMATIONS = frozenset(
        [
            "map",
            "filter",
            "flatMap",
            "sample",
            "union",
            "intersection",
            "distinct",
            "groupByKey",
            "reduceByKey",
            "aggregateByKey",
            "sortByKey",
            "join",
            "cogroup",
            "cartesian",
            "pipe",
            "coalesce",
            "repartition",
            "repartitionAndSortWithinPartitions",
        ]
    )

    wrap_span_around_func(SparkContext, SPARK_FUNCS, "spark.createRDD")
    wrap_span_around_func(RDD, RDD_ACTIONS, "spark.action")
    wrap_span_around_func(RDD, RDD_TRANSFORMATIONS, "spark.transformation")
68
+
69
+
70
def patch_spark_context_stop(spark_transaction_container):
    # type: (SparkTransactionContainer) -> None
    """Patch ``SparkContext.stop`` to tag and close the job-level span.

    :param spark_transaction_container: shared holder whose ``span`` was
        (possibly) populated by the patched ``SparkContext.__init__``.
    """
    from pyspark import SparkContext

    spark_context_stop = SparkContext.stop

    @wraps(spark_context_stop)
    def _sentry_patched_spark_context_stop(self, *args, **kwargs):
        # type: (SparkContext, *Any, **Any) -> Optional[Any]
        # The init patch only assigns ``span`` when the integration is
        # enabled; default to None so stop() never raises AttributeError.
        span = getattr(spark_transaction_container, "span", None)

        if span:
            span._tags.setdefault("app_name", self.appName)
            span._tags.setdefault("application_id", self.applicationId)

        stop = spark_context_stop(self, *args, **kwargs)

        # Close the span only after the real stop() has run, so the span
        # covers the full lifetime of the context.
        if span:
            span.__exit__(None, None, None)

        return stop

    SparkContext.stop = _sentry_patched_spark_context_stop
23
89
24
90
def _set_app_properties ():
25
91
# type: () -> None
@@ -50,7 +116,7 @@ def _start_sentry_listener(sc):
50
116
sc ._jsc .sc ().addSparkListener (listener )
51
117
52
118
53
- def patch_spark_context_init ():
119
+ def patch_spark_context_init (spark_transaction_container ):
54
120
# type: () -> None
55
121
from pyspark import SparkContext
56
122
@@ -63,6 +129,8 @@ def _sentry_patched_spark_context_init(self, *args, **kwargs):
63
129
if Hub .current .get_integration (SparkIntegration ) is None :
64
130
return init
65
131
132
+ spark_transaction_container .span = Hub .current .start_span (op = "spark.job" , transaction = "{} - {}" .format (self .appName , self .applicationId )).__enter__ ()
133
+
66
134
_start_sentry_listener (self )
67
135
_set_app_properties ()
68
136
@@ -72,7 +140,7 @@ def _sentry_patched_spark_context_init(self, *args, **kwargs):
72
140
def process_event (event , hint ):
73
141
# type: (Event, Hint) -> Optional[Event]
74
142
with capture_internal_exceptions ():
75
- if Hub .current .get_integration (SparkIntegration ) is None :
143
+ if Hub .current .get_integration (SparkIntegration ) is None or not self . _jsc :
76
144
return event
77
145
78
146
event .setdefault ("user" , {}).setdefault ("id" , self .sparkUser ())
0 commit comments