@@ -62,16 +62,17 @@ def initialize(
62
62
@notification_center = notification_center
63
63
@current_batch = [ ]
64
64
@started = false
65
- start!
65
+ @task = Concurrent ::TimerTask . new ( execution_interval : @flush_interval , timeout_interval : DEFAULT_TIMEOUT_INTERVAL ) do
66
+ flush_queue!
67
+ end
66
68
end
67
69
68
70
def start!
69
71
if @started == true
70
72
@logger . log ( Logger ::WARN , 'Service already started.' )
71
73
return
72
74
end
73
- @flushing_interval_deadline = Helpers ::DateTimeUtils . create_timestamp + @flush_interval
74
- @logger . log ( Logger ::INFO , 'Starting scheduler.' )
75
+ @logger . log ( Logger ::INFO , 'Starting event queue processing.' )
75
76
@thread = Thread . new { run }
76
77
@started = true
77
78
end
@@ -83,13 +84,10 @@ def flush
83
84
def process ( user_event )
84
85
@logger . log ( Logger ::DEBUG , "Received userEvent: #{ user_event } " )
85
86
86
- if !@started || !@thread . alive?
87
- @logger . log ( Logger ::WARN , 'Executor shutdown, not accepting tasks.' )
88
- return
89
- end
90
-
91
87
begin
92
88
@event_queue . push ( user_event , true )
89
+ @process_count += 1
90
+ start! if @event_queue . length . positive? && ( @event_queue . length % @batch_size ) . zero?
93
91
rescue Exception
94
92
@logger . log ( Logger ::WARN , 'Payload not accepted by the queue.' )
95
93
return
@@ -103,41 +101,26 @@ def stop!
103
101
@event_queue << SHUTDOWN_SIGNAL
104
102
@thread . join ( DEFAULT_TIMEOUT_INTERVAL )
105
103
@started = false
104
+ @task . shutdown
105
+ flush_queue!
106
106
end
107
107
108
108
private
109
109
110
110
def run
111
- # if we receive a number of item nils that reach MAX_NIL_COUNT,
112
- # then we hang on the pop via setting use_pop to false
113
- @nil_count = 0
114
- # hang on pop if true
115
- @use_pop = false
116
- loop do
117
- if Helpers ::DateTimeUtils . create_timestamp >= @flushing_interval_deadline
118
- @logger . log ( Logger ::DEBUG , 'Deadline exceeded flushing current batch.' )
119
- flush_queue!
120
- @flushing_interval_deadline = Helpers ::DateTimeUtils . create_timestamp + @flush_interval
121
- @use_pop = true if @nil_count > MAX_NIL_COUNT
122
- end
111
+ while @event_queue . length . positive?
123
112
124
113
@logger . log ( Logger ::DEBUG , 'Getting item.' )
125
114
@logger . log ( Logger ::DEBUG , ' use pop is equal to ' + @use_pop . to_s )
126
- item = @event_queue . pop if @event_queue . length . positive? || @use_pop
115
+ item = @event_queue . pop
127
116
@logger . log ( Logger ::DEBUG , 'Should hang. ' + item . to_s ) if @use_pop
128
117
@logger . log ( Logger ::DEBUG , 'Got item. ' + item . to_s )
129
118
130
119
if item . nil?
131
- # when nil count is greater than MAX_NIL_COUNT, we hang on the pop until there is an item available.
132
- # this avoids to much spinning of the loop.
133
- @nil_count += 1
120
+ @logger . log ( Logger ::WARNING , 'Something went wrong. Got back nil item from event queue. ' )
134
121
next
135
122
end
136
123
137
- # reset nil_count and use_pop if we have received an item.
138
- @nil_count = 0
139
- @use_pop = false
140
-
141
124
if item == SHUTDOWN_SIGNAL
142
125
@logger . log ( Logger ::DEBUG , 'Received shutdown signal.' )
143
126
break
@@ -160,13 +143,13 @@ def run
160
143
Logger ::INFO ,
161
144
'Exiting processing loop. Attempting to flush pending events.'
162
145
)
163
- flush_queue!
164
146
end
165
147
166
148
def flush_queue!
167
149
return if @current_batch . empty?
168
150
169
151
log_event = Optimizely ::EventFactory . create_log_event ( @current_batch , @logger )
152
+ @current_batch = [ ]
170
153
begin
171
154
@logger . log (
172
155
Logger ::INFO ,
@@ -181,7 +164,6 @@ def flush_queue!
181
164
rescue StandardError => e
182
165
@logger . log ( Logger ::ERROR , "Error dispatching event: #{ log_event } #{ e . message } ." )
183
166
end
184
- @current_batch = [ ]
185
167
end
186
168
187
169
def add_to_batch ( user_event )
@@ -190,9 +172,6 @@ def add_to_batch(user_event)
190
172
@current_batch = [ ]
191
173
end
192
174
193
- # Reset the deadline if starting a new batch.
194
- @flushing_interval_deadline = ( Helpers ::DateTimeUtils . create_timestamp + @flush_interval ) if @current_batch . empty?
195
-
196
175
@logger . log ( Logger ::DEBUG , "Adding user event: #{ user_event } to batch." )
197
176
@current_batch << user_event
198
177
return unless @current_batch . length >= @batch_size
0 commit comments