@@ -71,13 +71,20 @@ def start!
71
71
end
72
72
@flushing_interval_deadline = Helpers ::DateTimeUtils . create_timestamp + @flush_interval
73
73
@logger . log ( Logger ::INFO , 'Starting scheduler.' )
74
+ if @wait_mutex . nil?
75
+ @wait_mutex = Mutex . new
76
+ @resource = ConditionVariable . new
77
+ end
74
78
@thread = Thread . new { run_queue }
75
79
@started = true
76
80
@stopped = false
77
81
end
78
82
79
83
def flush
80
84
@event_queue << FLUSH_SIGNAL
85
+ @wait_mutex . synchronize {
86
+ @resource . signal
87
+ }
81
88
end
82
89
83
90
def process ( user_event )
@@ -94,6 +101,9 @@ def process(user_event)
94
101
95
102
begin
96
103
@event_queue . push ( user_event , true )
104
+ @wait_mutex . synchronize {
105
+ @resource . signal
106
+ }
97
107
rescue => e
98
108
@logger . log ( Logger ::WARN , 'Payload not accepted by the queue: ' + e . message )
99
109
return
@@ -105,6 +115,9 @@ def stop!
105
115
106
116
@logger . log ( Logger ::INFO , 'Stopping scheduler.' )
107
117
@event_queue << SHUTDOWN_SIGNAL
118
+ @wait_mutex . synchronize {
119
+ @resource . signal
120
+ }
108
121
@thread . join ( DEFAULT_TIMEOUT_INTERVAL )
109
122
@started = false
110
123
@stopped = true
@@ -144,15 +157,13 @@ def run_queue
144
157
145
158
break unless process_queue
146
159
147
- # what is the current interval to flush
148
- interval = ( @flushing_interval_deadline - Helpers ::DateTimeUtils . create_timestamp )
149
-
150
- interval /= 10.0
151
-
152
- # convert to seconds from milliseconds
153
- interval *= 0.001
154
-
155
- sleep interval if interval . positive?
160
+ # what is the current interval to flush in seconds
161
+ interval = ( @flushing_interval_deadline - Helpers ::DateTimeUtils . create_timestamp ) * 0.001
162
+ if interval . positive?
163
+ @wait_mutex . synchronize {
164
+ @resource . wait ( @wait_mutex , interval )
165
+ }
166
+ end
156
167
end
157
168
rescue SignalException
158
169
@logger . log ( Logger ::ERROR , 'Interrupted while processing buffer.' )
0 commit comments