Skip to content

Commit 7ed9f7d

Browse files
author
Osmar Coronel
committed
bababuah
1 parent 4331b2e commit 7ed9f7d

File tree

2 files changed

+41
-5
lines changed

2 files changed

+41
-5
lines changed

sentry_sdk/integrations/beam.py

+23
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import sys
44
import types
5+
import linecache
56
from functools import wraps
67

78
from sentry_sdk.hub import Hub
@@ -17,6 +18,18 @@
1718
class BeamIntegration(Integration):
1819
identifier = "beam"
1920

21+
def __init__(self, send_source=False):
22+
self.cached_source = None
23+
self.cached_file = None
24+
if send_source:
25+
import __main__ as main
26+
27+
filename = main.__file__
28+
if filename not in linecache.cache:
29+
linecache.getlines(filename)
30+
self.cached_source = linecache.cache[filename]
31+
self.cached_file = filename
32+
2033
@staticmethod
2134
def setup_once():
2235
# type: () -> None
@@ -130,6 +143,7 @@ def raiseException(client):
130143
hub.bind_client(client)
131144
exc_info = sys.exc_info()
132145
with capture_internal_exceptions():
146+
setup_file(hub)
133147
_capture_exception(exc_info, hub)
134148
reraise(*exc_info)
135149

@@ -145,3 +159,12 @@ def _wrap_generator_call(gen, client):
145159
break
146160
except Exception:
147161
raiseException(client)
162+
163+
164+
def setup_file(hub):
165+
integration = hub.get_integration(BeamIntegration)
166+
if integration:
167+
cached_source = integration.cached_source
168+
cached_file = integration.cached_file
169+
if cached_source is not None and cached_file not in linecache.cache:
170+
linecache.cache[cached_file] = cached_source

tests/integrations/beam/test_beam.py

+18-5
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,11 @@ def process_outputs(self, windowed_input_element, results):
163163

164164
@pytest.fixture
165165
def init_beam(sentry_init):
166-
def inner(fn):
167-
sentry_init(default_integrations=False, integrations=[BeamIntegration()])
166+
def inner(fn, send_source=False):
167+
sentry_init(
168+
default_integrations=False,
169+
integrations=[BeamIntegration(send_source=send_source)],
170+
)
168171
# Little hack to avoid having to run the whole pipeline.
169172
pardo = ParDo(fn)
170173
signature = pardo._signature
@@ -176,9 +179,19 @@ def inner(fn):
176179
return inner
177180

178181

179-
@pytest.mark.parametrize("fn", [test_simple, test_callable, test_place_holder])
180-
def test_invoker_normal(init_beam, fn):
181-
invoker = init_beam(fn)
182+
@pytest.mark.parametrize(
183+
"fn,send_source",
184+
[
185+
[test_simple, False],
186+
[test_simple, True],
187+
[test_callable, False],
188+
[test_callable, True],
189+
[test_place_holder, False],
190+
[test_place_holder, True]
191+
],
192+
)
193+
def test_invoker_normal(init_beam, fn, send_source):
194+
invoker = init_beam(fn, send_source)
182195
print("Normal testing {} with {} invoker.".format(fn, invoker))
183196
windowed_value = WindowedValue(False, 0, [None])
184197
invoker.invoke_process(windowed_value)

0 commit comments

Comments
 (0)