Skip to content

Commit 5a0dae9

Browse files
Implement MinMaxSumCount aggregator (open-telemetry#422)
Adding one of the core aggregators in the metrics API. This aggregator is the default aggregator for measure metrics and keeps the minimum, maximum, sum and count of those measures.
1 parent 120ae29 commit 5a0dae9

File tree

7 files changed

+264
-152
lines changed

7 files changed

+264
-152
lines changed

examples/metrics/simple_example.py

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
# Copyright 2019, OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
#
15+
"""
16+
This module serves as an example for a simple application using metrics
17+
It shows:
18+
- How to configure a meter passing a sateful or stateless.
19+
- How to configure an exporter and how to create a controller.
20+
- How to create some metrics intruments and how to capture data with them.
21+
"""
22+
import sys
23+
import time
24+
25+
from opentelemetry import metrics
26+
from opentelemetry.sdk.metrics import Counter, Measure, Meter
27+
from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter
28+
from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher
29+
from opentelemetry.sdk.metrics.export.controller import PushController
30+
31+
batcher_mode = "stateful"
32+
33+
34+
def usage(argv):
35+
print("usage:")
36+
print("{} [mode]".format(argv[0]))
37+
print("mode: stateful (default) or stateless")
38+
39+
40+
if len(sys.argv) >= 2:
41+
batcher_mode = sys.argv[1]
42+
if batcher_mode not in ("stateful", "stateless"):
43+
print("bad mode specified.")
44+
usage(sys.argv)
45+
sys.exit(1)
46+
47+
# Batcher used to collect all created metrics from meter ready for exporting
48+
# Pass in True/False to indicate whether the batcher is stateful.
49+
# True indicates the batcher computes checkpoints from over the process
50+
# lifetime.
51+
# False indicates the batcher computes checkpoints which describe the updates
52+
# of a single collection period (deltas)
53+
batcher = UngroupedBatcher(batcher_mode == "stateful")
54+
55+
# If a batcher is not provided, a default batcher is used
56+
# Meter is responsible for creating and recording metrics
57+
metrics.set_preferred_meter_implementation(lambda _: Meter(batcher))
58+
meter = metrics.meter()
59+
60+
# Exporter to export metrics to the console
61+
exporter = ConsoleMetricsExporter()
62+
63+
# A PushController collects metrics created from meter and exports it via the
64+
# exporter every interval
65+
controller = PushController(meter, exporter, 5)
66+
67+
# Metric instruments allow to capture measurements
68+
requests_counter = meter.create_metric(
69+
"requests", "number of requests", 1, int, Counter, ("environment",)
70+
)
71+
72+
clicks_counter = meter.create_metric(
73+
"clicks", "number of clicks", 1, int, Counter, ("environment",)
74+
)
75+
76+
requests_size = meter.create_metric(
77+
"requests_size", "size of requests", 1, int, Measure, ("environment",)
78+
)
79+
80+
# Labelsets are used to identify key-values that are associated with a specific
81+
# metric that you want to record. These are useful for pre-aggregation and can
82+
# be used to store custom dimensions pertaining to a metric
83+
staging_label_set = meter.get_label_set({"environment": "staging"})
84+
testing_label_set = meter.get_label_set({"environment": "testing"})
85+
86+
# Update the metric instruments using the direct calling convention
87+
requests_size.record(100, staging_label_set)
88+
requests_counter.add(25, staging_label_set)
89+
# Sleep for 5 seconds, exported value should be 25
90+
time.sleep(5)
91+
92+
requests_size.record(5000, staging_label_set)
93+
requests_counter.add(50, staging_label_set)
94+
# Exported value should be 75
95+
time.sleep(5)
96+
97+
requests_size.record(2, testing_label_set)
98+
requests_counter.add(35, testing_label_set)
99+
# There should be two exported values 75 and 35, one for each labelset
100+
time.sleep(5)
101+
102+
clicks_counter.add(5, staging_label_set)
103+
# There should be three exported values, labelsets can be reused for different
104+
# metrics but will be recorded seperately, 75, 35 and 5
105+
106+
time.sleep(5)

examples/metrics/stateful.py

Lines changed: 0 additions & 72 deletions
This file was deleted.

examples/metrics/stateless.py

Lines changed: 0 additions & 57 deletions
This file was deleted.

opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import abc
16+
from collections import namedtuple
1617

1718

1819
class Aggregator(abc.ABC):
@@ -56,3 +57,52 @@ def take_checkpoint(self):
5657

5758
def merge(self, other):
5859
self.checkpoint += other.checkpoint
60+
61+
62+
class MinMaxSumCountAggregator(Aggregator):
63+
"""Agregator for Measure metrics that keeps min, max, sum and count."""
64+
65+
_TYPE = namedtuple("minmaxsumcount", "min max sum count")
66+
67+
@classmethod
68+
def _min(cls, val1, val2):
69+
if val1 is None and val2 is None:
70+
return None
71+
return min(val1 or val2, val2 or val1)
72+
73+
@classmethod
74+
def _max(cls, val1, val2):
75+
if val1 is None and val2 is None:
76+
return None
77+
return max(val1 or val2, val2 or val1)
78+
79+
@classmethod
80+
def _sum(cls, val1, val2):
81+
if val1 is None and val2 is None:
82+
return None
83+
return (val1 or 0) + (val2 or 0)
84+
85+
def __init__(self):
86+
super().__init__()
87+
self.current = self._TYPE(None, None, None, 0)
88+
self.checkpoint = self._TYPE(None, None, None, 0)
89+
90+
def update(self, value):
91+
self.current = self._TYPE(
92+
self._min(self.current.min, value),
93+
self._max(self.current.max, value),
94+
self._sum(self.current.sum, value),
95+
self.current.count + 1,
96+
)
97+
98+
def take_checkpoint(self):
99+
self.checkpoint = self.current
100+
self.current = self._TYPE(None, None, None, 0)
101+
102+
def merge(self, other):
103+
self.checkpoint = self._TYPE(
104+
self._min(self.checkpoint.min, other.checkpoint.min),
105+
self._max(self.checkpoint.max, other.checkpoint.max),
106+
self._sum(self.checkpoint.sum, other.checkpoint.sum),
107+
self.checkpoint.count + other.checkpoint.count,
108+
)

opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@
1515
import abc
1616
from typing import Sequence, Type
1717

18-
from opentelemetry.metrics import Counter, MetricT
18+
from opentelemetry.metrics import Counter, Measure, MetricT
1919
from opentelemetry.sdk.metrics.export import MetricRecord
2020
from opentelemetry.sdk.metrics.export.aggregate import (
2121
Aggregator,
2222
CounterAggregator,
23+
MinMaxSumCountAggregator,
2324
)
2425

2526

@@ -45,8 +46,10 @@ def aggregator_for(self, metric_type: Type[MetricT]) -> Aggregator:
4546
Aggregators keep track of and updates values when metrics get updated.
4647
"""
4748
# pylint:disable=R0201
48-
if metric_type == Counter:
49+
if issubclass(metric_type, Counter):
4950
return CounterAggregator()
51+
if issubclass(metric_type, Measure):
52+
return MinMaxSumCountAggregator()
5053
# TODO: Add other aggregators
5154
return CounterAggregator()
5255

0 commit comments

Comments
 (0)