diff --git a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h index 0c9dea9c90..ded2be1820 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h @@ -58,7 +58,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage public: SyncMetricStorage(InstrumentDescriptor instrument_descriptor, const AggregationType aggregation_type, - const AttributesProcessor *attributes_processor, + std::shared_ptr attributes_processor, #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW ExemplarFilterType exempler_filter_type, nostd::shared_ptr &&exemplar_reservoir, @@ -67,7 +67,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage size_t attributes_limit = kAggregationCardinalityLimit) : instrument_descriptor_(instrument_descriptor), attributes_hashmap_(new AttributesHashMap(attributes_limit)), - attributes_processor_(attributes_processor), + attributes_processor_(std::move(attributes_processor)), #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW exemplar_filter_type_(exempler_filter_type), exemplar_reservoir_(exemplar_reservoir), @@ -119,7 +119,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage std::lock_guard guard(attribute_hashmap_lock_); attributes_hashmap_ - ->GetOrSetDefault(attributes, attributes_processor_, create_default_aggregation_) + ->GetOrSetDefault(attributes, attributes_processor_.get(), create_default_aggregation_) ->Aggregate(value); } @@ -160,7 +160,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage #endif std::lock_guard guard(attribute_hashmap_lock_); attributes_hashmap_ - ->GetOrSetDefault(attributes, attributes_processor_, create_default_aggregation_) + ->GetOrSetDefault(attributes, attributes_processor_.get(), create_default_aggregation_) ->Aggregate(value); } @@ -175,7 +175,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage // hashmap to maintain the metrics for delta collection (i.e, collection since last Collect call) std::unique_ptr attributes_hashmap_; std::function()> create_default_aggregation_; - const AttributesProcessor *attributes_processor_; + std::shared_ptr attributes_processor_; #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW ExemplarFilterType exemplar_filter_type_; nostd::shared_ptr exemplar_reservoir_; diff --git a/sdk/include/opentelemetry/sdk/metrics/view/view.h b/sdk/include/opentelemetry/sdk/metrics/view/view.h index dc0888f47d..e1c6237b36 100644 --- a/sdk/include/opentelemetry/sdk/metrics/view/view.h +++ b/sdk/include/opentelemetry/sdk/metrics/view/view.h @@ -53,10 +53,10 @@ class View return aggregation_config_.get(); } - virtual const opentelemetry::sdk::metrics::AttributesProcessor &GetAttributesProcessor() - const noexcept + virtual std::shared_ptr + GetAttributesProcessor() const noexcept { - return *attributes_processor_.get(); + return attributes_processor_; } private: @@ -65,7 +65,7 @@ class View std::string unit_; AggregationType aggregation_type_; std::shared_ptr aggregation_config_; - std::unique_ptr attributes_processor_; + std::shared_ptr attributes_processor_; }; } // namespace metrics } // namespace sdk diff --git a/sdk/src/metrics/meter.cc b/sdk/src/metrics/meter.cc index 4424bbb4c9..4040e6791f 100644 --- a/sdk/src/metrics/meter.cc +++ b/sdk/src/metrics/meter.cc @@ -538,7 +538,7 @@ std::unique_ptr Meter::RegisterSyncMetricStorage( { WarnOnDuplicateInstrument(GetInstrumentationScope(), storage_registry_, view_instr_desc); sync_storage = std::shared_ptr(new SyncMetricStorage( - view_instr_desc, view.GetAggregationType(), &view.GetAttributesProcessor(), + view_instr_desc, view.GetAggregationType(), view.GetAttributesProcessor(), #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW exemplar_filter_type, GetExemplarReservoir(view.GetAggregationType(), view.GetAggregationConfig(), diff --git a/sdk/test/metrics/cardinality_limit_test.cc b/sdk/test/metrics/cardinality_limit_test.cc index 801096b269..6a6dbe6240 100644 --- a/sdk/test/metrics/cardinality_limit_test.cc +++ b/sdk/test/metrics/cardinality_limit_test.cc @@ -109,9 +109,9 @@ TEST_P(WritableMetricStorageCardinalityLimitTestFixture, LongCounterSumAggregati const size_t attributes_limit = 10; InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter, InstrumentValueType::kLong}; - std::unique_ptr default_attributes_processor{ + std::shared_ptr default_attributes_processor{ new DefaultAttributesProcessor{}}; - SyncMetricStorage storage(instr_desc, AggregationType::kSum, default_attributes_processor.get(), + SyncMetricStorage storage(instr_desc, AggregationType::kSum, default_attributes_processor, #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW ExemplarFilterType::kAlwaysOff, ExemplarReservoir::GetNoExemplarReservoir(), diff --git a/sdk/test/metrics/meter_test.cc b/sdk/test/metrics/meter_test.cc index 199ccc682c..ce925fb0d7 100644 --- a/sdk/test/metrics/meter_test.cc +++ b/sdk/test/metrics/meter_test.cc @@ -6,14 +6,17 @@ #include #include #include +#include #include #include #include +#include #include #include #include "common.h" #include +#include "opentelemetry/common/key_value_iterable.h" #include "opentelemetry/context/context.h" #include "opentelemetry/metrics/async_instruments.h" #include "opentelemetry/metrics/meter.h" @@ -21,6 +24,7 @@ #include "opentelemetry/sdk/instrumentationscope/scope_configurator.h" #include "opentelemetry/sdk/metrics/instruments.h" #include "opentelemetry/sdk/metrics/meter_config.h" +#include "opentelemetry/sdk/metrics/view/attributes_processor.h" #include "opentelemetry/sdk/metrics/view/view_registry.h" #include "opentelemetry/sdk/resource/resource.h" @@ -31,6 +35,7 @@ #include "opentelemetry/metrics/sync_instruments.h" // IWYU pragma: keep #include "opentelemetry/nostd/function_ref.h" #include "opentelemetry/nostd/shared_ptr.h" +#include "opentelemetry/nostd/string_view.h" #include "opentelemetry/nostd/variant.h" #include "opentelemetry/sdk/common/attribute_utils.h" #include "opentelemetry/sdk/common/global_log_handler.h" @@ -184,6 +189,22 @@ class MeterCreateInstrumentTest : public ::testing::Test std::shared_ptr metric_reader_ptr_{new MockMetricReader()}; }; +class TestProcessor : public sdk::metrics::AttributesProcessor +{ +public: + explicit TestProcessor() = default; + ~TestProcessor() override = default; + + sdk::metrics::MetricAttributes process( + const opentelemetry::common::KeyValueIterable &attributes) const noexcept override + { + // Just forward attributes + return sdk::metrics::MetricAttributes(attributes); + } + + bool isPresent(nostd::string_view /*key*/) const noexcept override { return true; } +}; + } // namespace TEST(MeterTest, BasicAsyncTests) @@ -851,3 +872,46 @@ TEST_F(MeterCreateInstrumentTest, ViewCorrectedDuplicateAsyncInstrumentsByDescri return true; }); } + +TEST(MeterTest, RecordAfterProviderDestructionWithCustomProcessor_NoResetInMain) +{ + std::unique_ptr processor(new TestProcessor()); + + // MeterProvider is owned by unique_ptr for explicit control + std::unique_ptr provider(new MeterProvider()); + + // Register a View with custom processor + std::unique_ptr view( + new View("my_counter", "", "", AggregationType::kSum, nullptr, std::move(processor))); + std::unique_ptr instr_selector( + new InstrumentSelector(InstrumentType::kCounter, "my_counter", "")); + std::unique_ptr meter_selector(new MeterSelector("test_meter", "", "")); + provider->AddView(std::move(instr_selector), std::move(meter_selector), std::move(view)); + + auto meter = provider->GetMeter("test_meter"); + auto counter = meter->CreateUInt64Counter("my_counter"); + + // Move the counter to the thread + std::atomic thread_ready{false}; + std::atomic thread_done{false}; + + std::thread t([c = std::move(counter), &thread_ready, &thread_done]() mutable { + thread_ready = true; + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + // Safe after provider destruction + c->Add(12345, {{"thread", "after_provider_destruction"}}); + thread_done = true; + }); + + // Wait for thread to be ready + while (!thread_ready.load()) + std::this_thread::yield(); + + // Destroy the provider (and its storage etc) + provider.reset(); + + // Wait for thread to finish + while (!thread_done.load()) + std::this_thread::yield(); + t.join(); +} diff --git a/sdk/test/metrics/sync_metric_storage_counter_test.cc b/sdk/test/metrics/sync_metric_storage_counter_test.cc index 9ca84fd0cb..daf326a019 100644 --- a/sdk/test/metrics/sync_metric_storage_counter_test.cc +++ b/sdk/test/metrics/sync_metric_storage_counter_test.cc @@ -49,10 +49,10 @@ TEST_P(WritableMetricStorageTestFixture, LongCounterSumAggregation) std::map attributes_get = {{"RequestType", "GET"}}; std::map attributes_put = {{"RequestType", "PUT"}}; - std::unique_ptr default_attributes_processor{ + std::shared_ptr default_attributes_processor{ new DefaultAttributesProcessor{}}; opentelemetry::sdk::metrics::SyncMetricStorage storage( - instr_desc, AggregationType::kSum, default_attributes_processor.get(), + instr_desc, AggregationType::kSum, default_attributes_processor, #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW ExemplarFilterType::kAlwaysOff, ExemplarReservoir::GetNoExemplarReservoir(), #endif @@ -189,10 +189,10 @@ TEST_P(WritableMetricStorageTestFixture, DoubleCounterSumAggregation) std::map attributes_get = {{"RequestType", "GET"}}; std::map attributes_put = {{"RequestType", "PUT"}}; - std::unique_ptr default_attributes_processor{ + std::shared_ptr default_attributes_processor{ new DefaultAttributesProcessor{}}; opentelemetry::sdk::metrics::SyncMetricStorage storage( - instr_desc, AggregationType::kSum, default_attributes_processor.get(), + instr_desc, AggregationType::kSum, default_attributes_processor, #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW ExemplarFilterType::kAlwaysOff, ExemplarReservoir::GetNoExemplarReservoir(), #endif diff --git a/sdk/test/metrics/sync_metric_storage_gauge_test.cc b/sdk/test/metrics/sync_metric_storage_gauge_test.cc index 4ab04eeae1..183dab6e4c 100644 --- a/sdk/test/metrics/sync_metric_storage_gauge_test.cc +++ b/sdk/test/metrics/sync_metric_storage_gauge_test.cc @@ -37,10 +37,10 @@ TEST_P(WritableMetricStorageTestFixture, LongGaugeLastValueAggregation) std::map attributes_roomA = {{"Room.id", "Rack A"}}; std::map attributes_roomB = {{"Room.id", "Rack B"}}; - std::unique_ptr default_attributes_processor{ + std::shared_ptr default_attributes_processor{ new DefaultAttributesProcessor{}}; opentelemetry::sdk::metrics::SyncMetricStorage storage( - instr_desc, AggregationType::kLastValue, default_attributes_processor.get(), + instr_desc, AggregationType::kLastValue, default_attributes_processor, # ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW ExemplarFilterType::kAlwaysOff, ExemplarReservoir::GetNoExemplarReservoir(), # endif @@ -121,10 +121,10 @@ TEST_P(WritableMetricStorageTestFixture, DoubleGaugeLastValueAggregation) std::map attributes_roomA = {{"Room.id", "Rack A"}}; std::map attributes_roomB = {{"Room.id", "Rack B"}}; - std::unique_ptr default_attributes_processor{ + std::shared_ptr default_attributes_processor{ new DefaultAttributesProcessor{}}; opentelemetry::sdk::metrics::SyncMetricStorage storage( - instr_desc, AggregationType::kLastValue, default_attributes_processor.get(), + instr_desc, AggregationType::kLastValue, default_attributes_processor, # ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW ExemplarFilterType::kAlwaysOff, ExemplarReservoir::GetNoExemplarReservoir(), # endif diff --git a/sdk/test/metrics/sync_metric_storage_histogram_test.cc b/sdk/test/metrics/sync_metric_storage_histogram_test.cc index 89dedb397f..be328bac5c 100644 --- a/sdk/test/metrics/sync_metric_storage_histogram_test.cc +++ b/sdk/test/metrics/sync_metric_storage_histogram_test.cc @@ -51,10 +51,10 @@ TEST_P(WritableMetricStorageHistogramTestFixture, LongHistogram) std::map attributes_get = {{"RequestType", "GET"}}; std::map attributes_put = {{"RequestType", "PUT"}}; - std::unique_ptr default_attributes_processor{ + std::shared_ptr default_attributes_processor{ new DefaultAttributesProcessor{}}; opentelemetry::sdk::metrics::SyncMetricStorage storage( - instr_desc, AggregationType::kHistogram, default_attributes_processor.get(), + instr_desc, AggregationType::kHistogram, default_attributes_processor, #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW ExemplarFilterType::kAlwaysOff, ExemplarReservoir::GetNoExemplarReservoir(), #endif @@ -192,10 +192,10 @@ TEST_P(WritableMetricStorageHistogramTestFixture, DoubleHistogram) std::map attributes_get = {{"RequestType", "GET"}}; std::map attributes_put = {{"RequestType", "PUT"}}; - std::unique_ptr default_attributes_processor{ + std::shared_ptr default_attributes_processor{ new DefaultAttributesProcessor{}}; opentelemetry::sdk::metrics::SyncMetricStorage storage( - instr_desc, AggregationType::kHistogram, default_attributes_processor.get(), + instr_desc, AggregationType::kHistogram, default_attributes_processor, #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW ExemplarFilterType::kAlwaysOff, ExemplarReservoir::GetNoExemplarReservoir(), #endif @@ -340,10 +340,10 @@ TEST_P(WritableMetricStorageHistogramTestFixture, Base2ExponentialDoubleHistogra std::map attributes_get = {{"RequestType", "GET"}}; std::map attributes_put = {{"RequestType", "PUT"}}; - std::unique_ptr default_attributes_processor{ + std::shared_ptr default_attributes_processor{ new DefaultAttributesProcessor{}}; opentelemetry::sdk::metrics::SyncMetricStorage storage( - instr_desc, AggregationType::kBase2ExponentialHistogram, default_attributes_processor.get(), + instr_desc, AggregationType::kBase2ExponentialHistogram, default_attributes_processor, #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW ExemplarFilterType::kAlwaysOff, ExemplarReservoir::GetNoExemplarReservoir(), #endif diff --git a/sdk/test/metrics/sync_metric_storage_up_down_counter_test.cc b/sdk/test/metrics/sync_metric_storage_up_down_counter_test.cc index b7b549051d..0fa273e9cb 100644 --- a/sdk/test/metrics/sync_metric_storage_up_down_counter_test.cc +++ b/sdk/test/metrics/sync_metric_storage_up_down_counter_test.cc @@ -48,10 +48,10 @@ TEST_P(WritableMetricStorageTestFixture, LongUpDownCounterSumAggregation) std::map attributes_get = {{"RequestType", "GET"}}; std::map attributes_put = {{"RequestType", "PUT"}}; - std::unique_ptr default_attributes_processor{ + std::shared_ptr default_attributes_processor{ new DefaultAttributesProcessor{}}; opentelemetry::sdk::metrics::SyncMetricStorage storage( - instr_desc, AggregationType::kSum, default_attributes_processor.get(), + instr_desc, AggregationType::kSum, default_attributes_processor, #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW ExemplarFilterType::kAlwaysOff, ExemplarReservoir::GetNoExemplarReservoir(), #endif @@ -198,10 +198,10 @@ TEST_P(WritableMetricStorageTestFixture, DoubleUpDownCounterSumAggregation) std::map attributes_get = {{"RequestType", "GET"}}; std::map attributes_put = {{"RequestType", "PUT"}}; - std::unique_ptr default_attributes_processor{ + std::shared_ptr default_attributes_processor{ new DefaultAttributesProcessor{}}; opentelemetry::sdk::metrics::SyncMetricStorage storage( - instr_desc, AggregationType::kSum, default_attributes_processor.get(), + instr_desc, AggregationType::kSum, default_attributes_processor, #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW ExemplarFilterType::kAlwaysOff, ExemplarReservoir::GetNoExemplarReservoir(), #endif