From d88232812445052981188ff025973b4be2de6eac Mon Sep 17 00:00:00 2001 From: Mike Ng Date: Fri, 6 Dec 2019 10:24:11 -0800 Subject: [PATCH 1/3] fix: Sleep the thread when no events are in the queue. --- lib/optimizely.rb | 1 + lib/optimizely/event/batch_event_processor.rb | 17 +++-------------- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/lib/optimizely.rb b/lib/optimizely.rb index 6863256f..c0f24db2 100644 --- a/lib/optimizely.rb +++ b/lib/optimizely.rb @@ -104,6 +104,7 @@ def initialize( else ForwardingEventProcessor.new(@event_dispatcher, @logger, @notification_center) end + @logger.log(Logger::INFO, "------------>>> THIS IS LOCALLY") end # Buckets visitor and sends impression event to Optimizely. diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb index b49a55f7..8b9e8b47 100644 --- a/lib/optimizely/event/batch_event_processor.rb +++ b/lib/optimizely/event/batch_event_processor.rb @@ -108,32 +108,21 @@ def stop! private def run - # if we receive a number of item nils that reach MAX_NIL_COUNT, - # then we hang on the pop via setting use_pop to false - @nil_count = 0 - # hang on pop if true - @use_pop = false loop do if Helpers::DateTimeUtils.create_timestamp >= @flushing_interval_deadline @logger.log(Logger::DEBUG, 'Deadline exceeded flushing current batch.') flush_queue! + @flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval - @use_pop = true if @nil_count > MAX_NIL_COUNT end - item = @event_queue.pop if @event_queue.length.positive? || @use_pop + item = @event_queue.pop if @event_queue.length.positive? if item.nil? - # when nil count is greater than MAX_NIL_COUNT, we hang on the pop until there is an item available. - # this avoids to much spinning of the loop. - @nil_count += 1 + sleep(@flush_interval / 1000) next end - # reset nil_count and use_pop if we have received an item. - @nil_count = 0 - @use_pop = false - if item == SHUTDOWN_SIGNAL @logger.log(Logger::DEBUG, 'Received shutdown signal.') break From 022fa1001635c5a5c2d25b3ee22d168cfb8a67de Mon Sep 17 00:00:00 2001 From: Mike Ng Date: Fri, 6 Dec 2019 10:24:58 -0800 Subject: [PATCH 2/3] remove test log message. --- lib/optimizely.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/optimizely.rb b/lib/optimizely.rb index c0f24db2..6863256f 100644 --- a/lib/optimizely.rb +++ b/lib/optimizely.rb @@ -104,7 +104,6 @@ def initialize( else ForwardingEventProcessor.new(@event_dispatcher, @logger, @notification_center) end - @logger.log(Logger::INFO, "------------>>> THIS IS LOCALLY") end # Buckets visitor and sends impression event to Optimizely. From 2a16ed34e9d6ab1178194abc047c6bd2171e83dd Mon Sep 17 00:00:00 2001 From: Mike Ng Date: Fri, 6 Dec 2019 12:06:49 -0800 Subject: [PATCH 3/3] use impl. --- lib/optimizely/event/batch_event_processor.rb | 57 ++++++++----------- 1 file changed, 25 insertions(+), 32 deletions(-) diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb index 8b9e8b47..6b901bef 100644 --- a/lib/optimizely/event/batch_event_processor.rb +++ b/lib/optimizely/event/batch_event_processor.rb @@ -31,18 +31,16 @@ class BatchEventProcessor < EventProcessor DEFAULT_BATCH_INTERVAL = 30_000 # interval in milliseconds DEFAULT_QUEUE_CAPACITY = 1000 DEFAULT_TIMEOUT_INTERVAL = 5 # interval in seconds - MAX_NIL_COUNT = 3 - FLUSH_SIGNAL = 'FLUSH_SIGNAL' SHUTDOWN_SIGNAL = 'SHUTDOWN_SIGNAL' def initialize( - event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY), - event_dispatcher: nil, - batch_size: DEFAULT_BATCH_SIZE, - flush_interval: DEFAULT_BATCH_INTERVAL, - logger: NoOpLogger.new, - notification_center: nil + event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY), + event_dispatcher: nil, + batch_size: DEFAULT_BATCH_SIZE, + flush_interval: DEFAULT_BATCH_INTERVAL, + logger: NoOpLogger.new, + notification_center: nil ) @event_queue = event_queue @logger = logger @@ -112,29 +110,26 @@ def run if Helpers::DateTimeUtils.create_timestamp >= @flushing_interval_deadline @logger.log(Logger::DEBUG, 'Deadline exceeded flushing current batch.') flush_queue! - @flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval end - item = @event_queue.pop if @event_queue.length.positive? + if @event_queue.length.positive? + @logger.log(Logger::DEBUG, 'Getting item.') + item = @event_queue.pop - if item.nil? - sleep(@flush_interval / 1000) - next - end + if item == SHUTDOWN_SIGNAL + @logger.log(Logger::DEBUG, 'Received shutdown signal.') + break + end - if item == SHUTDOWN_SIGNAL - @logger.log(Logger::DEBUG, 'Received shutdown signal.') - break - end + if item == FLUSH_SIGNAL + @logger.log(Logger::DEBUG, 'Received flush signal.') + flush_queue! + next + end - if item == FLUSH_SIGNAL - @logger.log(Logger::DEBUG, 'Received flush signal.') - flush_queue! - next + add_to_batch(item) if item.is_a? Optimizely::UserEvent end - - add_to_batch(item) if item.is_a? Optimizely::UserEvent end rescue SignalException @logger.log(Logger::ERROR, 'Interrupted while processing buffer.') @@ -142,8 +137,8 @@ def run @logger.log(Logger::ERROR, "Uncaught exception processing buffer. #{e.message}") ensure @logger.log( - Logger::INFO, - 'Exiting processing loop. Attempting to flush pending events.' + Logger::INFO, + 'Exiting processing loop. Attempting to flush pending events.' ) flush_queue! end @@ -154,14 +149,14 @@ def flush_queue! log_event = Optimizely::EventFactory.create_log_event(@current_batch, @logger) begin @logger.log( - Logger::INFO, - 'Flushing Queue.' + Logger::INFO, + 'Flushing Queue.' ) @event_dispatcher.dispatch_event(log_event) @notification_center&.send_notifications( - NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], - log_event + NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], + log_event ) rescue StandardError => e @logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.") @@ -172,9 +167,7 @@ def flush_queue! def add_to_batch(user_event) if should_split?(user_event) flush_queue! - @current_batch = [] end - # Reset the deadline if starting a new batch. @flushing_interval_deadline = (Helpers::DateTimeUtils.create_timestamp + @flush_interval) if @current_batch.empty?