Skip to content

Commit 3410567

Browse files
refactor for unicorn threads
1 parent c835128 commit 3410567

File tree

1 file changed

+25
-29
lines changed

1 file changed

+25
-29
lines changed

lib/optimizely/event/batch_event_processor.rb

Lines changed: 25 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -107,36 +107,12 @@ def stop!
107107

108108
private
109109

110-
def run
111-
# if we receive a number of item nils that reach MAX_NIL_COUNT,
112-
# then we hang on the pop via setting use_pop to false
113-
@nil_count = 0
114-
# hang on pop if true
115-
@use_pop = false
116-
loop do
117-
if Helpers::DateTimeUtils.create_timestamp >= @flushing_interval_deadline
118-
@logger.log(Logger::DEBUG, 'Deadline exceeded flushing current batch.')
119-
flush_queue!
120-
@flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
121-
@use_pop = true if @nil_count > MAX_NIL_COUNT
122-
end
123-
124-
item = @event_queue.pop if @event_queue.length.positive? || @use_pop
125-
126-
if item.nil?
127-
# when nil count is greater than MAX_NIL_COUNT, we hang on the pop until there is an item available.
128-
# this avoids to much spinning of the loop.
129-
@nil_count += 1
130-
next
131-
end
132-
133-
# reset nil_count and use_pop if we have received an item.
134-
@nil_count = 0
135-
@use_pop = false
136-
110+
def process_events
111+
while @event_queue.length.positive?
112+
item = @event_queue.pop
137113
if item == SHUTDOWN_SIGNAL
138114
@logger.log(Logger::DEBUG, 'Received shutdown signal.')
139-
break
115+
return false
140116
end
141117

142118
if item == FLUSH_SIGNAL
@@ -147,9 +123,29 @@ def run
147123

148124
add_to_batch(item) if item.is_a? Optimizely::UserEvent
149125
end
126+
return true
127+
end
128+
129+
def run
130+
loop do
131+
if Helpers::DateTimeUtils.create_timestamp >= @flushing_interval_deadline
132+
@logger.log(Logger::DEBUG, 'Deadline exceeded flushing current batch.')
133+
134+
break unless process_events
135+
136+
flush_queue!
137+
@flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
138+
end
139+
140+
break unless process_events
141+
142+
interval = (Helpers::DateTimeUtils.create_timestamp - @flushing_interval_deadline)/1.0
143+
144+
sleep interval if interval > 0
145+
end
150146
rescue SignalException
151147
@logger.log(Logger::ERROR, 'Interrupted while processing buffer.')
152-
rescue Exception => e
148+
rescue => e
153149
@logger.log(Logger::ERROR, "Uncaught exception processing buffer. #{e.message}")
154150
ensure
155151
@logger.log(

0 commit comments

Comments
 (0)