diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py
index 3f82a7fe..dac1faa5 100644
--- a/optimizely/event/event_processor.py
+++ b/optimizely/event/event_processor.py
@@ -175,16 +175,16 @@ def start(self):
     self.executor.start()
 
   def _run(self):
-    """ Triggered as part of the thread which batches events or flushes event_queue and sleeps
-    periodically if queue is empty.
+    """ Triggered as part of the thread which batches events or flushes event_queue and hangs on get
+    for flush interval if queue is empty.
     """
     try:
       while True:
         if self._get_time() >= self.flushing_interval_deadline:
-          self._flush_queue()
+          self._flush_batch()
           self.flushing_interval_deadline = self._get_time() + \
             self._get_time(self.flush_interval.total_seconds())
-          self.logger.debug('Flush interval deadline. Flushed queue.')
+          self.logger.debug('Flush interval deadline. Flushed batch.')
 
         try:
           interval = self.flushing_interval_deadline - self._get_time()
@@ -202,7 +202,7 @@ def _run(self):
 
         if item == self._FLUSH_SIGNAL:
           self.logger.debug('Received flush signal.')
-          self._flush_queue()
+          self._flush_batch()
           continue
 
         if isinstance(item, UserEvent):
@@ -213,19 +213,22 @@ def _run(self):
     finally:
       self.logger.info('Exiting processing loop. Attempting to flush pending events.')
-      self._flush_queue()
+      self._flush_batch()
 
   def flush(self):
     """ Adds flush signal to event_queue. """
 
     self.event_queue.put(self._FLUSH_SIGNAL)
 
-  def _flush_queue(self):
-    """ Flushes event_queue by dispatching events. """
-
-    if len(self._current_batch) == 0:
+  def _flush_batch(self):
+    """ Flushes current batch by dispatching event. """
+    batch_len = len(self._current_batch)
+    if batch_len == 0:
+      self.logger.debug('Nothing to flush.')
       return
 
+    self.logger.debug('Flushing batch size ' + str(batch_len))
+
     with self.LOCK:
       to_process_batch = list(self._current_batch)
       self._current_batch = list()
 
@@ -267,8 +270,8 @@ def _add_to_batch(self, user_event):
       user_event: UserEvent Instance.
     """
     if self._should_split(user_event):
-      self._flush_queue()
-      self._current_batch = list()
+      self.logger.debug('Flushing batch on split.')
+      self._flush_batch()
 
     # Reset the deadline if starting a new batch.
     if len(self._current_batch) == 0:
@@ -277,7 +280,8 @@ def _add_to_batch(self, user_event):
     with self.LOCK:
       self._current_batch.append(user_event)
       if len(self._current_batch) >= self.batch_size:
-        self._flush_queue()
+        self.logger.debug('Flushing on batch size.')
+        self._flush_batch()
 
   def _should_split(self, user_event):
     """ Method to check if current event batch should split into two.
diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py
index a8a954f4..40b28467 100644
--- a/tests/test_event_processor.py
+++ b/tests/test_event_processor.py
@@ -191,8 +191,9 @@ def test_flush_once_max_timeout(self):
     self.assertEqual(0, self.event_processor.event_queue.qsize())
     self.assertTrue(mock_config_logging.debug.called)
     mock_config_logging.debug.assert_any_call('Received event of type ConversionEvent for user test_user.')
-    mock_config_logging.debug.assert_any_call('Flush interval deadline. Flushed queue.')
-    self.assertTrue(mock_config_logging.debug.call_count == 2)
+    mock_config_logging.debug.assert_any_call('Flushing batch size 1')
+    mock_config_logging.debug.assert_any_call('Flush interval deadline. Flushed batch.')
+    self.assertTrue(mock_config_logging.debug.call_count == 3)
     self.optimizely.logger = SimpleLogger()
 
   def test_flush_max_batch_size(self):