26
26
.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/
27
27
.. _Specification: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-environment-variables.md#zipkin-exporter
28
28
29
+ .. envvar:: OTEL_EXPORTER_ZIPKIN_ENDPOINT
30
+ .. envvar:: OTEL_EXPORTER_ZIPKIN_TRANSPORT_FORMAT
31
+
29
32
.. code:: python
30
33
31
34
from opentelemetry import trace
55
58
with tracer.start_as_current_span("foo"):
56
59
print("Hello world!")
57
60
58
- The exporter supports endpoint configuration via the OTEL_EXPORTER_ZIPKIN_ENDPOINT environment variables as defined in the `Specification`_
61
+ The exporter supports the following environment variables for configuration:
62
+
63
+ :envvar:`OTEL_EXPORTER_ZIPKIN_ENDPOINT`: target to which the exporter will
64
+ send data. This may include a path (e.g. http://example.com:9411/api/v2/spans).
65
+
66
+ :envvar:`OTEL_EXPORTER_ZIPKIN_TRANSPORT_FORMAT`: transport interchange format
67
+ to use when sending data. Currently only Zipkin's v2 json and protobuf formats
68
+ are supported, with v2 json being the default.
59
69
60
70
API
61
71
---
62
72
"""
63
73
64
74
import json
65
75
import logging
66
- import os
67
- from typing import Optional , Sequence
76
+ from typing import Optional , Sequence , Union
68
77
from urllib .parse import urlparse
69
78
70
79
import requests
71
80
81
+ from opentelemetry .configuration import Configuration
82
+ from opentelemetry .exporter .zipkin .gen import zipkin_pb2
72
83
from opentelemetry .sdk .trace .export import SpanExporter , SpanExportResult
73
84
from opentelemetry .trace import Span , SpanContext , SpanKind
74
85
86
+ TRANSPORT_FORMAT_JSON = "json"
87
+ TRANSPORT_FORMAT_PROTOBUF = "protobuf"
88
+
75
89
DEFAULT_RETRY = False
76
90
DEFAULT_URL = "http://localhost:9411/api/v2/spans"
77
91
DEFAULT_MAX_TAG_VALUE_LENGTH = 128
78
- ZIPKIN_HEADERS = {"Content-Type" : "application/json" }
79
92
80
- SPAN_KIND_MAP = {
93
+ SPAN_KIND_MAP_JSON = {
81
94
SpanKind .INTERNAL : None ,
82
95
SpanKind .SERVER : "SERVER" ,
83
96
SpanKind .CLIENT : "CLIENT" ,
84
97
SpanKind .PRODUCER : "PRODUCER" ,
85
98
SpanKind .CONSUMER : "CONSUMER" ,
86
99
}
87
100
101
+ SPAN_KIND_MAP_PROTOBUF = {
102
+ SpanKind .INTERNAL : zipkin_pb2 .Span .Kind .SPAN_KIND_UNSPECIFIED ,
103
+ SpanKind .SERVER : zipkin_pb2 .Span .Kind .SERVER ,
104
+ SpanKind .CLIENT : zipkin_pb2 .Span .Kind .CLIENT ,
105
+ SpanKind .PRODUCER : zipkin_pb2 .Span .Kind .PRODUCER ,
106
+ SpanKind .CONSUMER : zipkin_pb2 .Span .Kind .CONSUMER ,
107
+ }
108
+
88
109
SUCCESS_STATUS_CODES = (200 , 202 )
89
110
90
111
logger = logging .getLogger (__name__ )
@@ -100,6 +121,7 @@ class ZipkinSpanExporter(SpanExporter):
100
121
ipv4: Primary IPv4 address associated with this connection.
101
122
ipv6: Primary IPv6 address associated with this connection.
102
123
retry: Set to True to configure the exporter to retry on failure.
124
+ transport_format: transport interchange format to use
103
125
"""
104
126
105
127
def __init__ (
@@ -110,12 +132,13 @@ def __init__(
110
132
ipv6 : Optional [str ] = None ,
111
133
retry : Optional [str ] = DEFAULT_RETRY ,
112
134
max_tag_value_length : Optional [int ] = DEFAULT_MAX_TAG_VALUE_LENGTH ,
135
+ transport_format : Union [
136
+ TRANSPORT_FORMAT_JSON , TRANSPORT_FORMAT_PROTOBUF , None
137
+ ] = None ,
113
138
):
114
139
self .service_name = service_name
115
140
if url is None :
116
- self .url = os .environ .get (
117
- "OTEL_EXPORTER_ZIPKIN_ENDPOINT" , DEFAULT_URL
118
- )
141
+ self .url = Configuration ().EXPORTER_ZIPKIN_ENDPOINT or DEFAULT_URL
119
142
else :
120
143
self .url = url
121
144
@@ -126,10 +149,27 @@ def __init__(
126
149
self .retry = retry
127
150
self .max_tag_value_length = max_tag_value_length
128
151
152
+ if transport_format is None :
153
+ self .transport_format = (
154
+ Configuration ().EXPORTER_ZIPKIN_TRANSPORT_FORMAT
155
+ or TRANSPORT_FORMAT_JSON
156
+ )
157
+ else :
158
+ self .transport_format = transport_format
159
+
129
160
def export (self , spans : Sequence [Span ]) -> SpanExportResult :
130
- zipkin_spans = self ._translate_to_zipkin (spans )
161
+ if self .transport_format == TRANSPORT_FORMAT_JSON :
162
+ content_type = "application/json"
163
+ elif self .transport_format == TRANSPORT_FORMAT_PROTOBUF :
164
+ content_type = "application/x-protobuf"
165
+ else :
166
+ logger .error ("Invalid transport format %s" , self .transport_format )
167
+ return SpanExportResult .FAILURE
168
+
131
169
result = requests .post (
132
- url = self .url , data = json .dumps (zipkin_spans ), headers = ZIPKIN_HEADERS
170
+ url = self .url ,
171
+ data = self ._translate_to_transport_format (spans ),
172
+ headers = {"Content-Type" : content_type },
133
173
)
134
174
135
175
if result .status_code not in SUCCESS_STATUS_CODES :
@@ -147,8 +187,14 @@ def export(self, spans: Sequence[Span]) -> SpanExportResult:
147
187
def shutdown (self ) -> None :
148
188
pass
149
189
150
- def _translate_to_zipkin (self , spans : Sequence [Span ]):
190
+ def _translate_to_transport_format (self , spans : Sequence [Span ]):
191
+ return (
192
+ self ._translate_to_json (spans )
193
+ if self .transport_format == TRANSPORT_FORMAT_JSON
194
+ else self ._translate_to_protobuf (spans )
195
+ )
151
196
197
+ def _translate_to_json (self , spans : Sequence [Span ]):
152
198
local_endpoint = {"serviceName" : self .service_name , "port" : self .port }
153
199
154
200
if self .ipv4 is not None :
@@ -165,8 +211,8 @@ def _translate_to_zipkin(self, spans: Sequence[Span]):
165
211
166
212
# Timestamp in zipkin spans is int of microseconds.
167
213
# see: https://zipkin.io/pages/instrumenting.html
168
- start_timestamp_mus = _nsec_to_usec_round (span .start_time )
169
- duration_mus = _nsec_to_usec_round (span .end_time - span .start_time )
214
+ start_timestamp_mus = nsec_to_usec_round (span .start_time )
215
+ duration_mus = nsec_to_usec_round (span .end_time - span .start_time )
170
216
171
217
zipkin_span = {
172
218
# Ensure left-zero-padding of traceId, spanId, parentId
@@ -176,7 +222,7 @@ def _translate_to_zipkin(self, spans: Sequence[Span]):
176
222
"timestamp" : start_timestamp_mus ,
177
223
"duration" : duration_mus ,
178
224
"localEndpoint" : local_endpoint ,
179
- "kind" : SPAN_KIND_MAP [span .kind ],
225
+ "kind" : SPAN_KIND_MAP_JSON [span .kind ],
180
226
"tags" : self ._extract_tags_from_span (span ),
181
227
"annotations" : self ._extract_annotations_from_events (
182
228
span .events
@@ -211,7 +257,94 @@ def _translate_to_zipkin(self, spans: Sequence[Span]):
211
257
zipkin_span ["parentId" ] = format (span .parent .span_id , "016x" )
212
258
213
259
zipkin_spans .append (zipkin_span )
214
- return zipkin_spans
260
+
261
+ return json .dumps (zipkin_spans )
262
+
263
+ def _translate_to_protobuf (self , spans : Sequence [Span ]):
264
+
265
+ local_endpoint = zipkin_pb2 .Endpoint (
266
+ service_name = self .service_name , port = self .port
267
+ )
268
+
269
+ if self .ipv4 is not None :
270
+ local_endpoint .ipv4 = self .ipv4
271
+
272
+ if self .ipv6 is not None :
273
+ local_endpoint .ipv6 = self .ipv6
274
+
275
+ pbuf_spans = zipkin_pb2 .ListOfSpans ()
276
+
277
+ for span in spans :
278
+ context = span .get_span_context ()
279
+ trace_id = context .trace_id .to_bytes (
280
+ length = 16 , byteorder = "big" , signed = False ,
281
+ )
282
+ span_id = self .format_pbuf_span_id (context .span_id )
283
+
284
+ # Timestamp in zipkin spans is int of microseconds.
285
+ # see: https://zipkin.io/pages/instrumenting.html
286
+ start_timestamp_mus = nsec_to_usec_round (span .start_time )
287
+ duration_mus = nsec_to_usec_round (span .end_time - span .start_time )
288
+
289
+ # pylint: disable=no-member
290
+ pbuf_span = zipkin_pb2 .Span (
291
+ trace_id = trace_id ,
292
+ id = span_id ,
293
+ name = span .name ,
294
+ timestamp = start_timestamp_mus ,
295
+ duration = duration_mus ,
296
+ local_endpoint = local_endpoint ,
297
+ kind = SPAN_KIND_MAP_PROTOBUF [span .kind ],
298
+ tags = self ._extract_tags_from_span (span ),
299
+ )
300
+
301
+ annotations = self ._extract_annotations_from_events (span .events )
302
+
303
+ if annotations is not None :
304
+ for annotation in annotations :
305
+ pbuf_span .annotations .append (
306
+ zipkin_pb2 .Annotation (
307
+ timestamp = annotation ["timestamp" ],
308
+ value = annotation ["value" ],
309
+ )
310
+ )
311
+
312
+ if span .instrumentation_info is not None :
313
+ pbuf_span .tags .update (
314
+ {
315
+ "otel.instrumentation_library.name" : span .instrumentation_info .name ,
316
+ "otel.instrumentation_library.version" : span .instrumentation_info .version ,
317
+ }
318
+ )
319
+
320
+ if span .status is not None :
321
+ pbuf_span .tags .update (
322
+ {"otel.status_code" : str (span .status .status_code .value )}
323
+ )
324
+ if span .status .description is not None :
325
+ pbuf_span .tags .update (
326
+ {"otel.status_description" : span .status .description }
327
+ )
328
+
329
+ if context .trace_flags .sampled :
330
+ pbuf_span .debug = True
331
+
332
+ if isinstance (span .parent , Span ):
333
+ pbuf_span .parent_id = self .format_pbuf_span_id (
334
+ span .parent .get_span_context ().span_id
335
+ )
336
+ elif isinstance (span .parent , SpanContext ):
337
+ pbuf_span .parent_id = self .format_pbuf_span_id (
338
+ span .parent .span_id
339
+ )
340
+
341
+ pbuf_spans .spans .append (pbuf_span )
342
+
343
+ return pbuf_spans .SerializeToString ()
344
+
345
+ @staticmethod
346
+ def format_pbuf_span_id (span_id : int ):
347
+ return span_id .to_bytes (length = 8 , byteorder = "big" , signed = False )
215
348
216
349
def _extract_tags_from_dict (self , tags_dict ):
217
350
tags = {}
@@ -251,13 +384,13 @@ def _extract_annotations_from_events(self, events):
251
384
252
385
annotations .append (
253
386
{
254
- "timestamp" : _nsec_to_usec_round (event .timestamp ),
387
+ "timestamp" : nsec_to_usec_round (event .timestamp ),
255
388
"value" : json .dumps ({event .name : attrs }),
256
389
}
257
390
)
258
391
return annotations
259
392
260
393
261
- def _nsec_to_usec_round (nsec ):
394
+ def nsec_to_usec_round (nsec ):
262
395
"""Round nanoseconds to microseconds"""
263
396
return (nsec + 500 ) // 10 ** 3
0 commit comments