@@ -31,7 +31,6 @@ class BatchEventProcessor < EventProcessor
31
31
DEFAULT_BATCH_INTERVAL = 30_000 # interval in milliseconds
32
32
DEFAULT_QUEUE_CAPACITY = 1000
33
33
DEFAULT_TIMEOUT_INTERVAL = 5 # interval in seconds
34
- MAX_NIL_COUNT = 3
35
34
36
35
FLUSH_SIGNAL = 'FLUSH_SIGNAL'
37
36
SHUTDOWN_SIGNAL = 'SHUTDOWN_SIGNAL'
@@ -62,7 +61,7 @@ def initialize(
62
61
@notification_center = notification_center
63
62
@current_batch = [ ]
64
63
@started = false
65
- start!
64
+ @stopped = false
66
65
end
67
66
68
67
def start!
@@ -74,6 +73,7 @@ def start!
74
73
@logger . log ( Logger ::INFO , 'Starting scheduler.' )
75
74
@thread = Thread . new { run_queue }
76
75
@started = true
76
+ @stopped = false
77
77
end
78
78
79
79
def flush
@@ -83,10 +83,7 @@ def flush
83
83
def process ( user_event )
84
84
@logger . log ( Logger ::DEBUG , "Received userEvent: #{ user_event } " )
85
85
86
- unless @started
87
- @logger . log ( Logger ::WARN , 'Executor shutdown, not accepting tasks.' )
88
- return
89
- end
86
+ start! unless @started || @stopped
90
87
91
88
begin
92
89
@event_queue . push ( user_event , true )
@@ -103,6 +100,7 @@ def stop!
103
100
@event_queue << SHUTDOWN_SIGNAL
104
101
@thread . join ( DEFAULT_TIMEOUT_INTERVAL )
105
102
@started = false
103
+ @stopped = true
106
104
end
107
105
108
106
private
@@ -139,7 +137,10 @@ def run_queue
139
137
140
138
break unless process_queue
141
139
142
- interval = ( @flushing_interval_deadline - Helpers ::DateTimeUtils . create_timestamp ) / 1000.0
140
+ interval = ( @flushing_interval_deadline - Helpers ::DateTimeUtils . create_timestamp )
141
+
142
+ interval = interval / 5.0 if interval == @flush_interval
143
+ interval = interval * 0.001
143
144
144
145
sleep interval if interval . positive?
145
146
end
0 commit comments