diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb index b49a55f7..6b901bef 100644 --- a/lib/optimizely/event/batch_event_processor.rb +++ b/lib/optimizely/event/batch_event_processor.rb @@ -31,18 +31,16 @@ class BatchEventProcessor < EventProcessor DEFAULT_BATCH_INTERVAL = 30_000 # interval in milliseconds DEFAULT_QUEUE_CAPACITY = 1000 DEFAULT_TIMEOUT_INTERVAL = 5 # interval in seconds - MAX_NIL_COUNT = 3 - FLUSH_SIGNAL = 'FLUSH_SIGNAL' SHUTDOWN_SIGNAL = 'SHUTDOWN_SIGNAL' def initialize( - event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY), - event_dispatcher: nil, - batch_size: DEFAULT_BATCH_SIZE, - flush_interval: DEFAULT_BATCH_INTERVAL, - logger: NoOpLogger.new, - notification_center: nil + event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY), + event_dispatcher: nil, + batch_size: DEFAULT_BATCH_SIZE, + flush_interval: DEFAULT_BATCH_INTERVAL, + logger: NoOpLogger.new, + notification_center: nil ) @event_queue = event_queue @logger = logger @@ -108,44 +106,30 @@ def stop! private def run - # if we receive a number of item nils that reach MAX_NIL_COUNT, - # then we hang on the pop via setting use_pop to false - @nil_count = 0 - # hang on pop if true - @use_pop = false loop do if Helpers::DateTimeUtils.create_timestamp >= @flushing_interval_deadline @logger.log(Logger::DEBUG, 'Deadline exceeded flushing current batch.') flush_queue! @flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval - @use_pop = true if @nil_count > MAX_NIL_COUNT end - item = @event_queue.pop if @event_queue.length.positive? || @use_pop + if @event_queue.length.positive? + @logger.log(Logger::DEBUG, 'Getting item.') + item = @event_queue.pop - if item.nil? - # when nil count is greater than MAX_NIL_COUNT, we hang on the pop until there is an item available. - # this avoids to much spinning of the loop. - @nil_count += 1 - next - end + if item == SHUTDOWN_SIGNAL + @logger.log(Logger::DEBUG, 'Received shutdown signal.') + break + end - # reset nil_count and use_pop if we have received an item. - @nil_count = 0 - @use_pop = false + if item == FLUSH_SIGNAL + @logger.log(Logger::DEBUG, 'Received flush signal.') + flush_queue! + next + end - if item == SHUTDOWN_SIGNAL - @logger.log(Logger::DEBUG, 'Received shutdown signal.') - break - end - - if item == FLUSH_SIGNAL - @logger.log(Logger::DEBUG, 'Received flush signal.') - flush_queue! - next + add_to_batch(item) if item.is_a? Optimizely::UserEvent end - - add_to_batch(item) if item.is_a? Optimizely::UserEvent end rescue SignalException @logger.log(Logger::ERROR, 'Interrupted while processing buffer.') @@ -153,8 +137,8 @@ def run @logger.log(Logger::ERROR, "Uncaught exception processing buffer. #{e.message}") ensure @logger.log( - Logger::INFO, - 'Exiting processing loop. Attempting to flush pending events.' + Logger::INFO, + 'Exiting processing loop. Attempting to flush pending events.' ) flush_queue! end @@ -165,14 +149,14 @@ def flush_queue! log_event = Optimizely::EventFactory.create_log_event(@current_batch, @logger) begin @logger.log( - Logger::INFO, - 'Flushing Queue.' + Logger::INFO, + 'Flushing Queue.' ) @event_dispatcher.dispatch_event(log_event) @notification_center&.send_notifications( - NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], - log_event + NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], + log_event ) rescue StandardError => e @logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.") @@ -183,9 +167,7 @@ def flush_queue! def add_to_batch(user_event) if should_split?(user_event) flush_queue! - @current_batch = [] end - # Reset the deadline if starting a new batch. @flushing_interval_deadline = (Helpers::DateTimeUtils.create_timestamp + @flush_interval) if @current_batch.empty?