Skip to content

Commit ef86358

Browse files
refactor batch processor
1 parent 5ed5d94 commit ef86358

File tree

1 file changed

+12
-33
lines changed

1 file changed

+12
-33
lines changed

lib/optimizely/event/batch_event_processor.rb

Lines changed: 12 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -62,16 +62,17 @@ def initialize(
6262
@notification_center = notification_center
6363
@current_batch = []
6464
@started = false
65-
start!
65+
@task = Concurrent::TimerTask.new(execution_interval: @flush_interval, timeout_interval: DEFAULT_TIMEOUT_INTERVAL) do
66+
flush_queue!
67+
end
6668
end
6769

6870
def start!
6971
if @started == true
7072
@logger.log(Logger::WARN, 'Service already started.')
7173
return
7274
end
73-
@flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
74-
@logger.log(Logger::INFO, 'Starting scheduler.')
75+
@logger.log(Logger::INFO, 'Starting event queue processing.')
7576
@thread = Thread.new { run }
7677
@started = true
7778
end
@@ -83,13 +84,10 @@ def flush
8384
def process(user_event)
8485
@logger.log(Logger::DEBUG, "Received userEvent: #{user_event}")
8586

86-
if !@started || !@thread.alive?
87-
@logger.log(Logger::WARN, 'Executor shutdown, not accepting tasks.')
88-
return
89-
end
90-
9187
begin
9288
@event_queue.push(user_event, true)
89+
@process_count += 1
90+
start! if @event_queue.length.positive? && (@event_queue.length % @batch_size).zero?
9391
rescue Exception
9492
@logger.log(Logger::WARN, 'Payload not accepted by the queue.')
9593
return
@@ -103,41 +101,26 @@ def stop!
103101
@event_queue << SHUTDOWN_SIGNAL
104102
@thread.join(DEFAULT_TIMEOUT_INTERVAL)
105103
@started = false
104+
@task.shutdown
105+
flush_queue!
106106
end
107107

108108
private
109109

110110
def run
111-
# if we receive a number of item nils that reach MAX_NIL_COUNT,
112-
# then we hang on the pop via setting use_pop to false
113-
@nil_count = 0
114-
# hang on pop if true
115-
@use_pop = false
116-
loop do
117-
if Helpers::DateTimeUtils.create_timestamp >= @flushing_interval_deadline
118-
@logger.log(Logger::DEBUG, 'Deadline exceeded flushing current batch.')
119-
flush_queue!
120-
@flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
121-
@use_pop = true if @nil_count > MAX_NIL_COUNT
122-
end
111+
while @event_queue.length.positive?
123112

124113
@logger.log(Logger::DEBUG, 'Getting item.')
125114
@logger.log(Logger::DEBUG, ' use pop is equal to ' + @use_pop.to_s)
126-
item = @event_queue.pop if @event_queue.length.positive? || @use_pop
115+
item = @event_queue.pop
127116
@logger.log(Logger::DEBUG, 'Should hang. ' + item.to_s) if @use_pop
128117
@logger.log(Logger::DEBUG, 'Got item. ' + item.to_s)
129118

130119
if item.nil?
131-
# when nil count is greater than MAX_NIL_COUNT, we hang on the pop until there is an item available.
132-
# this avoids to much spinning of the loop.
133-
@nil_count += 1
120+
@logger.log(Logger::WARNING, 'Something went wrong. Got back nil item from event queue. ')
134121
next
135122
end
136123

137-
# reset nil_count and use_pop if we have received an item.
138-
@nil_count = 0
139-
@use_pop = false
140-
141124
if item == SHUTDOWN_SIGNAL
142125
@logger.log(Logger::DEBUG, 'Received shutdown signal.')
143126
break
@@ -160,13 +143,13 @@ def run
160143
Logger::INFO,
161144
'Exiting processing loop. Attempting to flush pending events.'
162145
)
163-
flush_queue!
164146
end
165147

166148
def flush_queue!
167149
return if @current_batch.empty?
168150

169151
log_event = Optimizely::EventFactory.create_log_event(@current_batch, @logger)
152+
@current_batch = []
170153
begin
171154
@logger.log(
172155
Logger::INFO,
@@ -181,7 +164,6 @@ def flush_queue!
181164
rescue StandardError => e
182165
@logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.")
183166
end
184-
@current_batch = []
185167
end
186168

187169
def add_to_batch(user_event)
@@ -190,9 +172,6 @@ def add_to_batch(user_event)
190172
@current_batch = []
191173
end
192174

193-
# Reset the deadline if starting a new batch.
194-
@flushing_interval_deadline = (Helpers::DateTimeUtils.create_timestamp + @flush_interval) if @current_batch.empty?
195-
196175
@logger.log(Logger::DEBUG, "Adding user event: #{user_event} to batch.")
197176
@current_batch << user_event
198177
return unless @current_batch.length >= @batch_size

0 commit comments

Comments
 (0)