@@ -65,7 +65,7 @@ void BatchLogRecordProcessor::OnEmit(std::unique_ptr<Recordable> &&record) noexc
65
65
{
66
66
// signal the worker thread
67
67
synchronization_data_->is_force_wakeup_background_worker .store (true , std::memory_order_release);
68
- synchronization_data_->cv .notify_one ();
68
+ synchronization_data_->cv .notify_all ();
69
69
}
70
70
}
71
71
@@ -79,21 +79,25 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex
79
79
// Now wait for the worker thread to signal back from the Export method
80
80
std::unique_lock<std::mutex> lk_cv (synchronization_data_->force_flush_cv_m );
81
81
82
- synchronization_data_->is_force_flush_pending .store (true , std::memory_order_release);
82
+ std::uint64_t current_sequence =
83
+ synchronization_data_->force_flush_pending_sequence .fetch_add (1 , std::memory_order_release) +
84
+ 1 ;
83
85
synchronization_data_->force_flush_timeout_us .store (timeout.count (), std::memory_order_release);
84
- auto break_condition = [this ]() {
86
+ auto break_condition = [this , current_sequence ]() {
85
87
if (synchronization_data_->is_shutdown .load () == true )
86
88
{
87
89
return true ;
88
90
}
89
91
90
92
// Wake up the worker thread once.
91
- if (synchronization_data_->is_force_flush_pending .load (std::memory_order_acquire))
93
+ if (synchronization_data_->force_flush_pending_sequence .load (std::memory_order_acquire) >
94
+ synchronization_data_->force_flush_notified_sequence .load (std::memory_order_acquire))
92
95
{
93
- synchronization_data_->cv .notify_one ();
96
+ synchronization_data_->cv .notify_all ();
94
97
}
95
98
96
- return synchronization_data_->is_force_flush_notified .load (std::memory_order_acquire);
99
+ return synchronization_data_->force_flush_notified_sequence .load (std::memory_order_acquire) >=
100
+ current_sequence;
97
101
};
98
102
99
103
// Fix timeout to meet requirement of wait_for
@@ -110,35 +114,22 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex
110
114
bool result = false ;
111
115
while (!result && timeout_steady > std::chrono::steady_clock::duration::zero ())
112
116
{
113
- // When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called
114
- // between is_force_flush_pending.load () and force_flush_cv.wait(). We must not wait
115
- // for ever
117
+ // When force_flush_notified_sequence.compare_exchange_strong(...) and
118
+ // force_flush_cv.notify_all () is called between force_flush_pending_sequence.load(...) and
119
+ // force_flush_cv.wait(). We must not wait for ever
116
120
std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now ();
117
- result = synchronization_data_->force_flush_cv .wait_for (lk_cv, scheduled_delay_millis_,
118
- break_condition);
119
- timeout_steady -= std::chrono::steady_clock::now () - start_timepoint;
120
- }
121
+ std::chrono::microseconds wait_timeout = scheduled_delay_millis_;
121
122
122
- // If it's already signaled, we must wait util notified.
123
- // We use a spin lock here
124
- if (false ==
125
- synchronization_data_->is_force_flush_pending .exchange (false , std::memory_order_acq_rel))
126
- {
127
- for (int retry_waiting_times = 0 ;
128
- false == synchronization_data_->is_force_flush_notified .load (std::memory_order_acquire);
129
- ++retry_waiting_times)
123
+ if (wait_timeout > timeout_steady)
130
124
{
131
- opentelemetry::common::SpinLockMutex::fast_yield ();
132
- if ((retry_waiting_times & 127 ) == 127 )
133
- {
134
- std::this_thread::yield ();
135
- }
125
+ wait_timeout = std::chrono::duration_cast<std::chrono::microseconds>(timeout_steady);
136
126
}
127
+ result = synchronization_data_->force_flush_cv .wait_for (lk_cv, wait_timeout, break_condition);
128
+ timeout_steady -= std::chrono::steady_clock::now () - start_timepoint;
137
129
}
138
130
139
- synchronization_data_->is_force_flush_notified .store (false , std::memory_order_release);
140
-
141
- return result;
131
+ return synchronization_data_->force_flush_notified_sequence .load (std::memory_order_acquire) >=
132
+ current_sequence;
142
133
}
143
134
144
135
void BatchLogRecordProcessor::DoBackgroundWork ()
@@ -182,8 +173,8 @@ void BatchLogRecordProcessor::Export()
182
173
{
183
174
std::vector<std::unique_ptr<Recordable>> records_arr;
184
175
size_t num_records_to_export;
185
- bool notify_force_flush =
186
- synchronization_data_->is_force_flush_pending . exchange ( false , std::memory_order_acq_rel );
176
+ std:: uint64_t notify_force_flush =
177
+ synchronization_data_->force_flush_pending_sequence . load ( std::memory_order_acquire );
187
178
if (notify_force_flush)
188
179
{
189
180
num_records_to_export = buffer_.size ();
@@ -217,7 +208,7 @@ void BatchLogRecordProcessor::Export()
217
208
}
218
209
219
210
void BatchLogRecordProcessor::NotifyCompletion (
220
- bool notify_force_flush,
211
+ std:: uint64_t notify_force_flush,
221
212
const std::unique_ptr<LogRecordExporter> &exporter,
222
213
const std::shared_ptr<SynchronizationData> &synchronization_data)
223
214
{
@@ -226,7 +217,8 @@ void BatchLogRecordProcessor::NotifyCompletion(
226
217
return ;
227
218
}
228
219
229
- if (notify_force_flush)
220
+ if (notify_force_flush >
221
+ synchronization_data->force_flush_notified_sequence .load (std::memory_order_acquire))
230
222
{
231
223
if (exporter)
232
224
{
@@ -236,8 +228,15 @@ void BatchLogRecordProcessor::NotifyCompletion(
236
228
std::chrono::microseconds::zero ());
237
229
exporter->ForceFlush (timeout);
238
230
}
239
- synchronization_data->is_force_flush_notified .store (true , std::memory_order_release);
240
- synchronization_data->force_flush_cv .notify_one ();
231
+
232
+ std::uint64_t notified_sequence =
233
+ synchronization_data->force_flush_notified_sequence .load (std::memory_order_acquire);
234
+ while (notify_force_flush > notified_sequence)
235
+ {
236
+ synchronization_data->force_flush_notified_sequence .compare_exchange_strong (
237
+ notified_sequence, notify_force_flush, std::memory_order_acq_rel);
238
+ synchronization_data->force_flush_cv .notify_all ();
239
+ }
241
240
}
242
241
}
243
242
@@ -246,7 +245,8 @@ void BatchLogRecordProcessor::DrainQueue()
246
245
while (true )
247
246
{
248
247
if (buffer_.empty () &&
249
- false == synchronization_data_->is_force_flush_pending .load (std::memory_order_acquire))
248
+ synchronization_data_->force_flush_pending_sequence .load (std::memory_order_acquire) <=
249
+ synchronization_data_->force_flush_notified_sequence .load (std::memory_order_acquire))
250
250
{
251
251
break ;
252
252
}
@@ -285,7 +285,7 @@ bool BatchLogRecordProcessor::Shutdown(std::chrono::microseconds timeout) noexce
285
285
if (worker_thread_.joinable ())
286
286
{
287
287
synchronization_data_->is_force_wakeup_background_worker .store (true , std::memory_order_release);
288
- synchronization_data_->cv .notify_one ();
288
+ synchronization_data_->cv .notify_all ();
289
289
worker_thread_.join ();
290
290
}
291
291
0 commit comments