Skip to content

Commit e8f7b6f

Browse files
authored
views: properly hash config dict, don't copy aggregator when stateful (open-telemetry#967)
1 parent 3efe854 commit e8f7b6f

File tree

3 files changed

+138
-15
lines changed

3 files changed

+138
-15
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ def process(self, record) -> None:
6161
"""Stores record information to be ready for exporting."""
6262
# Checkpoints the current aggregator value to be collected for export
6363
aggregator = record.aggregator
64+
aggregator.take_checkpoint()
6465

6566
# The uniqueness of a batch record is defined by a specific metric
6667
# using an aggregator type with a specific set of labels.
@@ -72,19 +73,20 @@ def process(self, record) -> None:
7273
get_dict_as_key(aggregator.config),
7374
record.labels,
7475
)
76+
7577
batch_value = self._batch_map.get(key)
78+
7679
if batch_value:
77-
if batch_value != aggregator:
78-
aggregator.take_checkpoint()
79-
batch_value.merge(aggregator)
80-
else:
81-
aggregator.take_checkpoint()
82-
83-
if self.stateful:
84-
# if stateful batcher, create a copy of the aggregator and update
85-
# it with the current checkpointed value for long-term storage
86-
aggregator = record.aggregator.__class__(
87-
config=record.aggregator.config
88-
)
89-
aggregator.merge(record.aggregator)
80+
# Update the stored checkpointed value if exists. The call to merge
81+
# here combines only identical records (same key).
82+
batch_value.merge(aggregator)
83+
return
84+
85+
# create a copy of the aggregator and update
86+
# it with the current checkpointed value for long-term storage
87+
aggregator = record.aggregator.__class__(
88+
config=record.aggregator.config
89+
)
90+
aggregator.merge(record.aggregator)
91+
9092
self._batch_map[key] = aggregator

opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,16 @@ def ns_to_iso_str(nanoseconds):
3333

3434
def get_dict_as_key(labels):
3535
"""Converts a dict to be used as a unique key"""
36-
return tuple(sorted(labels.items()))
36+
return tuple(
37+
sorted(
38+
map(
39+
lambda kv: (kv[0], tuple(kv[1]))
40+
if isinstance(kv[1], list)
41+
else kv,
42+
labels.items(),
43+
)
44+
)
45+
)
3746

3847

3948
class BoundedList(Sequence):

opentelemetry-sdk/tests/metrics/test_view.py

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@
1616
from unittest import mock
1717

1818
from opentelemetry.sdk import metrics
19-
from opentelemetry.sdk.metrics import Counter, view
19+
from opentelemetry.sdk.metrics import Counter, ValueRecorder, view
2020
from opentelemetry.sdk.metrics.export import aggregate
2121
from opentelemetry.sdk.metrics.export.aggregate import (
22+
HistogramAggregator,
2223
MinMaxSumCountAggregator,
2324
SumAggregator,
2425
)
@@ -185,6 +186,117 @@ def test_multiple_views(self):
185186
self.assertTrue((label2, 5) in sum_set)
186187

187188

189+
class TestHistogramView(unittest.TestCase):
190+
def test_histogram_stateful(self):
191+
meter = metrics.MeterProvider(stateful=True).get_meter(__name__)
192+
exporter = InMemoryMetricsExporter()
193+
controller = PushController(meter, exporter, 30)
194+
195+
requests_size = meter.create_metric(
196+
name="requests_size",
197+
description="size of requests",
198+
unit="1",
199+
value_type=int,
200+
metric_type=ValueRecorder,
201+
)
202+
203+
size_view = View(
204+
requests_size,
205+
HistogramAggregator,
206+
aggregator_config={"bounds": [20, 40, 60, 80, 100]},
207+
label_keys=["environment"],
208+
view_config=ViewConfig.LABEL_KEYS,
209+
)
210+
211+
meter.register_view(size_view)
212+
213+
# Since this is using the HistogramAggregator, the bucket counts will be reflected
214+
# with each record
215+
requests_size.record(25, {"environment": "staging", "test": "value"})
216+
requests_size.record(1, {"environment": "staging", "test": "value2"})
217+
requests_size.record(200, {"environment": "staging", "test": "value3"})
218+
219+
controller.tick()
220+
221+
metrics_list = exporter.get_exported_metrics()
222+
self.assertEqual(len(metrics_list), 1)
223+
checkpoint = metrics_list[0].aggregator.checkpoint
224+
self.assertEqual(
225+
tuple(checkpoint.items()),
226+
((20, 1), (40, 1), (60, 0), (80, 0), (100, 0), (">", 1)),
227+
)
228+
exporter.clear()
229+
230+
requests_size.record(25, {"environment": "staging", "test": "value"})
231+
requests_size.record(1, {"environment": "staging", "test": "value2"})
232+
requests_size.record(200, {"environment": "staging", "test": "value3"})
233+
234+
controller.tick()
235+
236+
metrics_list = exporter.get_exported_metrics()
237+
self.assertEqual(len(metrics_list), 1)
238+
checkpoint = metrics_list[0].aggregator.checkpoint
239+
self.assertEqual(
240+
tuple(checkpoint.items()),
241+
((20, 2), (40, 2), (60, 0), (80, 0), (100, 0), (">", 2)),
242+
)
243+
244+
def test_histogram_stateless(self):
245+
# Use the meter type provided by the SDK package
246+
meter = metrics.MeterProvider(stateful=False).get_meter(__name__)
247+
exporter = InMemoryMetricsExporter()
248+
controller = PushController(meter, exporter, 30)
249+
250+
requests_size = meter.create_metric(
251+
name="requests_size",
252+
description="size of requests",
253+
unit="1",
254+
value_type=int,
255+
metric_type=ValueRecorder,
256+
)
257+
258+
size_view = View(
259+
requests_size,
260+
HistogramAggregator,
261+
aggregator_config={"bounds": [20, 40, 60, 80, 100]},
262+
label_keys=["environment"],
263+
view_config=ViewConfig.LABEL_KEYS,
264+
)
265+
266+
meter.register_view(size_view)
267+
268+
# Since this is using the HistogramAggregator, the bucket counts will be reflected
269+
# with each record
270+
requests_size.record(25, {"environment": "staging", "test": "value"})
271+
requests_size.record(1, {"environment": "staging", "test": "value2"})
272+
requests_size.record(200, {"environment": "staging", "test": "value3"})
273+
274+
controller.tick()
275+
276+
metrics_list = exporter.get_exported_metrics()
277+
self.assertEqual(len(metrics_list), 1)
278+
checkpoint = metrics_list[0].aggregator.checkpoint
279+
self.assertEqual(
280+
tuple(checkpoint.items()),
281+
((20, 1), (40, 1), (60, 0), (80, 0), (100, 0), (">", 1)),
282+
)
283+
exporter.clear()
284+
285+
requests_size.record(25, {"environment": "staging", "test": "value"})
286+
requests_size.record(1, {"environment": "staging", "test": "value2"})
287+
requests_size.record(200, {"environment": "staging", "test": "value3"})
288+
289+
controller.tick()
290+
291+
metrics_list = exporter.get_exported_metrics()
292+
self.assertEqual(len(metrics_list), 1)
293+
checkpoint = metrics_list[0].aggregator.checkpoint
294+
self.assertEqual(
295+
tuple(checkpoint.items()),
296+
((20, 1), (40, 1), (60, 0), (80, 0), (100, 0), (">", 1)),
297+
)
298+
299+
188300
class DummyMetric(metrics.Metric):
189301
# pylint: disable=W0231
190302
def __init__(self):

0 commit comments

Comments
 (0)