Commit e210aef

Format stuff
1 parent 16cbe99 commit e210aef


3 files changed: +80 −18 lines


sentry_sdk/integrations/spark/spark_driver.py

+68 −9
@@ -13,9 +13,11 @@

from sentry_sdk._types import Event, Hint

-class SparkTransactionContainer():
+
+class SparkTransactionContainer:
    sentry_spark_transaction = None

+
class SparkIntegration(Integration):
    identifier = "spark"

@@ -46,21 +48,69 @@ def _sentry_patched_func(self, *args, **kwargs):

                with Hub.current.start_span(op=op, description=name) as span:
                    if sparkContext:
-                        span.set_tag("callsite", sparkContext.getLocalProperty("callSite.short"))
+                        span.set_tag(
+                            "callsite", sparkContext.getLocalProperty("callSite.short")
+                        )
                    return func(self, *args, **kwargs)
-
+
            return _sentry_patched_func

        setattr(obj, name, wrap_rdd_func_call(rdd_func))

+
def patch_spark_for_spans():
    # type: () -> None
    from pyspark import SparkContext
    from pyspark import RDD

-    SPARK_FUNCS = frozenset(["emptyRDD", "parallelize", "wholeTextFiles", "binaryFiles", "binaryFiles", "sequenceFile", "newAPIHadoopFile", ])
-    RDD_ACTIONS = frozenset(["reduce", "collect", "first", "take", "takeSample", "takeOrdered", "saveAsTextFile", "saveAsSequenceFile", "saveAsObjectFile", "countByKey", "foreach"])
-    RDD_TRANSFORMATIONS = frozenset(["map", "filter", "flatMap", "sample", "union", "intersection", "distinct", "groupByKey", "reduceByKey", "aggregateByKey", "sortByKey", "join", "cogroup", "cartesian", "pipe", "coalesce", "repartition", "repartitionAndSortWithinPartitions"])
+    SPARK_FUNCS = frozenset(
+        [
+            "emptyRDD",
+            "parallelize",
+            "wholeTextFiles",
+            "binaryFiles",
+            "binaryFiles",
+            "sequenceFile",
+            "newAPIHadoopFile",
+        ]
+    )
+    RDD_ACTIONS = frozenset(
+        [
+            "reduce",
+            "collect",
+            "first",
+            "take",
+            "takeSample",
+            "takeOrdered",
+            "saveAsTextFile",
+            "saveAsSequenceFile",
+            "saveAsObjectFile",
+            "countByKey",
+            "foreach",
+        ]
+    )
+    RDD_TRANSFORMATIONS = frozenset(
+        [
+            "map",
+            "filter",
+            "flatMap",
+            "sample",
+            "union",
+            "intersection",
+            "distinct",
+            "groupByKey",
+            "reduceByKey",
+            "aggregateByKey",
+            "sortByKey",
+            "join",
+            "cogroup",
+            "cartesian",
+            "pipe",
+            "coalesce",
+            "repartition",
+            "repartitionAndSortWithinPartitions",
+        ]
+    )

    wrap_span_around_func(SparkContext, SPARK_FUNCS, "spark.createRDD")
    wrap_span_around_func(RDD, RDD_ACTIONS, "spark.action")
@@ -76,7 +126,9 @@ def patch_spark_context_stop(spark_transaction_container):
    def _sentry_patched_spark_context_stop(self, *args, **kwargs):
        if spark_transaction_container.span:
            spark_transaction_container.span._tags.setdefault("app_name", self.appName)
-            spark_transaction_container.span._tags.setdefault("application_id", self.applicationId)
+            spark_transaction_container.span._tags.setdefault(
+                "application_id", self.applicationId
+            )

        stop = spark_context_stop(self, *args, **kwargs)

@@ -87,6 +139,7 @@ def _sentry_patched_spark_context_stop(self, *args, **kwargs):

    SparkContext.stop = _sentry_patched_spark_context_stop

+
def _set_app_properties():
    # type: () -> None
    """
@@ -129,7 +182,10 @@ def _sentry_patched_spark_context_init(self, *args, **kwargs):
        if Hub.current.get_integration(SparkIntegration) is None:
            return init

-        spark_transaction_container.span = Hub.current.start_span(op="spark.job", transaction="{} - {}".format(self.appName, self.applicationId)).__enter__()
+        spark_transaction_container.span = Hub.current.start_span(
+            op="spark.job",
+            transaction="{} - {}".format(self.appName, self.applicationId),
+        ).__enter__()

        _start_sentry_listener(self)
        _set_app_properties()
@@ -140,7 +196,10 @@ def _sentry_patched_spark_context_init(self, *args, **kwargs):
        def process_event(event, hint):
            # type: (Event, Hint) -> Optional[Event]
            with capture_internal_exceptions():
-                if Hub.current.get_integration(SparkIntegration) is None or not self._jsc:
+                if (
+                    Hub.current.get_integration(SparkIntegration) is None
+                    or not self._jsc
+                ):
                    return event

                event.setdefault("user", {}).setdefault("id", self.sparkUser())
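
For orientation, a minimal usage sketch of the code this file patches, assuming this branch of sentry-sdk plus pyspark are installed and that SparkIntegration is importable from sentry_sdk.integrations.spark; the DSN, app name, and RDD pipeline below are illustrative, borrowed from the test further down:

# Sketch only: exercise the wrappers installed by the patches above.
from operator import add

import sentry_sdk
from pyspark.sql import SparkSession
from sentry_sdk.integrations.spark import SparkIntegration

sentry_sdk.init(dsn="", integrations=[SparkIntegration()])

# Creating the SparkContext goes through _sentry_patched_spark_context_init,
# which opens the op="spark.job" span with transaction "<appName> - <applicationId>".
spark = SparkSession.builder.appName("PythonPi").getOrCreate()

# parallelize is in SPARK_FUNCS ("spark.createRDD") and reduce is in
# RDD_ACTIONS ("spark.action"); map is listed in RDD_TRANSFORMATIONS. Each
# patched call opens a span and, when a context is available, tags it with
# the short call site from "callSite.short".
spark.sparkContext.parallelize(range(1, 10), 2).map(lambda x: x + 2).reduce(add)

# Stopping the context goes through _sentry_patched_spark_context_stop, which
# records app_name and application_id as tags on the "spark.job" span.
spark.stop()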

sentry_sdk/tracing.py

+3 −3
@@ -314,7 +314,7 @@ def finish(self, hub=None):

        if self._span_recorder is None:
            return None
-
+
        print("LETS _span_recorder")
        print(self)

@@ -324,7 +324,7 @@ def finish(self, hub=None):
            # If this has no transaction set we assume there's a parent
            # transaction for this span that would be flushed out eventually.
            return None
-
+
        print("")

        client = hub.client
@@ -333,7 +333,7 @@ def finish(self, hub=None):
            # We have no client and therefore nowhere to send this transaction
            # event.
            return None
-
+
        print("LETS client")

        if not self.sampled:
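
The three hunks above only normalize whitespace around debug prints in Span.finish(), but for context those prints sit on its early-return guards: no span recorder, no transaction name, no client. A rough sketch of driving those guards through the same Hub API the Spark patches use, assuming a sentry-sdk build contemporary with this commit (where start_span still accepts a transaction name); the span names and DSN are placeholders:

# Sketch only; not part of this commit.
import sentry_sdk
from sentry_sdk import Hub

sentry_sdk.init(dsn="")  # binds a client, but an empty DSN sends nothing

# Exiting each `with` block calls Span.finish(); the guards traced by the
# prints above decide whether a transaction event gets assembled at all.
with Hub.current.start_span(op="spark.job", transaction="demo - local"):
    with Hub.current.start_span(op="spark.action", description="reduce"):
        pass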

tests/integrations/spark/test_spark.py

+9 −6
@@ -33,8 +33,10 @@ def test_set_app_properties():
        == sparkContext.applicationId
    )

+
# SENTRY LISTENER

+
def test_start_sentry_listener():
    sparkContext = SparkContext.getOrCreate()

@@ -197,8 +199,10 @@ def test_sentry_listener_on_stage_completed_failure(
    assert mockHub.kwargs["data"]["name"] == "run-job"
    assert mockHub.kwargs["data"]["reason"] == "failure-reason"

+
# SPARK RDD

+
def test_sentry_patch_rdd(sentry_init, capture_events, capture_exceptions):
    from pyspark.sql import SparkSession
    from operator import add
@@ -208,14 +212,13 @@ def test_sentry_patch_rdd(sentry_init, capture_events, capture_exceptions):
    events = capture_events()
    exceptions = capture_exceptions()

-    spark = SparkSession\
-        .builder\
-        .appName("PythonPi")\
-        .getOrCreate()
-
+    spark = SparkSession.builder.appName("PythonPi").getOrCreate()
+
    spark.sparkContext.parallelize(range(1, 10), 2).map(lambda x: x + 2).reduce(add)

-    import pdb; pdb.set_trace()
+    import pdb
+
+    pdb.set_trace()


################
