diff --git a/optimizely/helpers/enums.py b/optimizely/helpers/enums.py index ab63d1e3..02bc9136 100644 --- a/optimizely/helpers/enums.py +++ b/optimizely/helpers/enums.py @@ -120,10 +120,11 @@ class Errors: NONE_VARIABLE_KEY_PARAMETER: Final = '"None" is an invalid value for variable key.' UNSUPPORTED_DATAFILE_VERSION: Final = ( 'This version of the Python SDK does not support the given datafile version: "{}".') - INVALID_SEGMENT_IDENTIFIER = 'Audience segments fetch failed (invalid identifier).' - FETCH_SEGMENTS_FAILED = 'Audience segments fetch failed ({}).' - ODP_EVENT_FAILED = 'ODP event send failed ({}).' - ODP_NOT_ENABLED = 'ODP is not enabled. ' + INVALID_SEGMENT_IDENTIFIER: Final = 'Audience segments fetch failed (invalid identifier).' + FETCH_SEGMENTS_FAILED: Final = 'Audience segments fetch failed ({}).' + ODP_EVENT_FAILED: Final = 'ODP event send failed ({}).' + ODP_NOT_ENABLED: Final = 'ODP is not enabled.' + ODP_NOT_INTEGRATED: Final = 'ODP is not integrated.' class ForcedDecisionLogs: @@ -205,3 +206,11 @@ class OdpRestApiConfig: class OdpGraphQLApiConfig: """ODP GraphQL API configs.""" REQUEST_TIMEOUT: Final = 10 + + +class OdpEventManagerConfig: + """ODP Event Manager configs.""" + DEFAULT_QUEUE_CAPACITY: Final = 1000 + DEFAULT_BATCH_SIZE: Final = 10 + DEFAULT_FLUSH_INTERVAL: Final = 1 + DEFAULT_RETRY_COUNT: Final = 3 diff --git a/optimizely/helpers/validator.py b/optimizely/helpers/validator.py index 244337b0..7ffe0422 100644 --- a/optimizely/helpers/validator.py +++ b/optimizely/helpers/validator.py @@ -31,6 +31,7 @@ from optimizely.event.event_processor import BaseEventProcessor from optimizely.helpers.event_tag_utils import EventTags from optimizely.optimizely_user_context import UserAttributes + from optimizely.odp.odp_event import OdpDataDict def is_datafile_valid(datafile: Optional[str | bytes]) -> bool: @@ -306,3 +307,8 @@ def are_values_same_type(first_val: Any, second_val: Any) -> bool: return True return False + + +def are_odp_data_types_valid(data: OdpDataDict) -> bool: + valid_types = (str, int, float, bool, type(None)) + return all(isinstance(v, valid_types) for v in data.values()) diff --git a/optimizely/odp/odp_config.py b/optimizely/odp/odp_config.py index 64809626..17e435dc 100644 --- a/optimizely/odp/odp_config.py +++ b/optimizely/odp/odp_config.py @@ -12,11 +12,19 @@ # limitations under the License. from __future__ import annotations +from enum import Enum from typing import Optional from threading import Lock +class OdpConfigState(Enum): + """State of the ODP integration.""" + UNDETERMINED = 1 + INTEGRATED = 2 + NOT_INTEGRATED = 3 + + class OdpConfig: """ Contains configuration used for ODP integration. @@ -37,6 +45,9 @@ def __init__( self._api_host = api_host self._segments_to_check = segments_to_check or [] self.lock = Lock() + self._odp_state = OdpConfigState.UNDETERMINED + if self._api_host and self._api_key: + self._odp_state = OdpConfigState.INTEGRATED def update(self, api_key: Optional[str], api_host: Optional[str], segments_to_check: list[str]) -> bool: """ @@ -51,8 +62,14 @@ def update(self, api_key: Optional[str], api_host: Optional[str], segments_to_ch Returns: True if the provided values were different than the existing values. """ + updated = False with self.lock: + if api_key and api_host: + self._odp_state = OdpConfigState.INTEGRATED + else: + self._odp_state = OdpConfigState.NOT_INTEGRATED + if self._api_key != api_key or self._api_host != api_host or self._segments_to_check != segments_to_check: self._api_key = api_key self._api_host = api_host @@ -73,7 +90,7 @@ def get_segments_to_check(self) -> list[str]: with self.lock: return self._segments_to_check.copy() - def odp_integrated(self) -> bool: - """Returns True if ODP is integrated.""" + def odp_state(self) -> OdpConfigState: + """Returns the state of ODP integration (UNDETERMINED, INTEGRATED, or NOT_INTEGRATED).""" with self.lock: - return self._api_key is not None and self._api_host is not None + return self._odp_state diff --git a/optimizely/odp/odp_event.py b/optimizely/odp/odp_event.py index ac3e5d93..fafaa94f 100644 --- a/optimizely/odp/odp_event.py +++ b/optimizely/odp/odp_event.py @@ -13,15 +13,47 @@ from __future__ import annotations -from typing import Any +from typing import Any, Union, Dict +import uuid +import json +from optimizely import version + +OdpDataDict = Dict[str, Union[str, int, float, bool, None]] class OdpEvent: """ Representation of an odp event which can be sent to the Optimizely odp platform. """ - def __init__(self, type: str, action: str, - identifiers: dict[str, str], data: dict[str, Any]) -> None: + def __init__(self, type: str, action: str, identifiers: dict[str, str], data: OdpDataDict) -> None: self.type = type self.action = action self.identifiers = identifiers - self.data = data + self.data = self._add_common_event_data(data) + + def __repr__(self) -> str: + return str(self.__dict__) + + def __eq__(self, other: object) -> bool: + if isinstance(other, OdpEvent): + return self.__dict__ == other.__dict__ + elif isinstance(other, dict): + return self.__dict__ == other + else: + return False + + def _add_common_event_data(self, custom_data: OdpDataDict) -> OdpDataDict: + data: OdpDataDict = { + 'idempotence_id': str(uuid.uuid4()), + 'data_source_type': 'sdk', + 'data_source': 'python-sdk', + 'data_source_version': version.__version__ + } + data.update(custom_data) + return data + + +class OdpEventEncoder(json.JSONEncoder): + def default(self, obj: object) -> Any: + if isinstance(obj, OdpEvent): + return obj.__dict__ + return json.JSONEncoder.default(self, obj) diff --git a/optimizely/odp/odp_event_manager.py b/optimizely/odp/odp_event_manager.py new file mode 100644 index 00000000..df02e3ed --- /dev/null +++ b/optimizely/odp/odp_event_manager.py @@ -0,0 +1,238 @@ +# Copyright 2022, Optimizely +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations +from enum import Enum +from threading import Thread +from typing import Optional +import time +from queue import Empty, Queue, Full + +from optimizely import logger as _logging +from .odp_event import OdpEvent, OdpDataDict +from .odp_config import OdpConfig, OdpConfigState +from .zaius_rest_api_manager import ZaiusRestApiManager +from optimizely.helpers.enums import OdpEventManagerConfig, Errors + + +class Signal(Enum): + """Enum for sending signals to the event queue.""" + SHUTDOWN = 1 + FLUSH = 2 + + +class OdpEventManager: + """ + Class that sends batches of ODP events. + + The OdpEventManager maintains a single consumer thread that pulls events off of + the queue and buffers them before events are sent to ODP. + Sends events when the batch size is met or when the flush timeout has elapsed. + """ + + def __init__( + self, + odp_config: OdpConfig, + logger: Optional[_logging.Logger] = None, + api_manager: Optional[ZaiusRestApiManager] = None + ): + """OdpEventManager init method to configure event batching. + + Args: + odp_config: ODP integration config. + logger: Optional component which provides a log method to log messages. By default nothing would be logged. + api_manager: Optional component which sends events to ODP. + """ + self.logger = logger or _logging.NoOpLogger() + self.zaius_manager = api_manager or ZaiusRestApiManager(self.logger) + self.odp_config = odp_config + self.event_queue: Queue[OdpEvent | Signal] = Queue(OdpEventManagerConfig.DEFAULT_QUEUE_CAPACITY) + self.batch_size = OdpEventManagerConfig.DEFAULT_BATCH_SIZE + self.flush_interval = OdpEventManagerConfig.DEFAULT_FLUSH_INTERVAL + self._flush_deadline: float = 0 + self.retry_count = OdpEventManagerConfig.DEFAULT_RETRY_COUNT + self._current_batch: list[OdpEvent] = [] + """_current_batch should only be modified by the processing thread, as it is not thread safe""" + self.thread = Thread(target=self._run, daemon=True) + self.thread_exception = False + """thread_exception will be True if the processing thread did not exit cleanly""" + + @property + def is_running(self) -> bool: + """Property to check if consumer thread is alive or not.""" + return self.thread.is_alive() + + def start(self) -> None: + """Starts the batch processing thread to batch events.""" + if self.is_running: + self.logger.warning('ODP event queue already started.') + return + + self.thread.start() + + def _run(self) -> None: + """Processes the event queue from a child thread. Events are batched until + the batch size is met or until the flush timeout has elapsed. + """ + try: + while True: + timeout = self._get_queue_timeout() + + try: + item = self.event_queue.get(True, timeout) + except Empty: + item = None + + if item == Signal.SHUTDOWN: + self.logger.debug('ODP event queue: received shutdown signal.') + break + + elif item == Signal.FLUSH: + self.logger.debug('ODP event queue: received flush signal.') + self._flush_batch() + self.event_queue.task_done() + continue + + elif isinstance(item, OdpEvent): + self._add_to_batch(item) + self.event_queue.task_done() + + elif len(self._current_batch) > 0: + self.logger.debug('ODP event queue: flushing on interval.') + self._flush_batch() + + except Exception as exception: + self.thread_exception = True + self.logger.error(f'Uncaught exception processing ODP events. Error: {exception}') + + finally: + self.logger.info('Exiting ODP event processing loop. Attempting to flush pending events.') + self._flush_batch() + if item == Signal.SHUTDOWN: + self.event_queue.task_done() + + def flush(self) -> None: + """Adds flush signal to event_queue.""" + try: + self.event_queue.put_nowait(Signal.FLUSH) + except Full: + self.logger.error("Error flushing ODP event queue") + + def _flush_batch(self) -> None: + """Flushes current batch by dispatching event. + Should only be called by the processing thread.""" + batch_len = len(self._current_batch) + if batch_len == 0: + self.logger.debug('ODP event queue: nothing to flush.') + return + + api_key = self.odp_config.get_api_key() + api_host = self.odp_config.get_api_host() + + if not api_key or not api_host: + self.logger.debug(Errors.ODP_NOT_INTEGRATED) + self._current_batch.clear() + return + + self.logger.debug(f'ODP event queue: flushing batch size {batch_len}.') + should_retry = False + + for i in range(1 + self.retry_count): + try: + should_retry = self.zaius_manager.send_odp_events(api_key, api_host, self._current_batch) + except Exception as error: + should_retry = False + self.logger.error(Errors.ODP_EVENT_FAILED.format(f'Error: {error} {self._current_batch}')) + + if not should_retry: + break + if i < self.retry_count: + self.logger.debug('Error dispatching ODP events, scheduled to retry.') + + if should_retry: + self.logger.error(Errors.ODP_EVENT_FAILED.format(f'Failed after {i} retries: {self._current_batch}')) + + self._current_batch.clear() + + def _add_to_batch(self, odp_event: OdpEvent) -> None: + """Appends received ODP event to current batch, flushing if batch is greater than batch size. + Should only be called by the processing thread.""" + if not self._current_batch: + self._set_flush_deadline() + + self._current_batch.append(odp_event) + if len(self._current_batch) >= self.batch_size: + self.logger.debug('ODP event queue: flushing on batch size.') + self._flush_batch() + + def _set_flush_deadline(self) -> None: + """Sets time that next flush will occur.""" + self._flush_deadline = time.time() + self.flush_interval + + def _get_time_till_flush(self) -> float: + """Returns seconds until next flush; no less than 0.""" + return max(0, self._flush_deadline - time.time()) + + def _get_queue_timeout(self) -> Optional[float]: + """Returns seconds until next flush or None if current batch is empty.""" + if len(self._current_batch) == 0: + return None + return self._get_time_till_flush() + + def stop(self) -> None: + """Flushes and then stops ODP event queue.""" + try: + self.event_queue.put_nowait(Signal.SHUTDOWN) + except Full: + self.logger.error('Error stopping ODP event queue.') + return + + self.logger.warning('Stopping ODP event queue.') + + if self.is_running: + self.thread.join() + + if len(self._current_batch) > 0: + self.logger.error(Errors.ODP_EVENT_FAILED.format(self._current_batch)) + + if self.is_running: + self.logger.error('Error stopping ODP event queue.') + + def send_event(self, type: str, action: str, identifiers: dict[str, str], data: OdpDataDict) -> None: + """Create OdpEvent and add it to the event queue.""" + odp_state = self.odp_config.odp_state() + if odp_state == OdpConfigState.UNDETERMINED: + self.logger.debug('ODP event queue: cannot send before the datafile has loaded.') + return + + if odp_state == OdpConfigState.NOT_INTEGRATED: + self.logger.debug(Errors.ODP_NOT_INTEGRATED) + return + + self.dispatch(OdpEvent(type, action, identifiers, data)) + + def dispatch(self, event: OdpEvent) -> None: + """Add OdpEvent to the event queue.""" + if self.thread_exception: + self.logger.error(Errors.ODP_EVENT_FAILED.format('Queue is down')) + return + + if not self.is_running: + self.logger.warning('ODP event queue is shutdown, not accepting events.') + return + + try: + self.logger.debug('ODP event queue: adding event.') + self.event_queue.put_nowait(event) + except Full: + self.logger.warning(Errors.ODP_EVENT_FAILED.format("Queue is full")) diff --git a/optimizely/odp/zaius_rest_api_manager.py b/optimizely/odp/zaius_rest_api_manager.py index 9cbe2638..62f7c1c7 100644 --- a/optimizely/odp/zaius_rest_api_manager.py +++ b/optimizely/odp/zaius_rest_api_manager.py @@ -21,7 +21,7 @@ from optimizely import logger as optimizely_logger from optimizely.helpers.enums import Errors, OdpRestApiConfig -from optimizely.odp.odp_event import OdpEvent +from optimizely.odp.odp_event import OdpEvent, OdpEventEncoder """ ODP REST Events API @@ -60,7 +60,7 @@ def send_odp_events(self, api_key: str, api_host: str, events: list[OdpEvent]) - request_headers = {'content-type': 'application/json', 'x-api-key': api_key} try: - payload_dict = json.dumps(events) + payload_dict = json.dumps(events, cls=OdpEventEncoder) except TypeError as err: self.logger.error(Errors.ODP_EVENT_FAILED.format(err)) return should_retry diff --git a/tests/base.py b/tests/base.py index d4aeae8e..6e74e3aa 100644 --- a/tests/base.py +++ b/tests/base.py @@ -14,12 +14,25 @@ import json import unittest from typing import Optional +from copy import deepcopy +from unittest import mock from requests import Response from optimizely import optimizely +class CopyingMock(mock.MagicMock): + """ + Forces mock to make a copy of the args instead of keeping a reference. + Otherwise mutable args (lists, dicts) can change after they're captured. + """ + def __call__(self, *args, **kwargs): + args = deepcopy(args) + kwargs = deepcopy(kwargs) + return super().__call__(*args, **kwargs) + + class BaseTest(unittest.TestCase): def assertStrictTrue(self, to_assert): self.assertIs(to_assert, True) diff --git a/tests/test_odp_event_manager.py b/tests/test_odp_event_manager.py new file mode 100644 index 00000000..ffbab40d --- /dev/null +++ b/tests/test_odp_event_manager.py @@ -0,0 +1,515 @@ +# Copyright 2022, Optimizely +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http:#www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +from unittest import mock +from copy import deepcopy +import uuid + +from optimizely.odp.odp_event import OdpEvent +from optimizely.odp.odp_event_manager import OdpEventManager +from optimizely.odp.odp_config import OdpConfig +from .base import BaseTest, CopyingMock +from optimizely.version import __version__ +from optimizely.helpers import validator +from optimizely.helpers.enums import Errors + + +class MockOdpEventManager(OdpEventManager): + def _add_to_batch(self, *args): + raise Exception("Unexpected error") + + +TEST_UUID = str(uuid.uuid4()) + + +@mock.patch('uuid.uuid4', return_value=TEST_UUID, new=mock.DEFAULT) +class OdpEventManagerTest(BaseTest): + user_key = "vuid" + user_value = "test-user-value" + api_key = "test-api-key" + api_host = "https://test-host.com" + odp_config = OdpConfig(api_key, api_host) + + events = [ + { + "type": "t1", + "action": "a1", + "identifiers": {"id-key-1": "id-value-1"}, + "data": {"key-1": "value1", "key-2": 2, "key-3": 3.0, "key-4": None, 'key-5': True} + }, + { + "type": "t2", + "action": "a2", + "identifiers": {"id-key-2": "id-value-2"}, + "data": {"key-2": "value2"} + } + ] + + processed_events = [ + { + "type": "t1", + "action": "a1", + "identifiers": {"id-key-1": "id-value-1"}, + "data": { + "idempotence_id": TEST_UUID, + "data_source_type": "sdk", + "data_source": "python-sdk", + "data_source_version": __version__, + "key-1": "value1", + "key-2": 2, + "key-3": 3.0, + "key-4": None, + "key-5": True + } + }, + { + "type": "t2", + "action": "a2", + "identifiers": {"id-key-2": "id-value-2"}, + "data": { + "idempotence_id": TEST_UUID, + "data_source_type": "sdk", + "data_source": "python-sdk", + "data_source_version": __version__, + "key-2": "value2" + } + } + ] + + def test_odp_event_init(self, *args): + event = self.events[0] + self.assertStrictTrue(validator.are_odp_data_types_valid(event['data'])) + odp_event = OdpEvent(**event) + self.assertEqual(odp_event, self.processed_events[0]) + + def test_invalid_odp_event(self, *args): + event = deepcopy(self.events[0]) + event['data']['invalid-item'] = {} + self.assertStrictFalse(validator.are_odp_data_types_valid(event['data'])) + + def test_odp_event_manager_success(self, *args): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + event_manager.start() + + with mock.patch('requests.post', return_value=self.fake_server_response(status_code=200)): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.stop() + + self.assertEqual(len(event_manager._current_batch), 0) + mock_logger.error.assert_not_called() + mock_logger.debug.assert_any_call('ODP event queue: flushing batch size 2.') + mock_logger.debug.assert_any_call('ODP event queue: received shutdown signal.') + self.assertStrictFalse(event_manager.is_running) + + def test_odp_event_manager_batch(self, *args): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + event_manager.start() + + event_manager.batch_size = 2 + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False + ) as mock_send: + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.event_queue.join() + + mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) + self.assertEqual(len(event_manager._current_batch), 0) + mock_logger.error.assert_not_called() + mock_logger.debug.assert_any_call('ODP event queue: flushing on batch size.') + event_manager.stop() + + def test_odp_event_manager_multiple_batches(self, *args): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + event_manager.start() + + event_manager.batch_size = 2 + batch_count = 4 + + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False + ) as mock_send: + for _ in range(batch_count): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.event_queue.join() + + self.assertEqual(mock_send.call_count, batch_count) + mock_send.assert_has_calls( + [mock.call(self.api_key, self.api_host, self.processed_events)] * batch_count + ) + + self.assertEqual(len(event_manager._current_batch), 0) + mock_logger.error.assert_not_called() + mock_logger.debug.assert_has_calls([ + mock.call('ODP event queue: flushing on batch size.'), + mock.call('ODP event queue: flushing batch size 2.') + ] * batch_count, any_order=True) + event_manager.stop() + + def test_odp_event_manager_backlog(self, *args): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + + event_manager.batch_size = 2 + batch_count = 4 + + # create events before starting processing to simulate backlog + with mock.patch('optimizely.odp.odp_event_manager.OdpEventManager.is_running', True): + for _ in range(batch_count - 1): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False + ) as mock_send: + event_manager.start() + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.stop() + event_manager.event_queue.join() + + self.assertEqual(mock_send.call_count, batch_count) + mock_send.assert_has_calls( + [mock.call(self.api_key, self.api_host, self.processed_events)] * batch_count + ) + + self.assertEqual(len(event_manager._current_batch), 0) + mock_logger.error.assert_not_called() + mock_logger.debug.assert_has_calls([ + mock.call('ODP event queue: flushing on batch size.'), + mock.call('ODP event queue: flushing batch size 2.') + ] * batch_count, any_order=True) + + def test_odp_event_manager_flush(self, *args): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + event_manager.start() + + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False + ) as mock_send: + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.flush() + event_manager.event_queue.join() + + mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) + mock_logger.error.assert_not_called() + self.assertEqual(len(event_manager._current_batch), 0) + mock_logger.debug.assert_any_call('ODP event queue: received flush signal.') + event_manager.stop() + + def test_odp_event_manager_multiple_flushes(self, *args): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + event_manager.start() + flush_count = 4 + + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False + ) as mock_send: + for _ in range(flush_count): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.flush() + event_manager.event_queue.join() + + self.assertEqual(mock_send.call_count, flush_count) + for call in mock_send.call_args_list: + self.assertEqual(call, mock.call(self.api_key, self.api_host, self.processed_events)) + mock_logger.error.assert_not_called() + + self.assertEqual(len(event_manager._current_batch), 0) + mock_logger.debug.assert_has_calls([ + mock.call('ODP event queue: received flush signal.'), + mock.call('ODP event queue: flushing batch size 2.') + ] * flush_count, any_order=True) + event_manager.stop() + + def test_odp_event_manager_retry_failure(self, *args): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + event_manager.start() + + number_of_tries = event_manager.retry_count + 1 + + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=True + ) as mock_send: + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.flush() + event_manager.event_queue.join() + + mock_send.assert_has_calls( + [mock.call(self.api_key, self.api_host, self.processed_events)] * number_of_tries + ) + self.assertEqual(len(event_manager._current_batch), 0) + mock_logger.debug.assert_any_call('Error dispatching ODP events, scheduled to retry.') + mock_logger.error.assert_called_once_with( + f'ODP event send failed (Failed after 3 retries: {self.processed_events}).' + ) + event_manager.stop() + + def test_odp_event_manager_retry_success(self, *args): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + event_manager.start() + + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, side_effect=[True, True, False] + ) as mock_send: + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.flush() + event_manager.event_queue.join() + + mock_send.assert_has_calls([mock.call(self.api_key, self.api_host, self.processed_events)] * 3) + self.assertEqual(len(event_manager._current_batch), 0) + mock_logger.debug.assert_any_call('Error dispatching ODP events, scheduled to retry.') + mock_logger.error.assert_not_called() + self.assertStrictTrue(event_manager.is_running) + event_manager.stop() + + def test_odp_event_manager_send_failure(self, *args): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + event_manager.start() + + with mock.patch.object( + event_manager.zaius_manager, + 'send_odp_events', + new_callable=CopyingMock, + side_effect=Exception('Unexpected error') + ) as mock_send: + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.flush() + event_manager.event_queue.join() + + mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) + self.assertEqual(len(event_manager._current_batch), 0) + mock_logger.error.assert_any_call(f"ODP event send failed (Error: Unexpected error {self.processed_events}).") + self.assertStrictTrue(event_manager.is_running) + event_manager.stop() + + def test_odp_event_manager_disabled(self, *args): + mock_logger = mock.Mock() + odp_config = OdpConfig() + odp_config.update(None, None, None) + event_manager = OdpEventManager(odp_config, mock_logger) + event_manager.start() + + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.event_queue.join() + + self.assertEqual(len(event_manager._current_batch), 0) + mock_logger.error.assert_not_called() + mock_logger.debug.assert_any_call(Errors.ODP_NOT_INTEGRATED) + self.assertStrictTrue(event_manager.is_running) + event_manager.stop() + + def test_odp_event_manager_queue_full(self, *args): + mock_logger = mock.Mock() + + with mock.patch('optimizely.helpers.enums.OdpEventManagerConfig.DEFAULT_QUEUE_CAPACITY', 1): + event_manager = OdpEventManager(self.odp_config, mock_logger) + + with mock.patch('optimizely.odp.odp_event_manager.OdpEventManager.is_running', True): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.flush() + + # warning when adding event to full queue + mock_logger.warning.assert_called_once_with('ODP event send failed (Queue is full).') + # error when trying to flush with full queue + mock_logger.error.assert_called_once_with('Error flushing ODP event queue') + + def test_odp_event_manager_thread_exception(self, *args): + mock_logger = mock.Mock() + event_manager = MockOdpEventManager(self.odp_config, mock_logger) + event_manager.start() + + event_manager.send_event(**self.events[0]) + time.sleep(.1) + event_manager.send_event(**self.events[0]) + + event_manager.thread.join() + mock_logger.error.assert_has_calls([ + mock.call('Uncaught exception processing ODP events. Error: Unexpected error'), + mock.call('ODP event send failed (Queue is down).') + ]) + event_manager.stop() + + def test_odp_event_manager_override_default_data(self, *args): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + event_manager.start() + + event = deepcopy(self.events[0]) + event['data']['data_source'] = 'my-app' + + processed_event = deepcopy(self.processed_events[0]) + processed_event['data']['data_source'] = 'my-app' + + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False + ) as mock_send: + event_manager.send_event(**event) + event_manager.flush() + event_manager.event_queue.join() + + mock_send.assert_called_once_with(self.api_key, self.api_host, [processed_event]) + event_manager.stop() + + def test_odp_event_manager_flush_timeout(self, *args): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + event_manager.flush_interval = .5 + event_manager.start() + + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False + ) as mock_send: + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.event_queue.join() + time.sleep(1) + + mock_logger.error.assert_not_called() + mock_logger.debug.assert_any_call('ODP event queue: flushing on interval.') + mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) + event_manager.stop() + + def test_odp_event_manager_events_before_odp_ready(self, *args): + mock_logger = mock.Mock() + odp_config = OdpConfig() + event_manager = OdpEventManager(odp_config, mock_logger) + event_manager.start() + + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False + ) as mock_send: + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + + odp_config.update(self.api_key, self.api_host, []) + event_manager.event_queue.join() + + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.flush() + + event_manager.event_queue.join() + + mock_logger.error.assert_not_called() + mock_logger.debug.assert_has_calls([ + mock.call('ODP event queue: cannot send before the datafile has loaded.'), + mock.call('ODP event queue: cannot send before the datafile has loaded.'), + mock.call('ODP event queue: adding event.'), + mock.call('ODP event queue: adding event.'), + mock.call('ODP event queue: received flush signal.'), + mock.call('ODP event queue: flushing batch size 2.') + ]) + mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) + event_manager.stop() + + def test_odp_event_manager_events_before_odp_disabled(self, *args): + mock_logger = mock.Mock() + odp_config = OdpConfig() + event_manager = OdpEventManager(odp_config, mock_logger) + event_manager.start() + + with mock.patch.object(event_manager.zaius_manager, 'send_odp_events') as mock_send: + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + + odp_config.update(None, None, []) + event_manager.event_queue.join() + + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + + event_manager.event_queue.join() + + mock_logger.error.assert_not_called() + mock_logger.debug.assert_has_calls([ + mock.call('ODP event queue: cannot send before the datafile has loaded.'), + mock.call('ODP event queue: cannot send before the datafile has loaded.'), + mock.call(Errors.ODP_NOT_INTEGRATED), + mock.call(Errors.ODP_NOT_INTEGRATED) + ]) + self.assertEqual(len(event_manager._current_batch), 0) + mock_send.assert_not_called() + event_manager.stop() + + def test_odp_event_manager_disabled_after_init(self, *args): + mock_logger = mock.Mock() + odp_config = OdpConfig(self.api_key, self.api_host) + event_manager = OdpEventManager(odp_config, mock_logger) + event_manager.start() + event_manager.batch_size = 2 + + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False + ) as mock_send: + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.event_queue.join() + + odp_config.update(None, None, []) + + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + + event_manager.event_queue.join() + + mock_logger.error.assert_not_called() + mock_logger.debug.assert_has_calls([ + mock.call('ODP event queue: flushing batch size 2.'), + mock.call(Errors.ODP_NOT_INTEGRATED), + mock.call(Errors.ODP_NOT_INTEGRATED) + ]) + self.assertEqual(len(event_manager._current_batch), 0) + mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) + event_manager.stop() + + def test_odp_event_manager_disabled_after_events_in_queue(self, *args): + mock_logger = mock.Mock() + odp_config = OdpConfig(self.api_key, self.api_host) + + event_manager = OdpEventManager(odp_config, mock_logger) + event_manager.batch_size = 2 + + with mock.patch('optimizely.odp.odp_event_manager.OdpEventManager.is_running', True): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + + with mock.patch.object(event_manager.zaius_manager, 'send_odp_events') as mock_send: + odp_config.update(None, None, []) + event_manager.start() + event_manager.send_event(**self.events[1]) + event_manager.event_queue.join() + + self.assertEqual(len(event_manager._current_batch), 0) + mock_logger.debug.assert_any_call(Errors.ODP_NOT_INTEGRATED) + mock_logger.error.assert_not_called() + mock_send.assert_not_called() + event_manager.stop()