Skip to content

Commit 5fbbe0f

Browse files
author
Osmar Coronel
committed
Created Beam Integration
1 parent 05229f1 commit 5fbbe0f

File tree

4 files changed

+355
-0
lines changed

4 files changed

+355
-0
lines changed

.travis.yml

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ branches:
1818
only:
1919
- master
2020
- /^release\/.+$/
21+
- beam-integration
2122

2223
matrix:
2324
allow_failures:

sentry_sdk/integrations/beam.py

+147
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
from __future__ import absolute_import
2+
3+
import sys
4+
import types
5+
from functools import wraps
6+
7+
from sentry_sdk.hub import Hub
8+
from sentry_sdk._compat import reraise
9+
from sentry_sdk.utils import capture_internal_exceptions, event_from_exception
10+
from sentry_sdk.integrations import Integration
11+
from sentry_sdk.integrations.logging import ignore_logger
12+
13+
WRAPPED_FUNC = "_wrapped_{}_"
14+
INSPECT_FUNC = "_inspect_{}"
15+
16+
17+
class BeamIntegration(Integration):
18+
identifier = "beam"
19+
20+
@staticmethod
21+
def setup_once():
22+
# type: () -> None
23+
from apache_beam.transforms.core import DoFn, ParDo # type: ignore
24+
25+
ignore_logger("root")
26+
ignore_logger("bundle_processor.create")
27+
28+
function_patches = ["process", "start_bundle", "finish_bundle", "setup"]
29+
for func_name in function_patches:
30+
setattr(
31+
DoFn,
32+
INSPECT_FUNC.format(func_name),
33+
patched_inspect_process(DoFn, func_name),
34+
)
35+
36+
old_init = ParDo.__init__
37+
38+
def sentry_init_pardo(self, fn, *args, **kwargs):
39+
# Do not monkey patch init twice
40+
if not getattr(self, "_sentry_is_patched", False):
41+
for func_name in function_patches:
42+
if not hasattr(fn, func_name):
43+
continue
44+
wrapped_func = WRAPPED_FUNC.format(func_name)
45+
46+
# Check to see if inspect is set and process is not
47+
# to avoid monkey patching process twice.
48+
# Check to see if function is part of object for
49+
# backwards compatibility.
50+
process_func = getattr(fn, func_name)
51+
inspect_func = getattr(fn, INSPECT_FUNC.format(func_name))
52+
if not getattr(inspect_func, "__used__", False) and not getattr(
53+
process_func, "__used__", False
54+
):
55+
setattr(fn, wrapped_func, process_func)
56+
setattr(fn, func_name, _wrap_task_call(process_func))
57+
58+
self._sentry_is_patched = True
59+
old_init(self, fn, *args, **kwargs)
60+
61+
ParDo.__init__ = sentry_init_pardo
62+
63+
64+
def patched_inspect_process(cls, func_name):
65+
from apache_beam.typehints.decorators import getfullargspec # type: ignore
66+
67+
if not hasattr(cls, func_name):
68+
return None
69+
70+
def _inspect(self):
71+
"""
72+
Inspect function overrides the way Beam gets argspec.
73+
"""
74+
wrapped_func = WRAPPED_FUNC.format(func_name)
75+
if hasattr(self, wrapped_func):
76+
process_func = getattr(self, wrapped_func)
77+
else:
78+
process_func = getattr(self, func_name)
79+
setattr(self, func_name, _wrap_task_call(process_func))
80+
setattr(self, wrapped_func, process_func)
81+
return getfullargspec(process_func)
82+
83+
setattr(_inspect, "__used__", True)
84+
return _inspect
85+
86+
87+
def _wrap_task_call(func):
88+
"""
89+
Wrap task call with a try catch to get exceptions.
90+
Pass the client on to raise_exception so it can get rebinded.
91+
"""
92+
client = Hub.current.client
93+
94+
@wraps(func)
95+
def _inner(*args, **kwargs):
96+
try:
97+
gen = func(*args, **kwargs)
98+
except Exception:
99+
raise_exception(client)
100+
101+
if not isinstance(gen, types.GeneratorType):
102+
return gen
103+
return _wrap_generator_call(gen, client)
104+
105+
setattr(_inner, "__used__", True)
106+
return _inner
107+
108+
109+
def _capture_exception(exc_info, hub):
110+
"""
111+
Send Beam exception to Sentry
112+
"""
113+
integration = hub.get_integration(BeamIntegration)
114+
if integration:
115+
client = hub.client
116+
event, hint = event_from_exception(
117+
exc_info,
118+
client_options=client.options,
119+
mechanism={"type": "beam", "handled": False},
120+
)
121+
hub.capture_event(event, hint=hint)
122+
123+
124+
def raise_exception(client):
125+
"""
126+
Raise an exception. If the client is not in the hub, rebind it.
127+
"""
128+
hub = Hub.current
129+
if hub.client is None:
130+
hub.bind_client(client)
131+
exc_info = sys.exc_info()
132+
with capture_internal_exceptions():
133+
_capture_exception(exc_info, hub)
134+
reraise(*exc_info)
135+
136+
137+
def _wrap_generator_call(gen, client):
138+
"""
139+
Wrap the generator to handle any failures.
140+
"""
141+
while True:
142+
try:
143+
yield next(gen)
144+
except StopIteration:
145+
raise
146+
except Exception:
147+
raise_exception(client)

tests/integrations/beam/test_beam.py

+199
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
import pytest
2+
import dill
3+
import inspect
4+
5+
pytest.importorskip("apache_beam")
6+
7+
from sentry_sdk.integrations.beam import (
8+
BeamIntegration,
9+
_wrap_task_call,
10+
patched_inspect_process,
11+
)
12+
13+
from apache_beam.typehints.trivial_inference import instance_to_type
14+
from apache_beam.typehints.decorators import getcallargs_forhints
15+
from apache_beam.transforms.core import DoFn, ParDo, _DoFnParam, CallableWrapperDoFn
16+
from apache_beam.runners.common import DoFnInvoker, OutputProcessor, DoFnContext
17+
from apache_beam.utils.windowed_value import WindowedValue
18+
19+
20+
def foo():
21+
return True
22+
23+
24+
def bar(x, y):
25+
# print(x + y)
26+
return True
27+
28+
29+
def baz(x, y=2):
30+
# print(x + y)
31+
return True
32+
33+
34+
class A:
35+
def __init__(self, fn):
36+
self.r = "We are in A"
37+
self.fn = fn
38+
setattr(self, "_inspect_fn", patched_inspect_process(self, "fn"))
39+
40+
def process(self):
41+
return self.fn()
42+
43+
44+
class B(A, object):
45+
def fa(self, x, element=False, another_element=False):
46+
if x or (element and not another_element):
47+
# print(self.r)
48+
return True
49+
1 / 0
50+
return False
51+
52+
def __init__(self):
53+
self.r = "We are in B"
54+
super(B, self).__init__(self.fa)
55+
56+
57+
class SimpleFunc(DoFn):
58+
def process(self, x):
59+
if x:
60+
1 / 0
61+
return [True]
62+
63+
64+
class PlaceHolderFunc(DoFn):
65+
def process(self, x, timestamp=DoFn.TimestampParam, wx=DoFn.WindowParam):
66+
if isinstance(timestamp, _DoFnParam) or isinstance(wx, _DoFnParam):
67+
raise Exception("Bad instance")
68+
if x:
69+
1 / 0
70+
yield True
71+
72+
73+
def fail(x):
74+
if x:
75+
1 / 0
76+
return [True]
77+
78+
79+
test_parent = A(foo)
80+
test_child = B()
81+
test_simple = SimpleFunc()
82+
test_place_holder = PlaceHolderFunc()
83+
test_callable = CallableWrapperDoFn(fail)
84+
85+
86+
# Cannot call simple functions or placeholder test.
87+
@pytest.mark.parametrize(
88+
"obj,f,args,kwargs",
89+
[
90+
[test_parent, "fn", (), {}],
91+
[test_child, "fn", (False,), {"element": True}],
92+
[test_child, "fn", (True,), {}],
93+
[test_simple, "process", (False,), {}],
94+
[test_callable, "process", (False,), {}],
95+
],
96+
)
97+
def test_monkey_patch_call(obj, f, args, kwargs):
98+
func = getattr(obj, f)
99+
100+
assert func(*args, **kwargs)
101+
assert _wrap_task_call(func)(*args, **kwargs)
102+
103+
104+
@pytest.mark.parametrize("f", [foo, bar, baz, test_parent.fn, test_child.fn])
105+
def test_monkey_patch_pickle(f):
106+
f_temp = _wrap_task_call(f)
107+
assert dill.pickles(f_temp), "{} is not pickling correctly!".format(f)
108+
109+
# Pickle everything
110+
s1 = dill.dumps(f_temp)
111+
s2 = dill.loads(s1)
112+
dill.dumps(s2)
113+
114+
115+
@pytest.mark.parametrize(
116+
"f,args,kwargs",
117+
[
118+
[foo, (), {}],
119+
[bar, (1, 5), {}],
120+
[baz, (1,), {}],
121+
[test_parent.fn, (), {}],
122+
[test_child.fn, (False,), {"element": True}],
123+
[test_child.fn, (True,), {}],
124+
],
125+
)
126+
def test_monkey_patch_signature(f, args, kwargs):
127+
arg_types = [instance_to_type(v) for v in args]
128+
kwargs_types = {k: instance_to_type(v) for (k, v) in kwargs.items()}
129+
f_temp = _wrap_task_call(f)
130+
try:
131+
getcallargs_forhints(f, *arg_types, **kwargs_types)
132+
except Exception:
133+
print("Failed on {} with parameters {}, {}".format(f, args, kwargs))
134+
raise
135+
try:
136+
getcallargs_forhints(f_temp, *arg_types, **kwargs_types)
137+
except Exception:
138+
print("Failed on {} with parameters {}, {}".format(f_temp, args, kwargs))
139+
raise
140+
try:
141+
expected_signature = inspect.signature(f)
142+
test_signature = inspect.signature(f_temp)
143+
assert (
144+
expected_signature == test_signature
145+
), "Failed on {}, signature {} does not match {}".format(
146+
f, expected_signature, test_signature
147+
)
148+
except Exception:
149+
# expected to pass for py2.7
150+
pass
151+
152+
153+
class _OutputProcessor(OutputProcessor):
154+
def process_outputs(self, windowed_input_element, results):
155+
print(windowed_input_element)
156+
for result in results:
157+
assert result
158+
159+
160+
@pytest.fixture
161+
def init_beam(sentry_init):
162+
def inner(fn):
163+
sentry_init(default_integrations=False, integrations=[BeamIntegration()])
164+
# Little hack to avoid having to run the whole pipeline.
165+
pardo = ParDo(fn)
166+
signature = pardo._signature
167+
output_processor = _OutputProcessor()
168+
return DoFnInvoker.create_invoker(
169+
signature, output_processor, DoFnContext("test")
170+
)
171+
172+
return inner
173+
174+
175+
@pytest.mark.parametrize("fn", [test_simple, test_callable, test_place_holder])
176+
def test_invoker_normal(init_beam, fn):
177+
invoker = init_beam(fn)
178+
print("Normal testing {} with {} invoker.".format(fn, invoker))
179+
windowed_value = WindowedValue(False, 0, [None])
180+
invoker.invoke_process(windowed_value)
181+
182+
183+
@pytest.mark.parametrize("fn", [test_simple, test_callable, test_place_holder])
184+
def test_invoker_exception(init_beam, capture_events, capture_exceptions, fn):
185+
invoker = init_beam(fn)
186+
events = capture_events()
187+
188+
print("Exception testing {} with {} invoker.".format(fn, invoker))
189+
# Window value will always have one value for the process to run.
190+
windowed_value = WindowedValue(True, 0, [None])
191+
try:
192+
invoker.invoke_process(windowed_value)
193+
except Exception:
194+
pass
195+
196+
event, = events
197+
exception, = event["exception"]["values"]
198+
assert exception["type"] == "ZeroDivisionError"
199+
assert exception["mechanism"]["type"] == "beam"

tox.ini

+8
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ envlist =
3232
{pypy,py2.7,py3.5,py3.6,py3.7,py3.8}-celery-{4.1,4.2,4.3}
3333
{pypy,py2.7}-celery-3
3434

35+
py2.7-beam-{12,13,master}
36+
{py3.6,py3.7,py3.8}-beam-{12,13,master,dev}
37+
3538
# The aws_lambda tests deploy to the real AWS and have their own matrix of Python versions.
3639
py3.7-aws_lambda
3740

@@ -89,6 +92,10 @@ deps =
8992
{py3.5,py3.6}-sanic: aiocontextvars==0.2.1
9093
sanic: aiohttp
9194

95+
beam-12: apache-beam>=2.12.0, <2.13.0
96+
beam-13: apache-beam>=2.13.0, <2.14.0
97+
beam-master: git+https://github.com/apache/beam#egg=apache-beam&subdirectory=sdks/python
98+
9299
celery-3: Celery>=3.1,<4.0
93100
celery-4.1: Celery>=4.1,<4.2
94101
celery-4.2: Celery>=4.2,<4.3
@@ -146,6 +153,7 @@ deps =
146153
setenv =
147154
PYTHONDONTWRITEBYTECODE=1
148155
TESTPATH=tests
156+
beam: TESTPATH=tests/integrations/beam
149157
django: TESTPATH=tests/integrations/django
150158
flask: TESTPATH=tests/integrations/flask
151159
bottle: TESTPATH=tests/integrations/bottle

0 commit comments

Comments
 (0)