Skip to content

Commit eea093d

Browse files
authored
Merge branch 'main' into do_now_allow_emit_unsafe_log_record
2 parents acc0294 + 605c3e8 commit eea093d

File tree

7 files changed

+129
-124
lines changed

7 files changed

+129
-124
lines changed

api/include/opentelemetry/trace/trace_state.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ class OPENTELEMETRY_EXPORT TraceState
9595
[&header_s, &first](nostd::string_view key, nostd::string_view value) noexcept {
9696
if (!first)
9797
{
98-
header_s.append(",");
98+
header_s.append(1, kMembersSeparator);
9999
}
100100
else
101101
{

sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,20 +115,25 @@ class BatchLogRecordProcessor : public LogRecordProcessor
115115

116116
/* Important boolean flags to handle the workflow of the processor */
117117
std::atomic<bool> is_force_wakeup_background_worker{false};
118-
std::atomic<bool> is_force_flush_pending{false};
119-
std::atomic<bool> is_force_flush_notified{false};
120-
std::atomic<std::chrono::microseconds::rep> force_flush_timeout_us{0};
121118
std::atomic<bool> is_shutdown{false};
119+
std::atomic<uint64_t> force_flush_pending_sequence{0};
120+
std::atomic<uint64_t> force_flush_notified_sequence{0};
121+
std::atomic<std::chrono::microseconds::rep> force_flush_timeout_us{0};
122+
123+
// Do not use SynchronizationData() = default; here, some versions of GCC&Clang have BUGs
124+
// and may not initialize the member correctly. See also
125+
// https://stackoverflow.com/questions/53408962/try-to-understand-compiler-error-message-default-member-initializer-required-be
126+
inline SynchronizationData() {}
122127
};
123128

124129
/**
125130
* @brief Notify completion of shutdown and force flush. This may be called from the any thread at
126131
* any time
127132
*
128-
* @param notify_force_flush Flag to indicate whether to notify force flush completion.
133+
* @param notify_force_flush Sequence to indicate whether to notify force flush completion.
129134
* @param synchronization_data Synchronization data to be notified.
130135
*/
131-
static void NotifyCompletion(bool notify_force_flush,
136+
static void NotifyCompletion(uint64_t notify_force_flush,
132137
const std::unique_ptr<LogRecordExporter> &exporter,
133138
const std::shared_ptr<SynchronizationData> &synchronization_data);
134139

sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <atomic>
77
#include <chrono>
88
#include <condition_variable>
9+
#include <cstdint>
910
#include <memory>
1011
#include <mutex>
1112
#include <thread>
@@ -50,9 +51,9 @@ class PeriodicExportingMetricReader : public MetricReader
5051
std::thread worker_thread_;
5152

5253
/* Synchronization primitives */
53-
std::atomic<bool> is_force_flush_pending_{false};
5454
std::atomic<bool> is_force_wakeup_background_worker_{false};
55-
std::atomic<bool> is_force_flush_notified_{false};
55+
std::atomic<uint64_t> force_flush_pending_sequence_{0};
56+
std::atomic<uint64_t> force_flush_notified_sequence_{0};
5657
std::condition_variable cv_, force_flush_cv_;
5758
std::mutex cv_m_, force_flush_m_;
5859
};

sdk/include/opentelemetry/sdk/trace/batch_span_processor.h

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
#include <atomic>
77
#include <condition_variable>
8+
#include <cstdint>
89
#include <memory>
910
#include <mutex>
1011
#include <thread>
@@ -115,20 +116,25 @@ class BatchSpanProcessor : public SpanProcessor
115116

116117
/* Important boolean flags to handle the workflow of the processor */
117118
std::atomic<bool> is_force_wakeup_background_worker{false};
118-
std::atomic<bool> is_force_flush_pending{false};
119-
std::atomic<bool> is_force_flush_notified{false};
120-
std::atomic<std::chrono::microseconds::rep> force_flush_timeout_us{0};
121119
std::atomic<bool> is_shutdown{false};
120+
std::atomic<uint64_t> force_flush_pending_sequence{0};
121+
std::atomic<uint64_t> force_flush_notified_sequence{0};
122+
std::atomic<std::chrono::microseconds::rep> force_flush_timeout_us{0};
123+
124+
// Do not use SynchronizationData() = default; here, some versions of GCC&Clang have BUGs
125+
// and may not initialize the member correctly. See also
126+
// https://stackoverflow.com/questions/53408962/try-to-understand-compiler-error-message-default-member-initializer-required-be
127+
inline SynchronizationData() {}
122128
};
123129

124130
/**
125131
* @brief Notify completion of shutdown and force flush. This may be called from the any thread at
126132
* any time
127133
*
128-
* @param notify_force_flush Flag to indicate whether to notify force flush completion.
134+
* @param notify_force_flush Sequence to indicate whether to notify force flush completion.
129135
* @param synchronization_data Synchronization data to be notified.
130136
*/
131-
static void NotifyCompletion(bool notify_force_flush,
137+
static void NotifyCompletion(uint64_t notify_force_flush,
132138
const std::unique_ptr<SpanExporter> &exporter,
133139
const std::shared_ptr<SynchronizationData> &synchronization_data);
134140

sdk/src/logs/batch_log_record_processor.cc

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ void BatchLogRecordProcessor::OnEmit(std::unique_ptr<Recordable> &&record) noexc
6565
{
6666
// signal the worker thread
6767
synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release);
68-
synchronization_data_->cv.notify_one();
68+
synchronization_data_->cv.notify_all();
6969
}
7070
}
7171

@@ -79,21 +79,25 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex
7979
// Now wait for the worker thread to signal back from the Export method
8080
std::unique_lock<std::mutex> lk_cv(synchronization_data_->force_flush_cv_m);
8181

82-
synchronization_data_->is_force_flush_pending.store(true, std::memory_order_release);
82+
std::uint64_t current_sequence =
83+
synchronization_data_->force_flush_pending_sequence.fetch_add(1, std::memory_order_release) +
84+
1;
8385
synchronization_data_->force_flush_timeout_us.store(timeout.count(), std::memory_order_release);
84-
auto break_condition = [this]() {
86+
auto break_condition = [this, current_sequence]() {
8587
if (synchronization_data_->is_shutdown.load() == true)
8688
{
8789
return true;
8890
}
8991

9092
// Wake up the worker thread once.
91-
if (synchronization_data_->is_force_flush_pending.load(std::memory_order_acquire))
93+
if (synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire) >
94+
synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire))
9295
{
93-
synchronization_data_->cv.notify_one();
96+
synchronization_data_->cv.notify_all();
9497
}
9598

96-
return synchronization_data_->is_force_flush_notified.load(std::memory_order_acquire);
99+
return synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire) >=
100+
current_sequence;
97101
};
98102

99103
// Fix timeout to meet requirement of wait_for
@@ -110,35 +114,22 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex
110114
bool result = false;
111115
while (!result && timeout_steady > std::chrono::steady_clock::duration::zero())
112116
{
113-
// When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called
114-
// between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait
115-
// for ever
117+
// When force_flush_notified_sequence.compare_exchange_strong(...) and
118+
// force_flush_cv.notify_all() is called between force_flush_pending_sequence.load(...) and
119+
// force_flush_cv.wait(). We must not wait for ever
116120
std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now();
117-
result = synchronization_data_->force_flush_cv.wait_for(lk_cv, scheduled_delay_millis_,
118-
break_condition);
119-
timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
120-
}
121+
std::chrono::microseconds wait_timeout = scheduled_delay_millis_;
121122

122-
// If it's already signaled, we must wait util notified.
123-
// We use a spin lock here
124-
if (false ==
125-
synchronization_data_->is_force_flush_pending.exchange(false, std::memory_order_acq_rel))
126-
{
127-
for (int retry_waiting_times = 0;
128-
false == synchronization_data_->is_force_flush_notified.load(std::memory_order_acquire);
129-
++retry_waiting_times)
123+
if (wait_timeout > timeout_steady)
130124
{
131-
opentelemetry::common::SpinLockMutex::fast_yield();
132-
if ((retry_waiting_times & 127) == 127)
133-
{
134-
std::this_thread::yield();
135-
}
125+
wait_timeout = std::chrono::duration_cast<std::chrono::microseconds>(timeout_steady);
136126
}
127+
result = synchronization_data_->force_flush_cv.wait_for(lk_cv, wait_timeout, break_condition);
128+
timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
137129
}
138130

139-
synchronization_data_->is_force_flush_notified.store(false, std::memory_order_release);
140-
141-
return result;
131+
return synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire) >=
132+
current_sequence;
142133
}
143134

144135
void BatchLogRecordProcessor::DoBackgroundWork()
@@ -182,8 +173,8 @@ void BatchLogRecordProcessor::Export()
182173
{
183174
std::vector<std::unique_ptr<Recordable>> records_arr;
184175
size_t num_records_to_export;
185-
bool notify_force_flush =
186-
synchronization_data_->is_force_flush_pending.exchange(false, std::memory_order_acq_rel);
176+
std::uint64_t notify_force_flush =
177+
synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire);
187178
if (notify_force_flush)
188179
{
189180
num_records_to_export = buffer_.size();
@@ -217,7 +208,7 @@ void BatchLogRecordProcessor::Export()
217208
}
218209

219210
void BatchLogRecordProcessor::NotifyCompletion(
220-
bool notify_force_flush,
211+
std::uint64_t notify_force_flush,
221212
const std::unique_ptr<LogRecordExporter> &exporter,
222213
const std::shared_ptr<SynchronizationData> &synchronization_data)
223214
{
@@ -226,7 +217,8 @@ void BatchLogRecordProcessor::NotifyCompletion(
226217
return;
227218
}
228219

229-
if (notify_force_flush)
220+
if (notify_force_flush >
221+
synchronization_data->force_flush_notified_sequence.load(std::memory_order_acquire))
230222
{
231223
if (exporter)
232224
{
@@ -236,8 +228,15 @@ void BatchLogRecordProcessor::NotifyCompletion(
236228
std::chrono::microseconds::zero());
237229
exporter->ForceFlush(timeout);
238230
}
239-
synchronization_data->is_force_flush_notified.store(true, std::memory_order_release);
240-
synchronization_data->force_flush_cv.notify_one();
231+
232+
std::uint64_t notified_sequence =
233+
synchronization_data->force_flush_notified_sequence.load(std::memory_order_acquire);
234+
while (notify_force_flush > notified_sequence)
235+
{
236+
synchronization_data->force_flush_notified_sequence.compare_exchange_strong(
237+
notified_sequence, notify_force_flush, std::memory_order_acq_rel);
238+
synchronization_data->force_flush_cv.notify_all();
239+
}
241240
}
242241
}
243242

@@ -246,7 +245,8 @@ void BatchLogRecordProcessor::DrainQueue()
246245
while (true)
247246
{
248247
if (buffer_.empty() &&
249-
false == synchronization_data_->is_force_flush_pending.load(std::memory_order_acquire))
248+
synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire) <=
249+
synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire))
250250
{
251251
break;
252252
}
@@ -285,7 +285,7 @@ bool BatchLogRecordProcessor::Shutdown(std::chrono::microseconds timeout) noexce
285285
if (worker_thread_.joinable())
286286
{
287287
synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release);
288-
synchronization_data_->cv.notify_one();
288+
synchronization_data_->cv.notify_all();
289289
worker_thread_.join();
290290
}
291291

sdk/src/metrics/export/periodic_exporting_metric_reader.cc

Lines changed: 29 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
9090
});
9191

9292
std::future_status status;
93+
std::uint64_t notify_force_flush = force_flush_pending_sequence_.load(std::memory_order_acquire);
9394
do
9495
{
9596
status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_));
@@ -99,12 +100,13 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
99100
break;
100101
}
101102
} while (status != std::future_status::ready);
102-
bool notify_force_flush = is_force_flush_pending_.exchange(false, std::memory_order_acq_rel);
103-
if (notify_force_flush)
103+
104+
std::uint64_t notified_sequence = force_flush_notified_sequence_.load(std::memory_order_acquire);
105+
while (notify_force_flush > notified_sequence)
104106
{
105-
std::unique_lock<std::mutex> lk(force_flush_m_);
106-
is_force_flush_notified_.store(true, std::memory_order_release);
107-
force_flush_cv_.notify_one();
107+
force_flush_notified_sequence_.compare_exchange_strong(notified_sequence, notify_force_flush,
108+
std::memory_order_acq_rel);
109+
force_flush_cv_.notify_all();
108110
}
109111

110112
return true;
@@ -113,24 +115,27 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
113115
bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept
114116
{
115117
std::unique_lock<std::mutex> lk_cv(force_flush_m_);
116-
is_force_flush_pending_.store(true, std::memory_order_release);
117-
auto break_condition = [this]() {
118+
std::uint64_t current_sequence =
119+
force_flush_pending_sequence_.fetch_add(1, std::memory_order_release) + 1;
120+
auto break_condition = [this, current_sequence]() {
118121
if (IsShutdown())
119122
{
120123
return true;
121124
}
122125

123-
// Wake up the worker thread once.
124-
if (is_force_flush_pending_.load(std::memory_order_acquire))
126+
// Wake up the worker thread.
127+
if (force_flush_pending_sequence_.load(std::memory_order_acquire) >
128+
force_flush_notified_sequence_.load(std::memory_order_acquire))
125129
{
126130
is_force_wakeup_background_worker_.store(true, std::memory_order_release);
127-
cv_.notify_one();
131+
cv_.notify_all();
128132
}
129-
return is_force_flush_notified_.load(std::memory_order_acquire);
133+
return force_flush_notified_sequence_.load(std::memory_order_acquire) >= current_sequence;
130134
};
131135

132-
auto wait_timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout(
133-
timeout, std::chrono::microseconds::zero());
136+
std::chrono::microseconds wait_timeout =
137+
opentelemetry::common::DurationUtil::AdjustWaitForTimeout(timeout,
138+
std::chrono::microseconds::zero());
134139
std::chrono::steady_clock::duration timeout_steady =
135140
std::chrono::duration_cast<std::chrono::steady_clock::duration>(wait_timeout);
136141
if (timeout_steady <= std::chrono::steady_clock::duration::zero())
@@ -141,29 +146,19 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo
141146
bool result = false;
142147
while (!result && timeout_steady > std::chrono::steady_clock::duration::zero())
143148
{
144-
// When is_force_flush_notified_.store(true) and force_flush_cv_.notify_all() is called
145-
// between is_force_flush_pending_.load() and force_flush_cv_.wait(). We must not wait
146-
// for ever
149+
// When force_flush_notified_sequence_.compare_exchange_strong(...) and
150+
// force_flush_cv_.notify_all() is called between force_flush_pending_sequence_.load(...) and
151+
// force_flush_cv_.wait(). We must not wait for ever
147152
std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now();
148-
result = force_flush_cv_.wait_for(lk_cv, export_interval_millis_, break_condition);
149-
timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
150-
}
151153

152-
// If it will be already signaled, we must wait until notified.
153-
// We use a spin lock here
154-
if (false == is_force_flush_pending_.exchange(false, std::memory_order_acq_rel))
155-
{
156-
for (int retry_waiting_times = 0;
157-
false == is_force_flush_notified_.load(std::memory_order_acquire); ++retry_waiting_times)
154+
wait_timeout = export_interval_millis_;
155+
if (wait_timeout > timeout_steady)
158156
{
159-
opentelemetry::common::SpinLockMutex::fast_yield();
160-
if ((retry_waiting_times & 127) == 127)
161-
{
162-
std::this_thread::yield();
163-
}
157+
wait_timeout = std::chrono::duration_cast<std::chrono::microseconds>(timeout_steady);
164158
}
159+
result = force_flush_cv_.wait_for(lk_cv, wait_timeout, break_condition);
160+
timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
165161
}
166-
is_force_flush_notified_.store(false, std::memory_order_release);
167162

168163
if (result)
169164
{
@@ -186,18 +181,15 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo
186181
result = false;
187182
}
188183
}
189-
return result;
184+
return result &&
185+
force_flush_notified_sequence_.load(std::memory_order_acquire) >= current_sequence;
190186
}
191187

192188
bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept
193189
{
194190
if (worker_thread_.joinable())
195191
{
196-
{
197-
// ensure that `cv_` is awaiting, and the update doesn't get lost
198-
std::unique_lock<std::mutex> lk(cv_m_);
199-
cv_.notify_all();
200-
}
192+
cv_.notify_all();
201193
worker_thread_.join();
202194
}
203195
return exporter_->Shutdown(timeout);

0 commit comments

Comments
 (0)