From 9350e74cb14671802bde327e1c8b6df1eeb42d7e Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Fri, 13 Dec 2019 17:44:30 -0800 Subject: [PATCH 1/7] add more debug logging --- optimizely/event/event_processor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index 3f82a7fe..255d6497 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -222,7 +222,7 @@ def flush(self): def _flush_queue(self): """ Flushes event_queue by dispatching events. """ - + self.logger.debug('Flushing the queue.') if len(self._current_batch) == 0: return @@ -267,6 +267,7 @@ def _add_to_batch(self, user_event): user_event: UserEvent Instance. """ if self._should_split(user_event): + self.logger.debug('Flush on split.') self._flush_queue() self._current_batch = list() @@ -277,6 +278,7 @@ def _add_to_batch(self, user_event): with self.LOCK: self._current_batch.append(user_event) if len(self._current_batch) >= self.batch_size: + self.logger.debug('Flushing on batch size.') self._flush_queue() def _should_split(self, user_event): From 9c0cca8e745f1819a7eb850ddc8c711e9f442e38 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Fri, 13 Dec 2019 17:59:07 -0800 Subject: [PATCH 2/7] take out in seconds. already in seconds --- optimizely/event/event_processor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index 255d6497..0c31e911 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -169,7 +169,7 @@ def start(self): self.logger.warning('BatchEventProcessor already started.') return - self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds()) + self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval) self.executor = threading.Thread(target=self._run) self.executor.setDaemon(True) self.executor.start() @@ -183,7 +183,7 @@ def _run(self): if self._get_time() >= self.flushing_interval_deadline: self._flush_queue() self.flushing_interval_deadline = self._get_time() + \ - self._get_time(self.flush_interval.total_seconds()) + self._get_time(self.flush_interval) self.logger.debug('Flush interval deadline. Flushed queue.') try: @@ -273,7 +273,7 @@ def _add_to_batch(self, user_event): # Reset the deadline if starting a new batch. if len(self._current_batch) == 0: - self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds()) + self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval) with self.LOCK: self._current_batch.append(user_event) @@ -312,7 +312,7 @@ def stop(self): self.logger.warning('Stopping Scheduler.') if self.executor: - self.executor.join(self.timeout_interval.total_seconds()) + self.executor.join(self.timeout_interval) if self.is_running: self.logger.error('Timeout exceeded while attempting to close for ' + str(self.timeout_interval) + ' ms.') From a2fe99599372a0e74918459e54c534382983a777 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Fri, 13 Dec 2019 18:02:29 -0800 Subject: [PATCH 3/7] Revert "take out in seconds. already in seconds" This reverts commit 9c0cca8e745f1819a7eb850ddc8c711e9f442e38. --- optimizely/event/event_processor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index 0c31e911..255d6497 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -169,7 +169,7 @@ def start(self): self.logger.warning('BatchEventProcessor already started.') return - self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval) + self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds()) self.executor = threading.Thread(target=self._run) self.executor.setDaemon(True) self.executor.start() @@ -183,7 +183,7 @@ def _run(self): if self._get_time() >= self.flushing_interval_deadline: self._flush_queue() self.flushing_interval_deadline = self._get_time() + \ - self._get_time(self.flush_interval) + self._get_time(self.flush_interval.total_seconds()) self.logger.debug('Flush interval deadline. Flushed queue.') try: @@ -273,7 +273,7 @@ def _add_to_batch(self, user_event): # Reset the deadline if starting a new batch. if len(self._current_batch) == 0: - self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval) + self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds()) with self.LOCK: self._current_batch.append(user_event) @@ -312,7 +312,7 @@ def stop(self): self.logger.warning('Stopping Scheduler.') if self.executor: - self.executor.join(self.timeout_interval) + self.executor.join(self.timeout_interval.total_seconds()) if self.is_running: self.logger.error('Timeout exceeded while attempting to close for ' + str(self.timeout_interval) + ' ms.') From b776edc3a3450f6d4116987f2370fc7638acb424 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Sun, 15 Dec 2019 16:42:26 -0800 Subject: [PATCH 4/7] update logging to log when batch is empty on flush or flush of batch size --- optimizely/event/event_processor.py | 7 +++++-- tests/test_event_processor.py | 5 +++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index 255d6497..2053a3ce 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -222,10 +222,13 @@ def flush(self): def _flush_queue(self): """ Flushes event_queue by dispatching events. """ - self.logger.debug('Flushing the queue.') - if len(self._current_batch) == 0: + batch_len = len(self._current_batch) + if batch_len == 0: + self.logger.debug('Nothing to flush.') return + self.logger.debug('Flushing batch size ' + str(batch_len)) + with self.LOCK: to_process_batch = list(self._current_batch) self._current_batch = list() diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index a8a954f4..9b9c77fb 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -179,7 +179,7 @@ def test_flush_once_max_timeout(self): self.optimizely.logger = SimpleLogger(enums.LogLevels.DEBUG) with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: - self._set_event_processor(event_dispatcher, mock_config_logging) + self._set_event_processor(event_dispatcher, self.optimizely.logger) user_event = self._build_conversion_event(self.event_name) self.event_processor.process(user_event) @@ -192,7 +192,8 @@ def test_flush_once_max_timeout(self): self.assertTrue(mock_config_logging.debug.called) mock_config_logging.debug.assert_any_call('Received event of type ConversionEvent for user test_user.') mock_config_logging.debug.assert_any_call('Flush interval deadline. Flushed queue.') - self.assertTrue(mock_config_logging.debug.call_count == 2) + mock_config_logging.debug.assert_any_call('Flushing batch size 1') + self.assertTrue(mock_config_logging.debug.call_count == 3) self.optimizely.logger = SimpleLogger() def test_flush_max_batch_size(self): From 027f277e851531584cbc74c0f47d495d57a4420c Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Sun, 15 Dec 2019 17:07:50 -0800 Subject: [PATCH 5/7] use mock logger --- tests/test_event_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index 9b9c77fb..43aee710 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -179,7 +179,7 @@ def test_flush_once_max_timeout(self): self.optimizely.logger = SimpleLogger(enums.LogLevels.DEBUG) with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: - self._set_event_processor(event_dispatcher, self.optimizely.logger) + self._set_event_processor(event_dispatcher, mock_config_logging) user_event = self._build_conversion_event(self.event_name) self.event_processor.process(user_event) From ee08eb8104d6036b0a52bf837d81a1dd06e89fab Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Mon, 16 Dec 2019 09:12:23 -0800 Subject: [PATCH 6/7] rename flush_queue to flush_batch and update debug messages. fix one bug where current_batch was being reset without a lock. --- optimizely/event/event_processor.py | 17 ++++++++--------- tests/test_event_processor.py | 2 +- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index 2053a3ce..172fcb42 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -181,10 +181,10 @@ def _run(self): try: while True: if self._get_time() >= self.flushing_interval_deadline: - self._flush_queue() + self._flush_batch() self.flushing_interval_deadline = self._get_time() + \ self._get_time(self.flush_interval.total_seconds()) - self.logger.debug('Flush interval deadline. Flushed queue.') + self.logger.debug('Flush interval deadline. Flushed batch.') try: interval = self.flushing_interval_deadline - self._get_time() @@ -202,7 +202,7 @@ def _run(self): if item == self._FLUSH_SIGNAL: self.logger.debug('Received flush signal.') - self._flush_queue() + self._flush_batch() continue if isinstance(item, UserEvent): @@ -213,14 +213,14 @@ def _run(self): finally: self.logger.info('Exiting processing loop. Attempting to flush pending events.') - self._flush_queue() + self._flush_batch() def flush(self): """ Adds flush signal to event_queue. """ self.event_queue.put(self._FLUSH_SIGNAL) - def _flush_queue(self): + def _flush_batch(self): """ Flushes event_queue by dispatching events. """ batch_len = len(self._current_batch) if batch_len == 0: @@ -270,9 +270,8 @@ def _add_to_batch(self, user_event): user_event: UserEvent Instance. """ if self._should_split(user_event): - self.logger.debug('Flush on split.') - self._flush_queue() - self._current_batch = list() + self.logger.debug('Flushing batch on split.') + self._flush_batch() # Reset the deadline if starting a new batch. if len(self._current_batch) == 0: @@ -282,7 +281,7 @@ def _add_to_batch(self, user_event): self._current_batch.append(user_event) if len(self._current_batch) >= self.batch_size: self.logger.debug('Flushing on batch size.') - self._flush_queue() + self._flush_batch() def _should_split(self, user_event): """ Method to check if current event batch should split into two. diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index 43aee710..40b28467 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -191,8 +191,8 @@ def test_flush_once_max_timeout(self): self.assertEqual(0, self.event_processor.event_queue.qsize()) self.assertTrue(mock_config_logging.debug.called) mock_config_logging.debug.assert_any_call('Received event of type ConversionEvent for user test_user.') - mock_config_logging.debug.assert_any_call('Flush interval deadline. Flushed queue.') mock_config_logging.debug.assert_any_call('Flushing batch size 1') + mock_config_logging.debug.assert_any_call('Flush interval deadline. Flushed batch.') self.assertTrue(mock_config_logging.debug.call_count == 3) self.optimizely.logger = SimpleLogger() From 0c9adbaa8999e3fcc0fbe856b0aea6b1bb158453 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Mon, 16 Dec 2019 09:24:56 -0800 Subject: [PATCH 7/7] cleanup incorrect comments --- optimizely/event/event_processor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index 172fcb42..dac1faa5 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -175,8 +175,8 @@ def start(self): self.executor.start() def _run(self): - """ Triggered as part of the thread which batches events or flushes event_queue and sleeps - periodically if queue is empty. + """ Triggered as part of the thread which batches events or flushes event_queue and hangs on get + for flush interval if queue is empty. """ try: while True: @@ -221,7 +221,7 @@ def flush(self): self.event_queue.put(self._FLUSH_SIGNAL) def _flush_batch(self): - """ Flushes event_queue by dispatching events. """ + """ Flushes current batch by dispatching event. """ batch_len = len(self._current_batch) if batch_len == 0: self.logger.debug('Nothing to flush.')