|
| 1 | +import pytest |
| 2 | +import dill |
| 3 | +import inspect |
| 4 | + |
| 5 | +pytest.importorskip("apache_beam") |
| 6 | + |
| 7 | +from sentry_sdk.integrations.beam import ( |
| 8 | + BeamIntegration, |
| 9 | + _wrap_task_call, |
| 10 | + patched_inspect_process, |
| 11 | +) |
| 12 | + |
| 13 | +from apache_beam.typehints.trivial_inference import instance_to_type |
| 14 | +from apache_beam.typehints.decorators import getcallargs_forhints |
| 15 | +from apache_beam.transforms.core import DoFn, ParDo, _DoFnParam, CallableWrapperDoFn |
| 16 | +from apache_beam.runners.common import DoFnInvoker, OutputProcessor, DoFnContext |
| 17 | +from apache_beam.utils.windowed_value import WindowedValue |
| 18 | + |
| 19 | + |
| 20 | +def foo(): |
| 21 | + return True |
| 22 | + |
| 23 | + |
| 24 | +def bar(x, y): |
| 25 | + # print(x + y) |
| 26 | + return True |
| 27 | + |
| 28 | + |
| 29 | +def baz(x, y=2): |
| 30 | + # print(x + y) |
| 31 | + return True |
| 32 | + |
| 33 | + |
| 34 | +class A: |
| 35 | + def __init__(self, fn): |
| 36 | + self.r = "We are in A" |
| 37 | + self.fn = fn |
| 38 | + setattr(self, "_inspect_fn", patched_inspect_process(self, "fn")) |
| 39 | + |
| 40 | + def process(self): |
| 41 | + return self.fn() |
| 42 | + |
| 43 | + |
| 44 | +class B(A, object): |
| 45 | + def fa(self, x, element=False, another_element=False): |
| 46 | + if x or (element and not another_element): |
| 47 | + # print(self.r) |
| 48 | + return True |
| 49 | + 1 / 0 |
| 50 | + return False |
| 51 | + |
| 52 | + def __init__(self): |
| 53 | + self.r = "We are in B" |
| 54 | + super(B, self).__init__(self.fa) |
| 55 | + |
| 56 | + |
| 57 | +class SimpleFunc(DoFn): |
| 58 | + def process(self, x): |
| 59 | + if x: |
| 60 | + 1 / 0 |
| 61 | + return [True] |
| 62 | + |
| 63 | + |
| 64 | +class PlaceHolderFunc(DoFn): |
| 65 | + def process(self, x, timestamp=DoFn.TimestampParam, wx=DoFn.WindowParam): |
| 66 | + if isinstance(timestamp, _DoFnParam) or isinstance(wx, _DoFnParam): |
| 67 | + raise Exception("Bad instance") |
| 68 | + if x: |
| 69 | + 1 / 0 |
| 70 | + yield True |
| 71 | + |
| 72 | + |
| 73 | +def fail(x): |
| 74 | + if x: |
| 75 | + 1 / 0 |
| 76 | + return [True] |
| 77 | + |
| 78 | + |
| 79 | +test_parent = A(foo) |
| 80 | +test_child = B() |
| 81 | +test_simple = SimpleFunc() |
| 82 | +test_place_holder = PlaceHolderFunc() |
| 83 | +test_callable = CallableWrapperDoFn(fail) |
| 84 | + |
| 85 | + |
| 86 | +# Cannot call simple functions or placeholder test. |
| 87 | +@pytest.mark.parametrize( |
| 88 | + "obj,f,args,kwargs", |
| 89 | + [ |
| 90 | + [test_parent, "fn", (), {}], |
| 91 | + [test_child, "fn", (False,), {"element": True}], |
| 92 | + [test_child, "fn", (True,), {}], |
| 93 | + [test_simple, "process", (False,), {}], |
| 94 | + [test_callable, "process", (False,), {}], |
| 95 | + ], |
| 96 | +) |
| 97 | +def test_monkey_patch_call(obj, f, args, kwargs): |
| 98 | + func = getattr(obj, f) |
| 99 | + |
| 100 | + assert func(*args, **kwargs) |
| 101 | + assert _wrap_task_call(func)(*args, **kwargs) |
| 102 | + |
| 103 | + |
| 104 | +@pytest.mark.parametrize("f", [foo, bar, baz, test_parent.fn, test_child.fn]) |
| 105 | +def test_monkey_patch_pickle(f): |
| 106 | + f_temp = _wrap_task_call(f) |
| 107 | + assert dill.pickles(f_temp), "{} is not pickling correctly!".format(f) |
| 108 | + |
| 109 | + # Pickle everything |
| 110 | + s1 = dill.dumps(f_temp) |
| 111 | + s2 = dill.loads(s1) |
| 112 | + dill.dumps(s2) |
| 113 | + |
| 114 | + |
| 115 | +@pytest.mark.parametrize( |
| 116 | + "f,args,kwargs", |
| 117 | + [ |
| 118 | + [foo, (), {}], |
| 119 | + [bar, (1, 5), {}], |
| 120 | + [baz, (1,), {}], |
| 121 | + [test_parent.fn, (), {}], |
| 122 | + [test_child.fn, (False,), {"element": True}], |
| 123 | + [test_child.fn, (True,), {}], |
| 124 | + ], |
| 125 | +) |
| 126 | +def test_monkey_patch_signature(f, args, kwargs): |
| 127 | + arg_types = [instance_to_type(v) for v in args] |
| 128 | + kwargs_types = {k: instance_to_type(v) for (k, v) in kwargs.items()} |
| 129 | + f_temp = _wrap_task_call(f) |
| 130 | + try: |
| 131 | + getcallargs_forhints(f, *arg_types, **kwargs_types) |
| 132 | + except Exception: |
| 133 | + print("Failed on {} with parameters {}, {}".format(f, args, kwargs)) |
| 134 | + raise |
| 135 | + try: |
| 136 | + getcallargs_forhints(f_temp, *arg_types, **kwargs_types) |
| 137 | + except Exception: |
| 138 | + print("Failed on {} with parameters {}, {}".format(f_temp, args, kwargs)) |
| 139 | + raise |
| 140 | + try: |
| 141 | + expected_signature = inspect.signature(f) |
| 142 | + test_signature = inspect.signature(f_temp) |
| 143 | + assert ( |
| 144 | + expected_signature == test_signature |
| 145 | + ), "Failed on {}, signature {} does not match {}".format( |
| 146 | + f, expected_signature, test_signature |
| 147 | + ) |
| 148 | + except Exception: |
| 149 | + # expected to pass for py2.7 |
| 150 | + pass |
| 151 | + |
| 152 | + |
| 153 | +class _OutputProcessor(OutputProcessor): |
| 154 | + def process_outputs(self, windowed_input_element, results): |
| 155 | + print(windowed_input_element) |
| 156 | + for result in results: |
| 157 | + assert result |
| 158 | + |
| 159 | + |
| 160 | +@pytest.fixture |
| 161 | +def init_beam(sentry_init): |
| 162 | + def inner(fn): |
| 163 | + sentry_init(default_integrations=False, integrations=[BeamIntegration()]) |
| 164 | + # Little hack to avoid having to run the whole pipeline. |
| 165 | + pardo = ParDo(fn) |
| 166 | + signature = pardo._signature |
| 167 | + output_processor = _OutputProcessor() |
| 168 | + return DoFnInvoker.create_invoker( |
| 169 | + signature, output_processor, DoFnContext("test") |
| 170 | + ) |
| 171 | + |
| 172 | + return inner |
| 173 | + |
| 174 | + |
| 175 | +@pytest.mark.parametrize("fn", [test_simple, test_callable, test_place_holder]) |
| 176 | +def test_invoker_normal(init_beam, fn): |
| 177 | + invoker = init_beam(fn) |
| 178 | + print("Normal testing {} with {} invoker.".format(fn, invoker)) |
| 179 | + windowed_value = WindowedValue(False, 0, [None]) |
| 180 | + invoker.invoke_process(windowed_value) |
| 181 | + |
| 182 | + |
| 183 | +@pytest.mark.parametrize("fn", [test_simple, test_callable, test_place_holder]) |
| 184 | +def test_invoker_exception(init_beam, capture_events, capture_exceptions, fn): |
| 185 | + invoker = init_beam(fn) |
| 186 | + events = capture_events() |
| 187 | + |
| 188 | + print("Exception testing {} with {} invoker.".format(fn, invoker)) |
| 189 | + # Window value will always have one value for the process to run. |
| 190 | + windowed_value = WindowedValue(True, 0, [None]) |
| 191 | + try: |
| 192 | + invoker.invoke_process(windowed_value) |
| 193 | + except Exception: |
| 194 | + pass |
| 195 | + |
| 196 | + event, = events |
| 197 | + exception, = event["exception"]["values"] |
| 198 | + assert exception["type"] == "ZeroDivisionError" |
| 199 | + assert exception["mechanism"]["type"] == "beam" |
0 commit comments