Skip to content

Commit a42b063

Browse files
mauriciovasquezbernalc24t
authored andcommitted
SDK: shut down span processors automatically (open-telemetry#280)
The BatchExportSpanProcessor is an asynchronous span processor that uses a worker thread to call the different exporters. Before this commit applications had to shut down the span processor explicitely to guarantee that all the spans were summited to the exporters, this was not very intuitive for the users. This commit removes that limitation by implementing the tracer's __del__ method and an atexit hook. According to __del__'s documentation [1] it is possible that sometimes it's not called, for that reason the atexit hook is also used to guarantee that the processor is shut down in all the cases. [1] https://docs.python.org/3/reference/datamodel.html#object.__del__
1 parent 5f311e0 commit a42b063

File tree

8 files changed

+96
-12
lines changed

8 files changed

+96
-12
lines changed

examples/basic_tracer/tracer.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,5 +48,3 @@
4848
with tracer.start_as_current_span("bar"):
4949
with tracer.start_as_current_span("baz"):
5050
print(Context)
51-
52-
span_processor.shutdown()

examples/http/server.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,3 @@ def hello():
6565

6666
if __name__ == "__main__":
6767
app.run(debug=True)
68-
span_processor.shutdown()

examples/http/tracer_client.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,3 @@
5151
# Spans and propagating context as appropriate.
5252
http_requests.enable(tracer)
5353
response = requests.get(url="http://127.0.0.1:5000/")
54-
span_processor.shutdown()

ext/opentelemetry-ext-jaeger/README.rst

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,6 @@ gRPC is still not supported by this implementation.
6161
with tracer.start_as_current_span('foo'):
6262
print('Hello world!')
6363
64-
# shutdown the span processor
65-
# TODO: this has to be improved so user doesn't need to call it manually
66-
span_processor.shutdown()
67-
6864
The `examples <./examples>`_ folder contains more elaborated examples.
6965

7066
References

ext/opentelemetry-ext-jaeger/examples/jaeger_exporter_example.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,3 @@
4646
time.sleep(0.2)
4747

4848
time.sleep(0.1)
49-
50-
# shutdown the span processor
51-
# TODO: this has to be improved so user doesn't need to call it manually
52-
span_processor.shutdown()

opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515

16+
import atexit
1617
import logging
1718
import random
1819
import threading
@@ -296,19 +297,25 @@ class Tracer(trace_api.Tracer):
296297
297298
Args:
298299
name: The name of the tracer.
300+
shutdown_on_exit: Register an atexit hook to shut down the tracer when
301+
the application exits.
299302
"""
300303

301304
def __init__(
302305
self,
303306
name: str = "",
304307
sampler: sampling.Sampler = trace_api.sampling.ALWAYS_ON,
308+
shutdown_on_exit: bool = True,
305309
) -> None:
306310
slot_name = "current_span"
307311
if name:
308312
slot_name = "{}.current_span".format(name)
309313
self._current_span_slot = Context.register_slot(slot_name)
310314
self._active_span_processor = MultiSpanProcessor()
311315
self.sampler = sampler
316+
self._atexit_handler = None
317+
if shutdown_on_exit:
318+
self._atexit_handler = atexit.register(self.shutdown)
312319

313320
def get_current_span(self):
314321
"""See `opentelemetry.trace.Tracer.get_current_span`."""
@@ -426,5 +433,12 @@ def add_span_processor(self, span_processor: SpanProcessor) -> None:
426433
# thread safe
427434
self._active_span_processor.add_span_processor(span_processor)
428435

436+
def shutdown(self):
437+
"""Shut down the span processors added to the tracer."""
438+
self._active_span_processor.shutdown()
439+
if self._atexit_handler is not None:
440+
atexit.unregister(self._atexit_handler)
441+
self._atexit_handler = None
442+
429443

430444
tracer = Tracer()

opentelemetry-sdk/tests/trace/export/test_export.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ class MySpanExporter(export.SpanExporter):
2727
def __init__(self, destination, max_export_batch_size=None):
2828
self.destination = destination
2929
self.max_export_batch_size = max_export_batch_size
30+
self.is_shutdown = False
3031

3132
def export(self, spans: trace.Span) -> export.SpanExportResult:
3233
if (
@@ -37,6 +38,9 @@ def export(self, spans: trace.Span) -> export.SpanExportResult:
3738
self.destination.extend(span.name for span in spans)
3839
return export.SpanExportResult.SUCCESS
3940

41+
def shutdown(self):
42+
self.is_shutdown = True
43+
4044

4145
class TestSimpleExportSpanProcessor(unittest.TestCase):
4246
def test_simple_span_processor(self):
@@ -55,6 +59,9 @@ def test_simple_span_processor(self):
5559

5660
self.assertListEqual(["xxx", "bar", "foo"], spans_names_list)
5761

62+
span_processor.shutdown()
63+
self.assertTrue(my_exporter.is_shutdown)
64+
5865
def test_simple_span_processor_no_context(self):
5966
"""Check that we process spans that are never made active.
6067
@@ -102,6 +109,8 @@ def test_batch_span_processor(self):
102109
span_processor.shutdown()
103110
self.assertListEqual(span_names, spans_names_list)
104111

112+
self.assertTrue(my_exporter.is_shutdown)
113+
105114
def test_batch_span_processor_lossless(self):
106115
"""Test that no spans are lost when sending max_queue_size spans"""
107116
spans_names_list = []

opentelemetry-sdk/tests/trace/test_trace.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import shutil
16+
import subprocess
1517
import unittest
1618
from unittest import mock
1719

@@ -26,6 +28,77 @@ def test_extends_api(self):
2628
tracer = trace.Tracer()
2729
self.assertIsInstance(tracer, trace_api.Tracer)
2830

31+
def test_shutdown(self):
32+
tracer = trace.Tracer()
33+
34+
mock_processor1 = mock.Mock(spec=trace.SpanProcessor)
35+
tracer.add_span_processor(mock_processor1)
36+
37+
mock_processor2 = mock.Mock(spec=trace.SpanProcessor)
38+
tracer.add_span_processor(mock_processor2)
39+
40+
tracer.shutdown()
41+
42+
self.assertEqual(mock_processor1.shutdown.call_count, 1)
43+
self.assertEqual(mock_processor2.shutdown.call_count, 1)
44+
45+
shutdown_python_code = """
46+
import atexit
47+
from unittest import mock
48+
49+
from opentelemetry.sdk import trace
50+
51+
mock_processor = mock.Mock(spec=trace.SpanProcessor)
52+
53+
def print_shutdown_count():
54+
print(mock_processor.shutdown.call_count)
55+
56+
# atexit hooks are called in inverse order they are added, so do this before
57+
# creating the tracer
58+
atexit.register(print_shutdown_count)
59+
60+
tracer = trace.Tracer({tracer_parameters})
61+
tracer.add_span_processor(mock_processor)
62+
63+
{tracer_shutdown}
64+
"""
65+
66+
def run_general_code(shutdown_on_exit, explicit_shutdown):
67+
tracer_parameters = ""
68+
tracer_shutdown = ""
69+
70+
if not shutdown_on_exit:
71+
tracer_parameters = "shutdown_on_exit=False"
72+
73+
if explicit_shutdown:
74+
tracer_shutdown = "tracer.shutdown()"
75+
76+
return subprocess.check_output(
77+
[
78+
# use shutil to avoid calling python outside the
79+
# virtualenv on windows.
80+
shutil.which("python"),
81+
"-c",
82+
shutdown_python_code.format(
83+
tracer_parameters=tracer_parameters,
84+
tracer_shutdown=tracer_shutdown,
85+
),
86+
]
87+
)
88+
89+
# test default shutdown_on_exit (True)
90+
out = run_general_code(True, False)
91+
self.assertTrue(out.startswith(b"1"))
92+
93+
# test that shutdown is called only once even if Tracer.shutdown is
94+
# called explicitely
95+
out = run_general_code(True, True)
96+
self.assertTrue(out.startswith(b"1"))
97+
98+
# test shutdown_on_exit=False
99+
out = run_general_code(False, False)
100+
self.assertTrue(out.startswith(b"0"))
101+
29102

30103
class TestTracerSampling(unittest.TestCase):
31104
def test_default_sampler(self):

0 commit comments

Comments
 (0)