@@ -13,9 +13,11 @@
 
 from sentry_sdk._types import Event, Hint
 
-class SparkTransactionContainer():
+
+class SparkTransactionContainer:
     sentry_spark_transaction = None
 
+
 class SparkIntegration(Integration):
     identifier = "spark"
 
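For orientation: SparkTransactionContainer holds the long-lived span that the patched SparkContext constructor opens and SparkContext.stop later closes. Enabling the integration should work like any other sentry-sdk integration; a minimal sketch, assuming the module is importable as sentry_sdk.integrations.spark, with a placeholder DSN:

    import sentry_sdk
    from sentry_sdk.integrations.spark import SparkIntegration

    sentry_sdk.init(
        dsn="https://<key>@<org>.ingest.sentry.io/<project>",  # placeholder DSN
        integrations=[SparkIntegration()],
    )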
@@ -46,21 +48,69 @@ def _sentry_patched_func(self, *args, **kwargs):
 
                with Hub.current.start_span(op=op, description=name) as span:
                    if sparkContext:
-                       span.set_tag("callsite", sparkContext.getLocalProperty("callSite.short"))
+                       span.set_tag(
+                           "callsite", sparkContext.getLocalProperty("callSite.short")
+                       )
                    return func(self, *args, **kwargs)
-
+
            return _sentry_patched_func
 
        setattr(obj, name, wrap_rdd_func_call(rdd_func))
 
+
 def patch_spark_for_spans():
     # type: () -> None
     from pyspark import SparkContext
     from pyspark import RDD
 
-    SPARK_FUNCS = frozenset(["emptyRDD", "parallelize", "wholeTextFiles", "binaryFiles", "binaryFiles", "sequenceFile", "newAPIHadoopFile",])
-    RDD_ACTIONS = frozenset(["reduce", "collect", "first", "take", "takeSample", "takeOrdered", "saveAsTextFile", "saveAsSequenceFile", "saveAsObjectFile", "countByKey", "foreach"])
-    RDD_TRANSFORMATIONS = frozenset(["map", "filter", "flatMap", "sample", "union", "intersection", "distinct", "groupByKey", "reduceByKey", "aggregateByKey", "sortByKey", "join", "cogroup", "cartesian", "pipe", "coalesce", "repartition", "repartitionAndSortWithinPartitions"])
+    SPARK_FUNCS = frozenset(
+        [
+            "emptyRDD",
+            "parallelize",
+            "wholeTextFiles",
+            "binaryFiles",
+            "binaryFiles",
+            "sequenceFile",
+            "newAPIHadoopFile",
+        ]
+    )
+    RDD_ACTIONS = frozenset(
+        [
+            "reduce",
+            "collect",
+            "first",
+            "take",
+            "takeSample",
+            "takeOrdered",
+            "saveAsTextFile",
+            "saveAsSequenceFile",
+            "saveAsObjectFile",
+            "countByKey",
+            "foreach",
+        ]
+    )
+    RDD_TRANSFORMATIONS = frozenset(
+        [
+            "map",
+            "filter",
+            "flatMap",
+            "sample",
+            "union",
+            "intersection",
+            "distinct",
+            "groupByKey",
+            "reduceByKey",
+            "aggregateByKey",
+            "sortByKey",
+            "join",
+            "cogroup",
+            "cartesian",
+            "pipe",
+            "coalesce",
+            "repartition",
+            "repartitionAndSortWithinPartitions",
+        ]
+    )
 
     wrap_span_around_func(SparkContext, SPARK_FUNCS, "spark.createRDD")
     wrap_span_around_func(RDD, RDD_ACTIONS, "spark.action")
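The hunk above relies on a wrap_span_around_func(obj, func_names, op) helper that swaps each listed method for _sentry_patched_func. A minimal sketch of that helper, inferred from the call sites in this diff rather than copied from the actual module (the callsite tagging is trimmed for brevity):

    from sentry_sdk.hub import Hub

    def wrap_span_around_func(obj, func_names, op):
        # type: (type, frozenset, str) -> None
        for name in func_names:
            rdd_func = getattr(obj, name, None)
            if rdd_func is None:
                continue  # not every Spark version exposes every method

            def wrap_rdd_func_call(func, name=name):
                # Bind name per iteration; the wrapper runs the original
                # method inside a span with a fixed op and the method
                # name as its description.
                def _sentry_patched_func(self, *args, **kwargs):
                    with Hub.current.start_span(op=op, description=name):
                        return func(self, *args, **kwargs)

                return _sentry_patched_func

            setattr(obj, name, wrap_rdd_func_call(rdd_func))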
@@ -76,7 +126,9 @@ def patch_spark_context_stop(spark_transaction_container):
     def _sentry_patched_spark_context_stop(self, *args, **kwargs):
         if spark_transaction_container.span:
             spark_transaction_container.span._tags.setdefault("app_name", self.appName)
-            spark_transaction_container.span._tags.setdefault("application_id", self.applicationId)
+            spark_transaction_container.span._tags.setdefault(
+                "application_id", self.applicationId
+            )
 
         stop = spark_context_stop(self, *args, **kwargs)
 
@@ -87,6 +139,7 @@ def _sentry_patched_spark_context_stop(self, *args, **kwargs):
 
     SparkContext.stop = _sentry_patched_spark_context_stop
 
+
 def _set_app_properties():
     # type: () -> None
     """
@@ -129,7 +182,10 @@ def _sentry_patched_spark_context_init(self, *args, **kwargs):
         if Hub.current.get_integration(SparkIntegration) is None:
             return init
 
-        spark_transaction_container.span = Hub.current.start_span(op="spark.job", transaction="{} - {}".format(self.appName, self.applicationId)).__enter__()
+        spark_transaction_container.span = Hub.current.start_span(
+            op="spark.job",
+            transaction="{} - {}".format(self.appName, self.applicationId),
+        ).__enter__()
 
         _start_sentry_listener(self)
         _set_app_properties()
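Taken together, the patches mean a plain driver script needs no manual spans: constructing the context starts the "spark.job" transaction, RDD creation, transformations, and actions become child spans, and stop() tags and finishes the job. An illustrative run, with placeholder DSN, master, and app name:

    import sentry_sdk
    from pyspark import SparkContext
    from sentry_sdk.integrations.spark import SparkIntegration

    sentry_sdk.init(dsn="...", integrations=[SparkIntegration()])  # placeholder DSN

    sc = SparkContext(master="local[*]", appName="sentry-spark-demo")  # "spark.job" starts
    rdd = sc.parallelize(range(100))  # "spark.createRDD" span
    total = rdd.map(lambda x: x * 2).reduce(lambda a, b: a + b)  # transformation + action spans
    sc.stop()  # sets app_name/application_id tags and ends the transaction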
@@ -140,7 +196,10 @@ def _sentry_patched_spark_context_init(self, *args, **kwargs):
         def process_event(event, hint):
             # type: (Event, Hint) -> Optional[Event]
             with capture_internal_exceptions():
-                if Hub.current.get_integration(SparkIntegration) is None or not self._jsc:
+                if (
+                    Hub.current.get_integration(SparkIntegration) is None
+                    or not self._jsc
+                ):
                     return event
 
                 event.setdefault("user", {}).setdefault("id", self.sparkUser())
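process_event matches the SDK's event-processor signature, (event, hint) -> Optional[event], so it is presumably attached to the scope somewhere below this hunk. A sketch using the public scope API; the PR's actual registration call may differ:

    import sentry_sdk

    with sentry_sdk.configure_scope() as scope:
        scope.add_event_processor(process_event)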