Skip to content

Commit 8ca9e46

Browse files
authored
Zipkin exporter v2 api support for protobuf format (open-telemetry#1318)
1 parent 03b2480 commit 8ca9e46

File tree

10 files changed

+1141
-29
lines changed

10 files changed

+1141
-29
lines changed

.flake8

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ exclude =
1818
__pycache__
1919
exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/gen/
2020
exporter/opentelemetry-exporter-jaeger/build/*
21+
exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/gen
2122
docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/
2223
docs/examples/opentelemetry-example-app/build/*
2324
opentelemetry-proto/build/*

.pylintrc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ contextmanager-decorators=contextlib.contextmanager
165165
# List of members which are set dynamically and missed by pylint inference
166166
# system, and so shouldn't trigger E1101 when accessed. Python regular
167167
# expressions are accepted.
168-
generated-members=
168+
generated-members=zipkin_pb2.*
169169

170170
# Tells whether missing members accessed in mixin class should be ignored. A
171171
# mixin class is detected if its name ends with "mixin" (case insensitive).

exporter/opentelemetry-exporter-zipkin/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## Unreleased
44

5+
- Support for v2 api protobuf format ([#1318](https://github.com/open-telemetry/opentelemetry-python/pull/1318))
6+
57
## Version 0.14b0
68

79
Released 2020-10-13

exporter/opentelemetry-exporter-zipkin/setup.cfg

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ package_dir=
3939
=src
4040
packages=find_namespace:
4141
install_requires =
42+
protobuf >= 3.12
4243
requests ~= 2.7
4344
opentelemetry-api == 0.16.dev0
4445
opentelemetry-sdk == 0.16.dev0

exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/__init__.py

Lines changed: 150 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/
2727
.. _Specification: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-environment-variables.md#zipkin-exporter
2828
29+
.. envvar:: OTEL_EXPORTER_ZIPKIN_ENDPOINT
30+
.. envvar:: OTEL_EXPORTER_ZIPKIN_TRANSPORT_FORMAT
31+
2932
.. code:: python
3033
3134
from opentelemetry import trace
@@ -55,36 +58,54 @@
5558
with tracer.start_as_current_span("foo"):
5659
print("Hello world!")
5760
58-
The exporter supports endpoint configuration via the OTEL_EXPORTER_ZIPKIN_ENDPOINT environment variables as defined in the `Specification`_
61+
The exporter supports the following environment variables for configuration:
62+
63+
:envvar:`OTEL_EXPORTER_ZIPKIN_ENDPOINT`: target to which the exporter will
64+
send data. This may include a path (e.g. http://example.com:9411/api/v2/spans).
65+
66+
:envvar:`OTEL_EXPORTER_ZIPKIN_TRANSPORT_FORMAT`: transport interchange format
67+
to use when sending data. Currently only Zipkin's v2 json and protobuf formats
68+
are supported, with v2 json being the default.
5969
6070
API
6171
---
6272
"""
6373

6474
import json
6575
import logging
66-
import os
67-
from typing import Optional, Sequence
76+
from typing import Optional, Sequence, Union
6877
from urllib.parse import urlparse
6978

7079
import requests
7180

81+
from opentelemetry.configuration import Configuration
82+
from opentelemetry.exporter.zipkin.gen import zipkin_pb2
7283
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
7384
from opentelemetry.trace import Span, SpanContext, SpanKind
7485

86+
TRANSPORT_FORMAT_JSON = "json"
87+
TRANSPORT_FORMAT_PROTOBUF = "protobuf"
88+
7589
DEFAULT_RETRY = False
7690
DEFAULT_URL = "http://localhost:9411/api/v2/spans"
7791
DEFAULT_MAX_TAG_VALUE_LENGTH = 128
78-
ZIPKIN_HEADERS = {"Content-Type": "application/json"}
7992

80-
SPAN_KIND_MAP = {
93+
SPAN_KIND_MAP_JSON = {
8194
SpanKind.INTERNAL: None,
8295
SpanKind.SERVER: "SERVER",
8396
SpanKind.CLIENT: "CLIENT",
8497
SpanKind.PRODUCER: "PRODUCER",
8598
SpanKind.CONSUMER: "CONSUMER",
8699
}
87100

101+
SPAN_KIND_MAP_PROTOBUF = {
102+
SpanKind.INTERNAL: zipkin_pb2.Span.Kind.SPAN_KIND_UNSPECIFIED,
103+
SpanKind.SERVER: zipkin_pb2.Span.Kind.SERVER,
104+
SpanKind.CLIENT: zipkin_pb2.Span.Kind.CLIENT,
105+
SpanKind.PRODUCER: zipkin_pb2.Span.Kind.PRODUCER,
106+
SpanKind.CONSUMER: zipkin_pb2.Span.Kind.CONSUMER,
107+
}
108+
88109
SUCCESS_STATUS_CODES = (200, 202)
89110

90111
logger = logging.getLogger(__name__)
@@ -100,6 +121,7 @@ class ZipkinSpanExporter(SpanExporter):
100121
ipv4: Primary IPv4 address associated with this connection.
101122
ipv6: Primary IPv6 address associated with this connection.
102123
retry: Set to True to configure the exporter to retry on failure.
124+
transport_format: transport interchange format to use
103125
"""
104126

105127
def __init__(
@@ -110,12 +132,13 @@ def __init__(
110132
ipv6: Optional[str] = None,
111133
retry: Optional[str] = DEFAULT_RETRY,
112134
max_tag_value_length: Optional[int] = DEFAULT_MAX_TAG_VALUE_LENGTH,
135+
transport_format: Union[
136+
TRANSPORT_FORMAT_JSON, TRANSPORT_FORMAT_PROTOBUF, None
137+
] = None,
113138
):
114139
self.service_name = service_name
115140
if url is None:
116-
self.url = os.environ.get(
117-
"OTEL_EXPORTER_ZIPKIN_ENDPOINT", DEFAULT_URL
118-
)
141+
self.url = Configuration().EXPORTER_ZIPKIN_ENDPOINT or DEFAULT_URL
119142
else:
120143
self.url = url
121144

@@ -126,10 +149,27 @@ def __init__(
126149
self.retry = retry
127150
self.max_tag_value_length = max_tag_value_length
128151

152+
if transport_format is None:
153+
self.transport_format = (
154+
Configuration().EXPORTER_ZIPKIN_TRANSPORT_FORMAT
155+
or TRANSPORT_FORMAT_JSON
156+
)
157+
else:
158+
self.transport_format = transport_format
159+
129160
def export(self, spans: Sequence[Span]) -> SpanExportResult:
130-
zipkin_spans = self._translate_to_zipkin(spans)
161+
if self.transport_format == TRANSPORT_FORMAT_JSON:
162+
content_type = "application/json"
163+
elif self.transport_format == TRANSPORT_FORMAT_PROTOBUF:
164+
content_type = "application/x-protobuf"
165+
else:
166+
logger.error("Invalid transport format %s", self.transport_format)
167+
return SpanExportResult.FAILURE
168+
131169
result = requests.post(
132-
url=self.url, data=json.dumps(zipkin_spans), headers=ZIPKIN_HEADERS
170+
url=self.url,
171+
data=self._translate_to_transport_format(spans),
172+
headers={"Content-Type": content_type},
133173
)
134174

135175
if result.status_code not in SUCCESS_STATUS_CODES:
@@ -147,8 +187,14 @@ def export(self, spans: Sequence[Span]) -> SpanExportResult:
147187
def shutdown(self) -> None:
148188
pass
149189

150-
def _translate_to_zipkin(self, spans: Sequence[Span]):
190+
def _translate_to_transport_format(self, spans: Sequence[Span]):
191+
return (
192+
self._translate_to_json(spans)
193+
if self.transport_format == TRANSPORT_FORMAT_JSON
194+
else self._translate_to_protobuf(spans)
195+
)
151196

197+
def _translate_to_json(self, spans: Sequence[Span]):
152198
local_endpoint = {"serviceName": self.service_name, "port": self.port}
153199

154200
if self.ipv4 is not None:
@@ -165,8 +211,8 @@ def _translate_to_zipkin(self, spans: Sequence[Span]):
165211

166212
# Timestamp in zipkin spans is int of microseconds.
167213
# see: https://zipkin.io/pages/instrumenting.html
168-
start_timestamp_mus = _nsec_to_usec_round(span.start_time)
169-
duration_mus = _nsec_to_usec_round(span.end_time - span.start_time)
214+
start_timestamp_mus = nsec_to_usec_round(span.start_time)
215+
duration_mus = nsec_to_usec_round(span.end_time - span.start_time)
170216

171217
zipkin_span = {
172218
# Ensure left-zero-padding of traceId, spanId, parentId
@@ -176,7 +222,7 @@ def _translate_to_zipkin(self, spans: Sequence[Span]):
176222
"timestamp": start_timestamp_mus,
177223
"duration": duration_mus,
178224
"localEndpoint": local_endpoint,
179-
"kind": SPAN_KIND_MAP[span.kind],
225+
"kind": SPAN_KIND_MAP_JSON[span.kind],
180226
"tags": self._extract_tags_from_span(span),
181227
"annotations": self._extract_annotations_from_events(
182228
span.events
@@ -211,7 +257,94 @@ def _translate_to_zipkin(self, spans: Sequence[Span]):
211257
zipkin_span["parentId"] = format(span.parent.span_id, "016x")
212258

213259
zipkin_spans.append(zipkin_span)
214-
return zipkin_spans
260+
261+
return json.dumps(zipkin_spans)
262+
263+
def _translate_to_protobuf(self, spans: Sequence[Span]):
264+
265+
local_endpoint = zipkin_pb2.Endpoint(
266+
service_name=self.service_name, port=self.port
267+
)
268+
269+
if self.ipv4 is not None:
270+
local_endpoint.ipv4 = self.ipv4
271+
272+
if self.ipv6 is not None:
273+
local_endpoint.ipv6 = self.ipv6
274+
275+
pbuf_spans = zipkin_pb2.ListOfSpans()
276+
277+
for span in spans:
278+
context = span.get_span_context()
279+
trace_id = context.trace_id.to_bytes(
280+
length=16, byteorder="big", signed=False,
281+
)
282+
span_id = self.format_pbuf_span_id(context.span_id)
283+
284+
# Timestamp in zipkin spans is int of microseconds.
285+
# see: https://zipkin.io/pages/instrumenting.html
286+
start_timestamp_mus = nsec_to_usec_round(span.start_time)
287+
duration_mus = nsec_to_usec_round(span.end_time - span.start_time)
288+
289+
# pylint: disable=no-member
290+
pbuf_span = zipkin_pb2.Span(
291+
trace_id=trace_id,
292+
id=span_id,
293+
name=span.name,
294+
timestamp=start_timestamp_mus,
295+
duration=duration_mus,
296+
local_endpoint=local_endpoint,
297+
kind=SPAN_KIND_MAP_PROTOBUF[span.kind],
298+
tags=self._extract_tags_from_span(span),
299+
)
300+
301+
annotations = self._extract_annotations_from_events(span.events)
302+
303+
if annotations is not None:
304+
for annotation in annotations:
305+
pbuf_span.annotations.append(
306+
zipkin_pb2.Annotation(
307+
timestamp=annotation["timestamp"],
308+
value=annotation["value"],
309+
)
310+
)
311+
312+
if span.instrumentation_info is not None:
313+
pbuf_span.tags.update(
314+
{
315+
"otel.instrumentation_library.name": span.instrumentation_info.name,
316+
"otel.instrumentation_library.version": span.instrumentation_info.version,
317+
}
318+
)
319+
320+
if span.status is not None:
321+
pbuf_span.tags.update(
322+
{"otel.status_code": str(span.status.status_code.value)}
323+
)
324+
if span.status.description is not None:
325+
pbuf_span.tags.update(
326+
{"otel.status_description": span.status.description}
327+
)
328+
329+
if context.trace_flags.sampled:
330+
pbuf_span.debug = True
331+
332+
if isinstance(span.parent, Span):
333+
pbuf_span.parent_id = self.format_pbuf_span_id(
334+
span.parent.get_span_context().span_id
335+
)
336+
elif isinstance(span.parent, SpanContext):
337+
pbuf_span.parent_id = self.format_pbuf_span_id(
338+
span.parent.span_id
339+
)
340+
341+
pbuf_spans.spans.append(pbuf_span)
342+
343+
return pbuf_spans.SerializeToString()
344+
345+
@staticmethod
346+
def format_pbuf_span_id(span_id: int):
347+
return span_id.to_bytes(length=8, byteorder="big", signed=False)
215348

216349
def _extract_tags_from_dict(self, tags_dict):
217350
tags = {}
@@ -251,13 +384,13 @@ def _extract_annotations_from_events(self, events):
251384

252385
annotations.append(
253386
{
254-
"timestamp": _nsec_to_usec_round(event.timestamp),
387+
"timestamp": nsec_to_usec_round(event.timestamp),
255388
"value": json.dumps({event.name: attrs}),
256389
}
257390
)
258391
return annotations
259392

260393

261-
def _nsec_to_usec_round(nsec):
394+
def nsec_to_usec_round(nsec):
262395
"""Round nanoseconds to microseconds"""
263396
return (nsec + 500) // 10 ** 3

exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/gen/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)