Skip to content

Commit 7d9b0e5

Browse files
authored
Data points in exporter shouldnt use bound instruments (open-telemetry#1237)
1 parent ffa3b39 commit 7d9b0e5

File tree

6 files changed

+158
-66
lines changed

6 files changed

+158
-66
lines changed

exporter/opentelemetry-exporter-otlp/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
- Add Env variables in OTLP exporter
66
([#1101](https://github.com/open-telemetry/opentelemetry-python/pull/1101))
7+
- Do not use bound instruments in OTLP exporter
8+
([#1237](https://github.com/open-telemetry/opentelemetry-python/pull/1237))
79

810
## Version 0.14b0
911

exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py

Lines changed: 71 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -58,48 +58,54 @@
5858
MetricsExporter,
5959
MetricsExportResult,
6060
)
61+
from opentelemetry.sdk.metrics.export.aggregate import (
62+
HistogramAggregator,
63+
LastValueAggregator,
64+
MinMaxSumCountAggregator,
65+
SumAggregator,
66+
ValueObserverAggregator,
67+
)
6168

6269
logger = logging.getLogger(__name__)
6370
DataPointT = TypeVar("DataPointT", IntDataPoint, DoubleDataPoint)
6471

6572

6673
def _get_data_points(
67-
sdk_metric: MetricRecord, data_point_class: Type[DataPointT]
74+
sdk_metric_record: MetricRecord, data_point_class: Type[DataPointT]
6875
) -> List[DataPointT]:
6976

70-
data_points = []
71-
72-
for (
73-
label,
74-
bound_counter,
75-
) in sdk_metric.instrument.bound_instruments.items():
76-
77-
string_key_values = []
78-
79-
for label_key, label_value in label:
80-
string_key_values.append(
81-
StringKeyValue(key=label_key, value=label_value)
82-
)
83-
84-
for view_data in bound_counter.view_datas:
85-
86-
if view_data.labels == label:
87-
88-
data_points.append(
89-
data_point_class(
90-
labels=string_key_values,
91-
value=view_data.aggregator.current,
92-
start_time_unix_nano=(
93-
view_data.aggregator.last_checkpoint_timestamp
94-
),
95-
time_unix_nano=(
96-
view_data.aggregator.last_update_timestamp
97-
),
98-
)
99-
)
100-
break
101-
102-
return data_points
77+
if isinstance(sdk_metric_record.aggregator, SumAggregator):
78+
value = sdk_metric_record.aggregator.checkpoint
79+
80+
elif isinstance(sdk_metric_record.aggregator, MinMaxSumCountAggregator):
81+
# FIXME: How are values to be interpreted from this aggregator?
82+
raise Exception("MinMaxSumCount aggregator data not supported")
83+
84+
elif isinstance(sdk_metric_record.aggregator, HistogramAggregator):
85+
# FIXME: How are values to be interpreted from this aggregator?
86+
raise Exception("Histogram aggregator data not supported")
87+
88+
elif isinstance(sdk_metric_record.aggregator, LastValueAggregator):
89+
value = sdk_metric_record.aggregator.checkpoint
90+
91+
elif isinstance(sdk_metric_record.aggregator, ValueObserverAggregator):
92+
value = sdk_metric_record.aggregator.checkpoint.last
93+
94+
return [
95+
data_point_class(
96+
labels=[
97+
StringKeyValue(key=str(label_key), value=str(label_value))
98+
for label_key, label_value in sdk_metric_record.labels
99+
],
100+
value=value,
101+
start_time_unix_nano=(
102+
sdk_metric_record.aggregator.initial_checkpoint_timestamp
103+
),
104+
time_unix_nano=(
105+
sdk_metric_record.aggregator.last_update_timestamp
106+
),
107+
)
108+
]
103109

104110

105111
class OTLPMetricsExporter(
@@ -179,13 +185,13 @@ def _translate_data(
179185
# SumObserver Sum(aggregation_temporality=cumulative;is_monotonic=true)
180186
# UpDownSumObserver Sum(aggregation_temporality=cumulative;is_monotonic=false)
181187
# ValueObserver Gauge()
182-
for sdk_metric in data:
188+
for sdk_metric_record in data:
183189

184-
if sdk_metric.resource not in (
190+
if sdk_metric_record.resource not in (
185191
sdk_resource_instrumentation_library_metrics.keys()
186192
):
187193
sdk_resource_instrumentation_library_metrics[
188-
sdk_metric.resource
194+
sdk_metric_record.resource
189195
] = InstrumentationLibraryMetrics()
190196

191197
type_class = {
@@ -204,70 +210,82 @@ def _translate_data(
204210
},
205211
}
206212

207-
value_type = sdk_metric.instrument.value_type
213+
value_type = sdk_metric_record.instrument.value_type
208214

209215
sum_class = type_class[value_type]["sum"]["class"]
210216
gauge_class = type_class[value_type]["gauge"]["class"]
211217
data_point_class = type_class[value_type]["data_point_class"]
212218

213-
if isinstance(sdk_metric.instrument, Counter):
219+
if isinstance(sdk_metric_record.instrument, Counter):
214220
otlp_metric_data = sum_class(
215-
data_points=_get_data_points(sdk_metric, data_point_class),
221+
data_points=_get_data_points(
222+
sdk_metric_record, data_point_class
223+
),
216224
aggregation_temporality=(
217225
AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA
218226
),
219227
is_monotonic=True,
220228
)
221229
argument = type_class[value_type]["sum"]["argument"]
222230

223-
elif isinstance(sdk_metric.instrument, UpDownCounter):
231+
elif isinstance(sdk_metric_record.instrument, UpDownCounter):
224232
otlp_metric_data = sum_class(
225-
data_points=_get_data_points(sdk_metric, data_point_class),
233+
data_points=_get_data_points(
234+
sdk_metric_record, data_point_class
235+
),
226236
aggregation_temporality=(
227237
AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA
228238
),
229239
is_monotonic=False,
230240
)
231241
argument = type_class[value_type]["sum"]["argument"]
232242

233-
elif isinstance(sdk_metric.instrument, (ValueRecorder)):
243+
elif isinstance(sdk_metric_record.instrument, (ValueRecorder)):
234244
logger.warning("Skipping exporting of ValueRecorder metric")
235245
continue
236246

237-
elif isinstance(sdk_metric.instrument, SumObserver):
247+
elif isinstance(sdk_metric_record.instrument, SumObserver):
238248
otlp_metric_data = sum_class(
239-
data_points=_get_data_points(sdk_metric, data_point_class),
249+
data_points=_get_data_points(
250+
sdk_metric_record, data_point_class
251+
),
240252
aggregation_temporality=(
241253
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
242254
),
243255
is_monotonic=True,
244256
)
245257
argument = type_class[value_type]["sum"]["argument"]
246258

247-
elif isinstance(sdk_metric.instrument, UpDownSumObserver):
259+
elif isinstance(sdk_metric_record.instrument, UpDownSumObserver):
248260
otlp_metric_data = sum_class(
249-
data_points=_get_data_points(sdk_metric, data_point_class),
261+
data_points=_get_data_points(
262+
sdk_metric_record, data_point_class
263+
),
250264
aggregation_temporality=(
251265
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
252266
),
253267
is_monotonic=False,
254268
)
255269
argument = type_class[value_type]["sum"]["argument"]
256270

257-
elif isinstance(sdk_metric.instrument, (ValueObserver)):
271+
elif isinstance(sdk_metric_record.instrument, (ValueObserver)):
258272
otlp_metric_data = gauge_class(
259-
data_points=_get_data_points(sdk_metric, data_point_class)
273+
data_points=_get_data_points(
274+
sdk_metric_record, data_point_class
275+
)
260276
)
261277
argument = type_class[value_type]["gauge"]["argument"]
262278

263279
sdk_resource_instrumentation_library_metrics[
264-
sdk_metric.resource
280+
sdk_metric_record.resource
265281
].metrics.append(
266282
OTLPMetric(
267283
**{
268-
"name": sdk_metric.instrument.name,
269-
"description": sdk_metric.instrument.description,
270-
"unit": sdk_metric.instrument.unit,
284+
"name": sdk_metric_record.instrument.name,
285+
"description": (
286+
sdk_metric_record.instrument.description
287+
),
288+
"unit": sdk_metric_record.instrument.unit,
271289
argument: otlp_metric_data,
272290
}
273291
)

exporter/opentelemetry-exporter-otlp/tests/test_otlp_metric_exporter.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,14 @@ def setUp(self):
5252

5353
self.counter_metric_record = MetricRecord(
5454
Counter(
55-
"a",
56-
"b",
5755
"c",
56+
"d",
57+
"e",
5858
int,
5959
MeterProvider(resource=resource,).get_meter(__name__),
60-
("d",),
60+
("f",),
6161
),
62-
OrderedDict([("e", "f")]),
62+
[("g", "h")],
6363
SumAggregator(),
6464
resource,
6565
)
@@ -97,7 +97,9 @@ def test_translate_metrics(self, mock_time_ns):
9797

9898
mock_time_ns.configure_mock(**{"return_value": 1})
9999

100-
self.counter_metric_record.instrument.add(1, OrderedDict([("a", "b")]))
100+
self.counter_metric_record.aggregator.checkpoint = 1
101+
self.counter_metric_record.aggregator.initial_checkpoint_timestamp = 1
102+
self.counter_metric_record.aggregator.last_update_timestamp = 1
101103

102104
expected = ExportMetricsServiceRequest(
103105
resource_metrics=[
@@ -114,19 +116,20 @@ def test_translate_metrics(self, mock_time_ns):
114116
InstrumentationLibraryMetrics(
115117
metrics=[
116118
OTLPMetric(
117-
name="a",
118-
description="b",
119-
unit="c",
119+
name="c",
120+
description="d",
121+
unit="e",
120122
int_sum=IntSum(
121123
data_points=[
122124
IntDataPoint(
123125
labels=[
124126
StringKeyValue(
125-
key="a", value="b"
127+
key="g", value="h"
126128
)
127129
],
128130
value=1,
129131
time_unix_nano=1,
132+
start_time_unix_nano=1,
130133
)
131134
],
132135
aggregation_temporality=(

opentelemetry-sdk/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
([#1282](https://github.com/open-telemetry/opentelemetry-python/pull/1282))
1515
- Span.is_recording() returns false after span has ended
1616
([#1289](https://github.com/open-telemetry/opentelemetry-python/pull/1289))
17+
- Set initial checkpoint timestamp in aggregators
18+
([#1237](https://github.com/open-telemetry/opentelemetry-python/pull/1237))
1719

1820
## Version 0.14b0
1921

opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ class Aggregator(abc.ABC):
3333
def __init__(self, config=None):
3434
self._lock = threading.Lock()
3535
self.last_update_timestamp = 0
36-
self.last_checkpoint_timestamp = 0
36+
self.initial_checkpoint_timestamp = 0
37+
self.checkpointed = True
3738
if config is not None:
3839
self.config = config
3940
else:
@@ -42,21 +43,25 @@ def __init__(self, config=None):
4243
@abc.abstractmethod
4344
def update(self, value):
4445
"""Updates the current with the new value."""
46+
if self.checkpointed:
47+
self.initial_checkpoint_timestamp = time_ns()
48+
self.checkpointed = False
4549
self.last_update_timestamp = time_ns()
4650

4751
@abc.abstractmethod
4852
def take_checkpoint(self):
4953
"""Stores a snapshot of the current value."""
50-
self.last_checkpoint_timestamp = time_ns()
54+
self.checkpointed = True
5155

5256
@abc.abstractmethod
5357
def merge(self, other):
5458
"""Combines two aggregator values."""
5559
self.last_update_timestamp = max(
5660
self.last_update_timestamp, other.last_update_timestamp
5761
)
58-
self.last_checkpoint_timestamp = max(
59-
self.last_checkpoint_timestamp, other.last_checkpoint_timestamp
62+
self.initial_checkpoint_timestamp = max(
63+
self.initial_checkpoint_timestamp,
64+
other.initial_checkpoint_timestamp,
6065
)
6166

6267
def _verify_type(self, other):
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from time import sleep
16+
17+
from opentelemetry.context import attach, detach, set_value
18+
from opentelemetry.metrics import Meter
19+
from opentelemetry.sdk.metrics.export import MetricsExporter
20+
21+
22+
class DebugController:
23+
"""A debug controller, used to replace Push controller when debugging
24+
25+
Push controller uses a thread which makes it hard to use the IPython
26+
debugger. This controller does not use a thread, but relies on the user
27+
manually calling its ``run`` method to start the controller.
28+
29+
Args:
30+
meter: The meter used to collect metrics.
31+
exporter: The exporter used to export metrics.
32+
interval: The collect/export interval in seconds.
33+
"""
34+
35+
daemon = True
36+
37+
def __init__(
38+
self, meter: Meter, exporter: MetricsExporter, interval: float
39+
):
40+
super().__init__()
41+
self.meter = meter
42+
self.exporter = exporter
43+
self.interval = interval
44+
45+
def run(self):
46+
while True:
47+
self.tick()
48+
sleep(self.interval)
49+
50+
def shutdown(self):
51+
# Run one more collection pass to flush metrics batched in the meter
52+
self.tick()
53+
54+
def tick(self):
55+
# Collect all of the meter's metrics to be exported
56+
self.meter.collect()
57+
# Export the collected metrics
58+
token = attach(set_value("suppress_instrumentation", True))
59+
self.exporter.export(self.meter.processor.checkpoint_set())
60+
detach(token)
61+
# Perform post-exporting logic
62+
self.meter.processor.finished_collection()

0 commit comments

Comments
 (0)