Skip to content

Commit adce43e

Browse files
committed
views
1 parent e966f7e commit adce43e

File tree

6 files changed

+150
-145
lines changed

6 files changed

+150
-145
lines changed

docs/examples/basic_meter/view.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,35 +16,35 @@
1616
This example shows how to use the different modes to capture metrics.
1717
It shows the usage of the direct, bound and batch calling conventions.
1818
"""
19-
import time
20-
2119
from opentelemetry import metrics
22-
from opentelemetry.sdk.metrics import Counter, Measure, MeterProvider
23-
from opentelemetry.sdk.metrics.export.aggregate import CountAggregation
20+
from opentelemetry.sdk.metrics import (
21+
MeterProvider,
22+
UpDownCounter,
23+
ValueRecorder
24+
)
25+
from opentelemetry.sdk.metrics.export.aggregate import SumAggregator
2426
from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter
25-
from opentelemetry.sdk.metrics.export.controller import PushController
2627
from opentelemetry.sdk.metrics.view import View, ViewConfig
2728

2829
# Use the meter type provided by the SDK package
2930
metrics.set_meter_provider(MeterProvider())
3031
meter = metrics.get_meter(__name__)
31-
exporter = ConsoleMetricsExporter()
32-
controller = PushController(meter=meter, exporter=exporter, interval=5)
32+
metrics.get_meter_provider().start_pipeline(meter, ConsoleMetricsExporter(), 5)
3333

3434
requests_counter = meter.create_metric(
3535
name="requests",
3636
description="number of requests",
3737
unit="1",
3838
value_type=int,
39-
metric_type=Counter,
39+
metric_type=UpDownCounter,
4040
)
4141

4242
requests_size = meter.create_metric(
4343
name="requests_size",
4444
description="size of requests",
4545
unit="1",
4646
value_type=int,
47-
metric_type=Measure,
47+
metric_type=ValueRecorder,
4848
)
4949

5050
# Views are used to define an aggregation type and label keys to aggregate by
@@ -55,21 +55,21 @@
5555
# dropped from the aggregation
5656
counter_view1 = View(
5757
requests_counter,
58-
CountAggregation(),
58+
SumAggregator,
5959
label_keys=["environment"],
6060
config=ViewConfig.LABEL_KEYS
6161
)
6262
counter_view2 = View(
6363
requests_counter,
64-
CountAggregation(),
64+
SumAggregator,
6565
label_keys=["os_type"],
6666
config=ViewConfig.LABEL_KEYS
6767
)
6868
# This view has ViewConfig set to UNGROUPED, meaning all recorded metrics take
6969
# the labels directly without and consideration for label_keys
7070
counter_view3 = View(
7171
requests_counter,
72-
CountAggregation(),
72+
SumAggregator,
7373
label_keys=["environment"], # is not used due to ViewConfig.UNGROUPED
7474
config=ViewConfig.UNGROUPED
7575
)

opentelemetry-api/src/opentelemetry/metrics/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,6 @@ def create_metric(
411411
unit: str,
412412
value_type: Type[ValueT],
413413
metric_type: Type[MetricT],
414-
label_keys: Sequence[str] = (),
415414
enabled: bool = True,
416415
) -> "Metric":
417416
# pylint: disable=no-self-use

opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ def __init__(
5151
):
5252
self._labels = labels
5353
self._metric = metric
54+
self.view_datas = metric.meter.view_manager.generate_view_datas(metric, labels)
55+
self._view_datas_lock = threading.Lock()
56+
self._ref_count = 0
57+
self._ref_count_lock = threading.Lock()
5458

5559
def _validate_update(self, value: metrics_api.ValueT) -> bool:
5660
if not self._metric.enabled:
@@ -64,8 +68,26 @@ def _validate_update(self, value: metrics_api.ValueT) -> bool:
6468
return True
6569

6670
def update(self, value: metrics_api.ValueT):
67-
# The view manager handles all updates to aggregators
68-
self._metric.meter.view_manager.record(self._metric, self._labels, value)
71+
with self._view_datas_lock:
72+
# record the value for each view_data belonging to this aggregator
73+
for view_data in self.view_datas:
74+
view_data.record(value)
75+
76+
def release(self):
77+
self.decrease_ref_count()
78+
79+
def decrease_ref_count(self):
80+
with self._ref_count_lock:
81+
self._ref_count -= 1
82+
83+
def increase_ref_count(self):
84+
with self._ref_count_lock:
85+
self._ref_count += 1
86+
87+
def ref_count(self):
88+
with self._ref_count_lock:
89+
return self._ref_count
90+
6991

7092
class BoundCounter(metrics_api.BoundCounter, BaseBoundInstrument):
7193
def add(self, value: metrics_api.ValueT) -> None:
@@ -144,6 +166,7 @@ def bind(self, labels: Dict[str, str]) -> BaseBoundInstrument:
144166
self,
145167
)
146168
self.bound_instruments[key] = bound_instrument
169+
bound_instrument.increase_ref_count()
147170
return bound_instrument
148171

149172
def __repr__(self):
@@ -164,6 +187,7 @@ def add(self, value: metrics_api.ValueT, labels: Dict[str, str]) -> None:
164187
"""See `opentelemetry.metrics.Counter.add`."""
165188
bound_intrument = self.bind(labels)
166189
bound_intrument.add(value)
190+
bound_intrument.release()
167191

168192
UPDATE_FUNCTION = add
169193

@@ -194,6 +218,7 @@ def record(
194218
"""See `opentelemetry.metrics.ValueRecorder.record`."""
195219
bound_intrument = self.bind(labels)
196220
bound_intrument.record(value)
221+
bound_intrument.release()
197222

198223
UPDATE_FUNCTION = record
199224

@@ -326,11 +351,13 @@ def __init__(
326351
instrumentation_info: "InstrumentationInfo",
327352
):
328353
self.instrumentation_info = instrumentation_info
329-
self.batcher = Batcher(stateful)
354+
self.batcher = Batcher(True)
330355
self.resource = source.resource
356+
self.metrics = set()
331357
self.observers = set()
332-
self.view_manager = ViewManager()
358+
self.metrics_lock = threading.Lock()
333359
self.observers_lock = threading.Lock()
360+
self.view_manager = ViewManager()
334361

335362
def collect(self) -> None:
336363
"""Collects all the metrics created with this `Meter` for export.
@@ -344,15 +371,22 @@ def collect(self) -> None:
344371
self._collect_observers()
345372

346373
def _collect_metrics(self) -> None:
347-
# Iterate through metrics created by this meter that have been recorded
348-
for (metric, view_datas) in self.view_manager.view_datas.items():
349-
if not metric.enabled:
350-
continue
351-
for view_data in view_datas:
352-
for (labels, aggregator) in view_data.aggregators.items():
353-
record = Record(metric, labels, aggregator)
354-
# Checkpoints the current aggregators
355-
self.batcher.process(record)
374+
for metric in self.metrics:
375+
if not metric.enabled:
376+
continue
377+
to_remove = []
378+
with metric.bound_instruments_lock:
379+
for labels, bound_instrument in metric.bound_instruments.items():
380+
for view_data in bound_instrument.view_datas:
381+
record = Record(metric, view_data.labels, view_data.aggregator)
382+
self.batcher.process(record)
383+
384+
if bound_instrument.ref_count() == 0:
385+
to_remove.append(labels)
386+
387+
# Remove handles that were released
388+
for labels in to_remove:
389+
del metric.bound_instruments[labels]
356390

357391
def _collect_observers(self) -> None:
358392
with self.observers_lock:
@@ -397,6 +431,8 @@ def create_metric(
397431
self,
398432
enabled=enabled,
399433
)
434+
with self.metrics_lock:
435+
self.metrics.add(metric)
400436
return metric
401437

402438
def register_observer(

opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py

Lines changed: 2 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -22,43 +22,6 @@
2222
logger = logging.getLogger(__name__)
2323

2424

25-
class Aggregation:
26-
27-
def __init__(self, config=None):
28-
self._config = config
29-
30-
@abc.abstractmethod
31-
def new_aggregator(self):
32-
"""Get a new Aggregator for this aggregation."""
33-
34-
35-
class CountAggregation(Aggregation):
36-
"""Describes that the data collected and aggregated with this method will
37-
be turned into a count value
38-
"""
39-
def new_aggregator(self):
40-
"""Get a new Aggregator for this aggregation."""
41-
return CounterAggregator()
42-
43-
44-
class SummaryAggregation(Aggregation):
45-
"""Describes that the data collected and aggregated with this method will
46-
be turned into a summary
47-
"""
48-
def new_aggregator(self):
49-
"""Get a new Aggregator for this aggregation."""
50-
return MinMaxSumCountAggregator()
51-
52-
53-
class HistogramAggregation(Aggregation):
54-
"""Describes that the data collected and aggregated with this method will
55-
be turned into a histogram
56-
"""
57-
def new_aggregator(self):
58-
"""Get a new Aggregator for this aggregation."""
59-
return HistogramAggregator(self._config)
60-
61-
6225
class Aggregator(abc.ABC):
6326
"""Base class for aggregators.
6427
@@ -85,7 +48,7 @@ def merge(self, other):
8548

8649

8750
class SumAggregator(Aggregator):
88-
"""Aggregator for Counter metrics."""
51+
"""Aggregator for counter metrics."""
8952

9053
def __init__(self, config=None):
9154
super().__init__(config=config)
@@ -169,7 +132,7 @@ def merge(self, other):
169132

170133

171134
class HistogramAggregator(Aggregator):
172-
"""Agregator for Measure metrics that keeps a histogram of values."""
135+
"""Agregator for ValueRecorder metrics that keeps a histogram of values."""
173136

174137
def __init__(self, config=None):
175138
super().__init__(config=config)

opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,7 @@
1414

1515
from typing import Sequence
1616

17-
from opentelemetry.metrics import (
18-
Counter,
19-
InstrumentT,
20-
SumObserver,
21-
UpDownCounter,
22-
UpDownSumObserver,
23-
ValueObserver,
24-
ValueRecorder,
25-
)
2617
from opentelemetry.sdk.metrics.export import MetricRecord
27-
from opentelemetry.sdk.metrics.export.aggregate import (
28-
Aggregator,
29-
LastValueAggregator,
30-
MinMaxSumCountAggregator,
31-
SumAggregator,
32-
ValueObserverAggregator,
33-
)
3418

3519

3620
class Batcher:
@@ -49,23 +33,6 @@ def __init__(self, stateful: bool):
4933
# (deltas)
5034
self.stateful = stateful
5135

52-
def aggregator_for(self, instrument_type: Type[InstrumentT]) -> Aggregator:
53-
"""Returns an aggregator based on metric instrument type.
54-
55-
Aggregators keep track of and updates values when metrics get updated.
56-
"""
57-
# pylint:disable=R0201
58-
if issubclass(instrument_type, (Counter, UpDownCounter)):
59-
return SumAggregator()
60-
if issubclass(instrument_type, (SumObserver, UpDownSumObserver)):
61-
return LastValueAggregator()
62-
if issubclass(instrument_type, ValueRecorder):
63-
return MinMaxSumCountAggregator()
64-
if issubclass(instrument_type, ValueObserver):
65-
return ValueObserverAggregator()
66-
# TODO: Add other aggregators
67-
return SumAggregator()
68-
6936
def checkpoint_set(self) -> Sequence[MetricRecord]:
7037
"""Returns a list of MetricRecords used for exporting.
7138

0 commit comments

Comments
 (0)