15
15
"""OTLP Metrics Exporter"""
16
16
17
17
import logging
18
- from typing import List , Sequence , Type , TypeVar , Union
18
+ from typing import List , Sequence , Type , TypeVar
19
19
20
20
# pylint: disable=duplicate-code
21
21
from opentelemetry .exporter .otlp .exporter import (
22
22
OTLPExporterMixin ,
23
23
_get_resource_data ,
24
24
)
25
- from opentelemetry .metrics import InstrumentT
26
25
from opentelemetry .proto .collector .metrics .v1 .metrics_service_pb2 import (
27
26
ExportMetricsServiceRequest ,
28
27
)
31
30
)
32
31
from opentelemetry .proto .common .v1 .common_pb2 import StringKeyValue
33
32
from opentelemetry .proto .metrics .v1 .metrics_pb2 import (
33
+ AggregationTemporality ,
34
34
DoubleDataPoint ,
35
+ DoubleGauge ,
36
+ DoubleSum ,
35
37
InstrumentationLibraryMetrics ,
36
- Int64DataPoint ,
37
- )
38
- from opentelemetry .proto .metrics .v1 .metrics_pb2 import (
39
- Metric as CollectorMetric ,
40
- )
41
- from opentelemetry .proto .metrics .v1 .metrics_pb2 import (
42
- MetricDescriptor ,
43
- ResourceMetrics ,
38
+ IntDataPoint ,
39
+ IntGauge ,
40
+ IntSum ,
44
41
)
42
+ from opentelemetry .proto .metrics .v1 .metrics_pb2 import Metric as OTLPMetric
43
+ from opentelemetry .proto .metrics .v1 .metrics_pb2 import ResourceMetrics
45
44
from opentelemetry .sdk .metrics import (
46
45
Counter ,
47
46
SumObserver ,
57
56
)
58
57
59
58
logger = logging .getLogger (__name__ )
60
- DataPointT = TypeVar ("DataPointT" , Int64DataPoint , DoubleDataPoint )
59
+ DataPointT = TypeVar ("DataPointT" , IntDataPoint , DoubleDataPoint )
61
60
62
61
63
62
def _get_data_points (
@@ -93,45 +92,6 @@ def _get_data_points(
93
92
return data_points
94
93
95
94
96
- def _get_temporality (
97
- instrument : InstrumentT ,
98
- ) -> "MetricDescriptor.TemporalityValue" :
99
- # pylint: disable=no-member
100
- if isinstance (instrument , (Counter , UpDownCounter )):
101
- temporality = MetricDescriptor .Temporality .DELTA
102
- elif isinstance (instrument , (ValueRecorder , ValueObserver )):
103
- temporality = MetricDescriptor .Temporality .INSTANTANEOUS
104
- elif isinstance (instrument , (SumObserver , UpDownSumObserver )):
105
- temporality = MetricDescriptor .Temporality .CUMULATIVE
106
- else :
107
- raise Exception (
108
- "No temporality defined for instrument type {}" .format (
109
- type (instrument )
110
- )
111
- )
112
-
113
- return temporality
114
-
115
-
116
- def _get_type (value_type : Union [int , float ]) -> "MetricDescriptor.TypeValue" :
117
- # pylint: disable=no-member
118
- if value_type is int : # type: ignore[comparison-overlap]
119
- type_ = MetricDescriptor .Type .INT64
120
-
121
- elif value_type is float : # type: ignore[comparison-overlap]
122
- type_ = MetricDescriptor .Type .DOUBLE
123
-
124
- # FIXME What are the types that correspond with
125
- # MetricDescriptor.Type.HISTOGRAM and
126
- # MetricDescriptor.Type.SUMMARY?
127
- else :
128
- raise Exception (
129
- "No type defined for valie type {}" .format (type (value_type ))
130
- )
131
-
132
- return type_
133
-
134
-
135
95
class OTLPMetricsExporter (
136
96
MetricsExporter ,
137
97
OTLPExporterMixin [
@@ -150,6 +110,7 @@ class OTLPMetricsExporter(
150
110
_stub = MetricsServiceStub
151
111
_result = MetricsExportResult
152
112
113
+ # pylint: disable=no-self-use
153
114
def _translate_data (
154
115
self , data : Sequence [MetricRecord ]
155
116
) -> ExportMetricsServiceRequest :
@@ -158,6 +119,22 @@ def _translate_data(
158
119
159
120
sdk_resource_instrumentation_library_metrics = {}
160
121
122
+ # The criteria to decide how to translate data is based on this table
123
+ # taken directly from OpenTelemetry Proto v0.5.0:
124
+
125
+ # TODO: Update table after the decision on:
126
+ # https://github.com/open-telemetry/opentelemetry-specification/issues/731.
127
+ # By default, metrics recording using the OpenTelemetry API are exported as
128
+ # (the table does not include MeasurementValueType to avoid extra rows):
129
+ #
130
+ # Instrument Type
131
+ # ----------------------------------------------
132
+ # Counter Sum(aggregation_temporality=delta;is_monotonic=true)
133
+ # UpDownCounter Sum(aggregation_temporality=delta;is_monotonic=false)
134
+ # ValueRecorder TBD
135
+ # SumObserver Sum(aggregation_temporality=cumulative;is_monotonic=true)
136
+ # UpDownSumObserver Sum(aggregation_temporality=cumulative;is_monotonic=false)
137
+ # ValueObserver Gauge()
161
138
for sdk_metric in data :
162
139
163
140
if sdk_metric .instrument .meter .resource not in (
@@ -167,37 +144,90 @@ def _translate_data(
167
144
sdk_metric .instrument .meter .resource
168
145
] = InstrumentationLibraryMetrics ()
169
146
170
- self ._metric_descriptor_kwargs = {}
147
+ type_class = {
148
+ int : {
149
+ "sum" : {"class" : IntSum , "argument" : "int_sum" },
150
+ "gauge" : {"class" : IntGauge , "argument" : "int_gauge" },
151
+ "data_point_class" : IntDataPoint ,
152
+ },
153
+ float : {
154
+ "sum" : {"class" : DoubleSum , "argument" : "double_sum" },
155
+ "gauge" : {
156
+ "class" : DoubleGauge ,
157
+ "argument" : "double_gauge" ,
158
+ },
159
+ "data_point_class" : DoubleDataPoint ,
160
+ },
161
+ }
162
+
163
+ value_type = sdk_metric .instrument .value_type
164
+
165
+ sum_class = type_class [value_type ]["sum" ]["class" ]
166
+ gauge_class = type_class [value_type ]["gauge" ]["class" ]
167
+ data_point_class = type_class [value_type ]["data_point_class" ]
168
+
169
+ if isinstance (sdk_metric .instrument , Counter ):
170
+ otlp_metric_data = sum_class (
171
+ data_points = _get_data_points (sdk_metric , data_point_class ),
172
+ aggregation_temporality = (
173
+ AggregationTemporality .AGGREGATION_TEMPORALITY_DELTA
174
+ ),
175
+ is_monotonic = True ,
176
+ )
177
+ argument = type_class [value_type ]["sum" ]["argument" ]
171
178
172
- metric_descriptor = MetricDescriptor (
173
- name = sdk_metric .instrument .name ,
174
- description = sdk_metric .instrument .description ,
175
- unit = sdk_metric .instrument .unit ,
176
- type = _get_type (sdk_metric .instrument .value_type ),
177
- temporality = _get_temporality (sdk_metric .instrument ),
178
- )
179
+ elif isinstance (sdk_metric .instrument , UpDownCounter ):
180
+ otlp_metric_data = sum_class (
181
+ data_points = _get_data_points (sdk_metric , data_point_class ),
182
+ aggregation_temporality = (
183
+ AggregationTemporality .AGGREGATION_TEMPORALITY_DELTA
184
+ ),
185
+ is_monotonic = False ,
186
+ )
187
+ argument = type_class [value_type ]["sum" ]["argument" ]
179
188
180
- if metric_descriptor .type == MetricDescriptor .Type .INT64 :
189
+ elif isinstance (sdk_metric .instrument , (ValueRecorder )):
190
+ logger .warning ("Skipping exporting of ValueRecorder metric" )
191
+ continue
181
192
182
- collector_metric = CollectorMetric (
183
- metric_descriptor = metric_descriptor ,
184
- int64_data_points = _get_data_points (
185
- sdk_metric , Int64DataPoint
193
+ elif isinstance (sdk_metric .instrument , SumObserver ):
194
+ otlp_metric_data = sum_class (
195
+ data_points = _get_data_points (sdk_metric , data_point_class ),
196
+ aggregation_temporality = (
197
+ AggregationTemporality .AGGREGATION_TEMPORALITY_CUMULATIVE
186
198
),
199
+ is_monotonic = True ,
187
200
)
201
+ argument = type_class [value_type ]["sum" ]["argument" ]
188
202
189
- elif metric_descriptor .type == MetricDescriptor .Type .DOUBLE :
190
-
191
- collector_metric = CollectorMetric (
192
- metric_descriptor = metric_descriptor ,
193
- double_data_points = _get_data_points (
194
- sdk_metric , DoubleDataPoint
203
+ elif isinstance (sdk_metric .instrument , UpDownSumObserver ):
204
+ otlp_metric_data = sum_class (
205
+ data_points = _get_data_points (sdk_metric , data_point_class ),
206
+ aggregation_temporality = (
207
+ AggregationTemporality .AGGREGATION_TEMPORALITY_CUMULATIVE
195
208
),
209
+ is_monotonic = False ,
210
+ )
211
+ argument = type_class [value_type ]["sum" ]["argument" ]
212
+
213
+ elif isinstance (sdk_metric .instrument , (ValueObserver )):
214
+ otlp_metric_data = gauge_class (
215
+ data_points = _get_data_points (sdk_metric , data_point_class )
196
216
)
217
+ argument = type_class [value_type ]["gauge" ]["argument" ]
197
218
198
219
sdk_resource_instrumentation_library_metrics [
199
220
sdk_metric .instrument .meter .resource
200
- ].metrics .append (collector_metric )
221
+ ].metrics .append (
222
+ OTLPMetric (
223
+ ** {
224
+ "name" : sdk_metric .instrument .name ,
225
+ "description" : sdk_metric .instrument .description ,
226
+ "unit" : sdk_metric .instrument .unit ,
227
+ argument : otlp_metric_data ,
228
+ }
229
+ )
230
+ )
201
231
202
232
return ExportMetricsServiceRequest (
203
233
resource_metrics = _get_resource_data (
0 commit comments