Skip to content

Commit af7fb6f

Browse files
use mutex and resource to wait for flush interval or be signalled when there is something to process
1 parent b1f60dc commit af7fb6f

File tree

2 files changed

+21
-10
lines changed

2 files changed

+21
-10
lines changed

lib/optimizely/event/batch_event_processor.rb

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,20 @@ def start!
7171
end
7272
@flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
7373
@logger.log(Logger::INFO, 'Starting scheduler.')
74+
if @wait_mutex.nil?
75+
@wait_mutex = Mutex.new
76+
@resource = ConditionVariable.new
77+
end
7478
@thread = Thread.new { run_queue }
7579
@started = true
7680
@stopped = false
7781
end
7882

7983
def flush
8084
@event_queue << FLUSH_SIGNAL
85+
@wait_mutex.synchronize {
86+
@resource.signal
87+
}
8188
end
8289

8390
def process(user_event)
@@ -94,6 +101,9 @@ def process(user_event)
94101

95102
begin
96103
@event_queue.push(user_event, true)
104+
@wait_mutex.synchronize {
105+
@resource.signal
106+
}
97107
rescue => e
98108
@logger.log(Logger::WARN, 'Payload not accepted by the queue: ' + e.message)
99109
return
@@ -105,6 +115,9 @@ def stop!
105115

106116
@logger.log(Logger::INFO, 'Stopping scheduler.')
107117
@event_queue << SHUTDOWN_SIGNAL
118+
@wait_mutex.synchronize {
119+
@resource.signal
120+
}
108121
@thread.join(DEFAULT_TIMEOUT_INTERVAL)
109122
@started = false
110123
@stopped = true
@@ -144,15 +157,13 @@ def run_queue
144157

145158
break unless process_queue
146159

147-
# what is the current interval to flush
148-
interval = (@flushing_interval_deadline - Helpers::DateTimeUtils.create_timestamp)
149-
150-
interval /= 10.0
151-
152-
# convert to seconds from milliseconds
153-
interval *= 0.001
154-
155-
sleep interval if interval.positive?
160+
# what is the current interval to flush in seconds
161+
interval = (@flushing_interval_deadline - Helpers::DateTimeUtils.create_timestamp) * 0.001
162+
if interval.positive?
163+
@wait_mutex.synchronize {
164+
@resource.wait(@wait_mutex, interval)
165+
}
166+
end
156167
end
157168
rescue SignalException
158169
@logger.log(Logger::ERROR, 'Interrupted while processing buffer.')

spec/event/batch_event_processor_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@
344344
)
345345

346346
user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
347-
11.times do
347+
12.times do
348348
@event_processor.process(user_event)
349349
end
350350

0 commit comments

Comments
 (0)