Skip to content

Commit c8a1be9

Browse files
authored
Beam integration (getsentry#446)
* Created Beam Integration
1 parent 211f7f4 commit c8a1be9

File tree

3 files changed

+359
-0
lines changed

3 files changed

+359
-0
lines changed

sentry_sdk/integrations/beam.py

+148
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
from __future__ import absolute_import
2+
3+
import sys
4+
import types
5+
from functools import wraps
6+
7+
from sentry_sdk.hub import Hub
8+
from sentry_sdk._compat import reraise
9+
from sentry_sdk.utils import capture_internal_exceptions, event_from_exception
10+
from sentry_sdk.integrations import Integration
11+
from sentry_sdk.integrations.logging import ignore_logger
12+
13+
WRAPPED_FUNC = "_wrapped_{}_"
14+
INSPECT_FUNC = "_inspect_{}" # Required format per apache_beam/transforms/core.py
15+
USED_FUNC = "_sentry_used_"
16+
17+
18+
class BeamIntegration(Integration):
    """
    Sentry integration for Apache Beam.

    ``setup_once`` installs ``_inspect_<name>`` hooks on ``DoFn`` and replaces
    ``ParDo.__init__`` so that the ``process``, ``start_bundle``,
    ``finish_bundle`` and ``setup`` callables of the object handed to a
    ``ParDo`` are wrapped with ``_wrap_task_call``.
    """

    identifier = "beam"

    @staticmethod
    def setup_once():
        # type: () -> None
        """Patch ``DoFn`` and ``ParDo`` in place."""
        from apache_beam.transforms.core import DoFn, ParDo  # type: ignore

        ignore_logger("root")
        ignore_logger("bundle_processor.create")

        function_patches = ["process", "start_bundle", "finish_bundle", "setup"]
        for func_name in function_patches:
            # _wrap_inspect_call returns None when DoFn lacks the method; the
            # attribute is still set, matching the original behavior.
            setattr(
                DoFn,
                INSPECT_FUNC.format(func_name),
                _wrap_inspect_call(DoFn, func_name),
            )

        old_init = ParDo.__init__

        def sentry_init_pardo(self, fn, *args, **kwargs):
            # Do not monkey patch init twice
            if not getattr(self, "_sentry_is_patched", False):
                for func_name in function_patches:
                    if not hasattr(fn, func_name):
                        continue
                    wrapped_func = WRAPPED_FUNC.format(func_name)

                    # Check to see if inspect is set and process is not
                    # to avoid monkey patching process twice.
                    # Check to see if function is part of object for
                    # backwards compatibility.
                    process_func = getattr(fn, func_name)
                    # Objects that are not DoFn instances carry no inspect
                    # hook; fall back to None instead of raising
                    # AttributeError out of ParDo.__init__.
                    inspect_func = getattr(
                        fn, INSPECT_FUNC.format(func_name), None
                    )
                    if not getattr(inspect_func, USED_FUNC, False) and not getattr(
                        process_func, USED_FUNC, False
                    ):
                        # Keep the original callable reachable under the
                        # "_wrapped_<name>_" attribute before replacing it.
                        setattr(fn, wrapped_func, process_func)
                        setattr(fn, func_name, _wrap_task_call(process_func))

                self._sentry_is_patched = True
            old_init(self, fn, *args, **kwargs)

        ParDo.__init__ = sentry_init_pardo
63+
64+
65+
def _wrap_inspect_call(cls, func_name):
    """
    Build the ``_inspect_<func_name>`` hook for ``cls``.

    Returns ``None`` when ``cls`` has no attribute ``func_name``. Otherwise
    returns a function, marked with ``USED_FUNC``, that reports the argspec
    of the unwrapped callable.
    """
    from apache_beam.typehints.decorators import getfullargspec  # type: ignore

    if not hasattr(cls, func_name):
        return None

    def _inspect(self):
        """
        Inspect function overrides the way Beam gets argspec.
        """
        wrapped_name = WRAPPED_FUNC.format(func_name)
        if not hasattr(self, wrapped_name):
            # First inspection: wrap the live callable and remember the
            # original under the "_wrapped_<name>_" attribute.
            target = getattr(self, func_name)
            setattr(self, func_name, _wrap_task_call(target))
            setattr(self, wrapped_name, target)
        else:
            target = getattr(self, wrapped_name)
        return getfullargspec(target)

    setattr(_inspect, USED_FUNC, True)
    return _inspect
86+
87+
88+
def _wrap_task_call(func):
    """
    Wrap task call with a try/except to get exceptions.

    The client current at wrap time is captured and passed on to
    ``raise_exception`` so it can be rebound. Generator results are wrapped
    with ``_wrap_generator_call``; any other result is returned as is.
    """
    client = Hub.current.client

    @wraps(func)
    def _inner(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
        except Exception:
            raise_exception(client)

        if isinstance(result, types.GeneratorType):
            return _wrap_generator_call(result, client)
        return result

    # Marker checked by the ParDo patch to avoid wrapping twice.
    setattr(_inner, USED_FUNC, True)
    return _inner
108+
109+
110+
def _capture_exception(exc_info, hub):
    """
    Send Beam exception to Sentry.

    Does nothing unless ``hub.get_integration(BeamIntegration)`` is truthy.
    """
    if not hub.get_integration(BeamIntegration):
        return

    event, hint = event_from_exception(
        exc_info,
        client_options=hub.client.options,
        mechanism={"type": "beam", "handled": False},
    )
    hub.capture_event(event, hint=hint)
123+
124+
125+
def raise_exception(client):
    """
    Report the exception currently being handled, then re-raise it.

    If the current hub has no client, ``client`` is bound to it first.
    """
    current_hub = Hub.current
    if current_hub.client is None:
        current_hub.bind_client(client)

    exc_type, exc_value, tb = sys.exc_info()
    with capture_internal_exceptions():
        _capture_exception((exc_type, exc_value, tb), current_hub)
    reraise(exc_type, exc_value, tb)
136+
137+
138+
def _wrap_generator_call(gen, client):
139+
"""
140+
Wrap the generator to handle any failures.
141+
"""
142+
while True:
143+
try:
144+
yield next(gen)
145+
except StopIteration:
146+
break
147+
except Exception:
148+
raise_exception(client)

tests/integrations/beam/test_beam.py

+203
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
import pytest
2+
import inspect
3+
4+
pytest.importorskip("apache_beam")
5+
6+
import dill
7+
8+
from sentry_sdk.integrations.beam import (
9+
BeamIntegration,
10+
_wrap_task_call,
11+
_wrap_inspect_call,
12+
)
13+
14+
from apache_beam.typehints.trivial_inference import instance_to_type
15+
from apache_beam.typehints.decorators import getcallargs_forhints
16+
from apache_beam.transforms.core import DoFn, ParDo, _DoFnParam, CallableWrapperDoFn
17+
from apache_beam.runners.common import DoFnInvoker, OutputProcessor, DoFnContext
18+
from apache_beam.utils.windowed_value import WindowedValue
19+
20+
21+
def foo():
    """Zero-argument fixture callable; always returns True."""
    return True
23+
24+
25+
def bar(x, y):
    """Two-positional-argument fixture callable; ignores both, returns True."""
    return True
28+
29+
30+
def baz(x, y=2):
    """Fixture callable with a defaulted argument; ignores both, returns True."""
    return True
33+
34+
35+
class A:
    """Holder for a callable ``fn`` with an ``_inspect_fn`` hook installed."""

    def __init__(self, fn):
        self.r = "We are in A"
        self.fn = fn
        # Same hook the integration installs on DoFn, here for "fn".
        self._inspect_fn = _wrap_inspect_call(self, "fn")

    def process(self):
        """Call the stored ``fn`` with no arguments and return its result."""
        return self.fn()
43+
44+
45+
class B(A, object):
    """Variant of ``A`` whose ``fn`` is its own bound method ``fa``."""

    def __init__(self):
        self.r = "We are in B"
        super(B, self).__init__(self.fa)

    def fa(self, x, element=False, another_element=False):
        """Return True for the passing inputs, otherwise divide by zero."""
        if not x and (not element or another_element):
            1 / 0
            return False
        return True
56+
57+
58+
class SimpleFunc(DoFn):
    """DoFn whose ``process`` returns a list, or divides by zero if ``x`` is truthy."""

    def process(self, x):
        if not x:
            return [True]
        1 / 0
        return [True]
63+
64+
65+
class PlaceHolderFunc(DoFn):
    """Generator-style DoFn that rejects unsubstituted ``_DoFnParam`` defaults."""

    def process(self, x, timestamp=DoFn.TimestampParam, wx=DoFn.WindowParam):
        # Either default still being a _DoFnParam is treated as an error.
        if any(isinstance(param, _DoFnParam) for param in (timestamp, wx)):
            raise Exception("Bad instance")
        if x:
            1 / 0
        yield True
72+
73+
74+
def fail(x):
    """Return ``[True]`` for a falsy ``x``; divide by zero otherwise."""
    if not x:
        return [True]
    1 / 0
    return [True]
78+
79+
80+
test_parent = A(foo)
81+
test_child = B()
82+
test_simple = SimpleFunc()
83+
test_place_holder = PlaceHolderFunc()
84+
test_callable = CallableWrapperDoFn(fail)
85+
86+
87+
# Cannot call simple functions or placeholder test.
88+
@pytest.mark.parametrize(
    "obj,f,args,kwargs",
    [
        (test_parent, "fn", (), {}),
        (test_child, "fn", (False,), {"element": True}),
        (test_child, "fn", (True,), {}),
        (test_simple, "process", (False,), {}),
        (test_callable, "process", (False,), {}),
    ],
)
def test_monkey_patch_call(obj, f, args, kwargs):
    """The attribute is truthy when called both directly and wrapped."""
    target = getattr(obj, f)

    assert target(*args, **kwargs)
    assert _wrap_task_call(target)(*args, **kwargs)
103+
104+
105+
@pytest.mark.parametrize("f", [foo, bar, baz, test_parent.fn, test_child.fn])
def test_monkey_patch_pickle(f):
    """A wrapped callable can be pickled with dill, loaded and dumped again."""
    wrapped = _wrap_task_call(f)
    assert dill.pickles(wrapped), "{} is not pickling correctly!".format(f)

    # Pickle everything
    dill.dumps(dill.loads(dill.dumps(wrapped)))
114+
115+
116+
@pytest.mark.parametrize(
    "f,args,kwargs",
    [
        [foo, (), {}],
        [bar, (1, 5), {}],
        [baz, (1,), {}],
        [test_parent.fn, (), {}],
        [test_child.fn, (False,), {"element": True}],
        [test_child.fn, (True,), {}],
    ],
)
def test_monkey_patch_signature(f, args, kwargs):
    """
    The wrapped callable accepts the same type hints as the original and,
    where ``inspect.signature`` exists, reports the same signature.
    """
    arg_types = [instance_to_type(v) for v in args]
    kwargs_types = {k: instance_to_type(v) for (k, v) in kwargs.items()}
    f_temp = _wrap_task_call(f)
    try:
        getcallargs_forhints(f, *arg_types, **kwargs_types)
    except Exception:
        print("Failed on {} with parameters {}, {}".format(f, args, kwargs))
        raise
    try:
        getcallargs_forhints(f_temp, *arg_types, **kwargs_types)
    except Exception:
        print("Failed on {} with parameters {}, {}".format(f_temp, args, kwargs))
        raise

    # inspect.signature is missing on py2.7. Skip the comparison only in that
    # case; a blanket `except Exception: pass` would also swallow the
    # AssertionError and make this check impossible to fail.
    signature = getattr(inspect, "signature", None)
    if signature is None:
        return

    expected_signature = signature(f)
    test_signature = signature(f_temp)
    assert (
        expected_signature == test_signature
    ), "Failed on {}, signature {} does not match {}".format(
        f, expected_signature, test_signature
    )
152+
153+
154+
class _OutputProcessor(OutputProcessor):
    """Output processor that asserts every produced result is truthy."""

    def process_outputs(self, windowed_input_element, results):
        print(windowed_input_element)
        try:
            # Iterating drives the (possibly wrapped) generator to completion.
            for produced in results:
                assert produced
        except StopIteration:
            print("In here")
162+
163+
164+
@pytest.fixture
def init_beam(sentry_init):
    """Fixture returning a factory that builds a ``DoFnInvoker`` for ``fn``."""

    def inner(fn):
        sentry_init(default_integrations=False, integrations=[BeamIntegration()])
        # Little hack to avoid having to run the whole pipeline.
        return DoFnInvoker.create_invoker(
            ParDo(fn)._signature, _OutputProcessor(), DoFnContext("test")
        )

    return inner
177+
178+
179+
@pytest.mark.parametrize("fn", [test_simple, test_callable, test_place_holder])
def test_invoker_normal(init_beam, fn):
    """Invoking ``process`` on a falsy element runs without raising."""
    invoker = init_beam(fn)
    print("Normal testing {} with {} invoker.".format(fn, invoker))
    invoker.invoke_process(WindowedValue(False, 0, [None]))
185+
186+
187+
@pytest.mark.parametrize("fn", [test_simple, test_callable, test_place_holder])
def test_invoker_exception(init_beam, capture_events, capture_exceptions, fn):
    """A truthy element yields exactly one ZeroDivisionError event tagged "beam"."""
    invoker = init_beam(fn)
    events = capture_events()

    print("Exception testing {} with {} invoker.".format(fn, invoker))
    # Window value will always have one value for the process to run.
    failing_value = WindowedValue(True, 0, [None])
    try:
        invoker.invoke_process(failing_value)
    except Exception:
        # The failure is expected; only the captured event is checked below.
        pass

    (event,) = events
    (exception,) = event["exception"]["values"]
    assert exception["type"] == "ZeroDivisionError"
    assert exception["mechanism"]["type"] == "beam"

tox.ini

+8
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ envlist =
3232
{pypy,py2.7,py3.5,py3.6,py3.7,py3.8}-celery-{4.1,4.2,4.3}
3333
{pypy,py2.7}-celery-3
3434

35+
{py2.7,py3.6}-beam-{12,13,master}
36+
py3.7-beam-{12,13}
37+
3538
# The aws_lambda tests deploy to the real AWS and have their own matrix of Python versions.
3639
py3.7-aws_lambda
3740

@@ -93,6 +96,10 @@ deps =
9396
{py3.5,py3.6}-sanic: aiocontextvars==0.2.1
9497
sanic: aiohttp
9598

99+
beam-12: apache-beam>=2.12.0, <2.13.0
100+
beam-13: apache-beam>=2.13.0, <2.14.0
101+
beam-master: git+https://github.com/apache/beam#egg=apache-beam&subdirectory=sdks/python
102+
96103
celery-3: Celery>=3.1,<4.0
97104
celery-4.1: Celery>=4.1,<4.2
98105
celery-4.2: Celery>=4.2,<4.3
@@ -154,6 +161,7 @@ deps =
154161
setenv =
155162
PYTHONDONTWRITEBYTECODE=1
156163
TESTPATH=tests
164+
beam: TESTPATH=tests/integrations/beam
157165
django: TESTPATH=tests/integrations/django
158166
flask: TESTPATH=tests/integrations/flask
159167
bottle: TESTPATH=tests/integrations/bottle

0 commit comments

Comments
 (0)