Skip to content

Commit ad7a809

Browse files
sdk: fix force_flush in batch span processor (open-telemetry#397)
open-telemetry#389 implemented force_flush() for the span processor. For BatchSpanProcessor it was implemented by exposing an already existing _flush() method, which created a race condition because the _flush() method was intended to be called only from the context of the worker thread; this is because it uses the export() method, which is not thread safe. The result after that PR is that some tests were failing randomly because export() was being executed in two different threads: the worker thread and the user thread calling force_flush(). This commit fixes it by implementing a more sophisticated flush mechanism. When a flush is requested, a special span token is inserted into the spans queue, a flag indicating that a flush operation is in progress is set, and the worker thread is woken up; after that, a condition variable is monitored, waiting for the worker thread to indicate that the token has been processed. The worker thread has new logic to avoid sleeping (waiting on the condition variable) when there is a flush operation going on; it also notifies the caller (using another condition variable) when the token has been processed.
1 parent f42bc5b commit ad7a809

File tree

3 files changed

+91
-17
lines changed

3 files changed

+91
-17
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,16 @@ def shutdown(self) -> None:
7070
"""Called when a :class:`opentelemetry.sdk.trace.Tracer` is shutdown.
7171
"""
7272

73-
def force_flush(self) -> None:
74-
"""Export all ended spans to the configured Exporter that have not
75-
yet been exported.
73+
def force_flush(self, timeout_millis: int = 30000) -> bool:
74+
"""Export all ended spans to the configured Exporter that have not yet
75+
been exported.
76+
77+
Args:
78+
timeout_millis: The maximum amount of time to wait for spans to be
79+
exported.
80+
81+
Returns:
82+
False if the timeout is exceeded, True otherwise.
7683
"""
7784

7885

opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from enum import Enum
2020

2121
from opentelemetry.context import Context
22+
from opentelemetry.trace import DefaultSpan
2223
from opentelemetry.util import time_ns
2324

2425
from .. import Span, SpanProcessor
@@ -83,8 +84,9 @@ def on_end(self, span: Span) -> None:
8384
def shutdown(self) -> None:
8485
self.span_exporter.shutdown()
8586

86-
def force_flush(self) -> None:
87-
pass
87+
def force_flush(self, timeout_millis: int = 30000) -> bool:
88+
# pylint: disable=unused-argument
89+
return True
8890

8991

9092
class BatchExportSpanProcessor(SpanProcessor):
@@ -94,6 +96,8 @@ class BatchExportSpanProcessor(SpanProcessor):
9496
batches ended spans and pushes them to the configured `SpanExporter`.
9597
"""
9698

99+
_FLUSH_TOKEN_SPAN = DefaultSpan(context=None)
100+
97101
def __init__(
98102
self,
99103
span_exporter: SpanExporter,
@@ -123,6 +127,9 @@ def __init__(
123127
) # type: typing.Deque[Span]
124128
self.worker_thread = threading.Thread(target=self.worker, daemon=True)
125129
self.condition = threading.Condition(threading.Lock())
130+
self.flush_condition = threading.Condition(threading.Lock())
131+
# flag to indicate that there is a flush operation in progress
132+
self._flushing = False
126133
self.schedule_delay_millis = schedule_delay_millis
127134
self.max_export_batch_size = max_export_batch_size
128135
self.max_queue_size = max_queue_size
@@ -156,7 +163,10 @@ def on_end(self, span: Span) -> None:
156163
def worker(self):
157164
timeout = self.schedule_delay_millis / 1e3
158165
while not self.done:
159-
if len(self.queue) < self.max_export_batch_size:
166+
if (
167+
len(self.queue) < self.max_export_batch_size
168+
and not self._flushing
169+
):
160170
with self.condition:
161171
self.condition.wait(timeout)
162172
if not self.queue:
@@ -174,17 +184,21 @@ def worker(self):
174184
timeout = self.schedule_delay_millis / 1e3 - duration
175185

176186
# be sure that all spans are sent
177-
self.force_flush()
187+
self._drain_queue()
178188

179189
def export(self) -> None:
180190
"""Exports at most max_export_batch_size spans."""
181191
idx = 0
182-
192+
notify_flush = False
183193
# currently only a single thread acts as consumer, so queue.pop() will
184194
# not raise an exception
185195
while idx < self.max_export_batch_size and self.queue:
186-
self.spans_list[idx] = self.queue.pop()
187-
idx += 1
196+
span = self.queue.pop()
197+
if span is self._FLUSH_TOKEN_SPAN:
198+
notify_flush = True
199+
else:
200+
self.spans_list[idx] = span
201+
idx += 1
188202
with Context.use(suppress_instrumentation=True):
189203
try:
190204
# Ignore type b/c the Optional[None]+slicing is too "clever"
@@ -196,15 +210,45 @@ def export(self) -> None:
196210
except Exception:
197211
logger.exception("Exception while exporting Span batch.")
198212

213+
if notify_flush:
214+
with self.flush_condition:
215+
self.flush_condition.notify()
216+
199217
# clean up list
200218
for index in range(idx):
201219
self.spans_list[index] = None
202220

203-
def force_flush(self):
204-
# export all elements until queue is empty
221+
def _drain_queue(self):
222+
""""Export all elements until queue is empty.
223+
224+
Can only be called from the worker thread context because it invokes
225+
`export` that is not thread safe.
226+
"""
205227
while self.queue:
206228
self.export()
207229

230+
def force_flush(self, timeout_millis: int = 30000) -> bool:
231+
if self.done:
232+
logger.warning("Already shutdown, ignoring call to force_flush().")
233+
return True
234+
235+
self._flushing = True
236+
self.queue.appendleft(self._FLUSH_TOKEN_SPAN)
237+
238+
# wake up worker thread
239+
with self.condition:
240+
self.condition.notify_all()
241+
242+
# wait for token to be processed
243+
with self.flush_condition:
244+
ret = self.flush_condition.wait(timeout_millis / 1e3)
245+
246+
self._flushing = False
247+
248+
if not ret:
249+
logger.warning("Timeout was exceeded in force_flush().")
250+
return ret
251+
208252
def shutdown(self) -> None:
209253
# signal the worker thread to finish and then wait for it
210254
self.done = True

opentelemetry-sdk/tests/trace/export/test_export.py

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import time
1616
import unittest
17+
from logging import WARNING
1718
from unittest import mock
1819

1920
from opentelemetry import trace as trace_api
@@ -24,17 +25,24 @@
2425
class MySpanExporter(export.SpanExporter):
2526
"""Very simple span exporter used for testing."""
2627

27-
def __init__(self, destination, max_export_batch_size=None):
28+
def __init__(
29+
self,
30+
destination,
31+
max_export_batch_size=None,
32+
export_timeout_millis=0.0,
33+
):
2834
self.destination = destination
2935
self.max_export_batch_size = max_export_batch_size
3036
self.is_shutdown = False
37+
self.export_timeout = export_timeout_millis / 1e3
3138

3239
def export(self, spans: trace.Span) -> export.SpanExportResult:
3340
if (
3441
self.max_export_batch_size is not None
3542
and len(spans) > self.max_export_batch_size
3643
):
3744
raise ValueError("Batch is too big")
45+
time.sleep(self.export_timeout)
3846
self.destination.extend(span.name for span in spans)
3947
return export.SpanExportResult.SUCCESS
4048

@@ -127,18 +135,33 @@ def test_flush(self):
127135
for name in span_names0:
128136
_create_start_and_end_span(name, span_processor)
129137

130-
span_processor.force_flush()
138+
self.assertTrue(span_processor.force_flush())
131139
self.assertListEqual(span_names0, spans_names_list)
132140

133141
# create some more spans to check that span processor still works
134142
for name in span_names1:
135143
_create_start_and_end_span(name, span_processor)
136144

137-
span_processor.force_flush()
145+
self.assertTrue(span_processor.force_flush())
138146
self.assertListEqual(span_names0 + span_names1, spans_names_list)
139147

140148
span_processor.shutdown()
141149

150+
def test_flush_timeout(self):
151+
spans_names_list = []
152+
153+
my_exporter = MySpanExporter(
154+
destination=spans_names_list, export_timeout_millis=500
155+
)
156+
span_processor = export.BatchExportSpanProcessor(my_exporter)
157+
158+
_create_start_and_end_span("foo", span_processor)
159+
160+
# check that the timeout is not met
161+
with self.assertLogs(level=WARNING):
162+
self.assertFalse(span_processor.force_flush(100))
163+
span_processor.shutdown()
164+
142165
def test_batch_span_processor_lossless(self):
143166
"""Test that no spans are lost when sending max_queue_size spans"""
144167
spans_names_list = []
@@ -153,7 +176,7 @@ def test_batch_span_processor_lossless(self):
153176
for _ in range(512):
154177
_create_start_and_end_span("foo", span_processor)
155178

156-
span_processor.force_flush()
179+
self.assertTrue(span_processor.force_flush())
157180
self.assertEqual(len(spans_names_list), 512)
158181
span_processor.shutdown()
159182

@@ -177,7 +200,7 @@ def test_batch_span_processor_many_spans(self):
177200

178201
time.sleep(0.05) # give some time for the exporter to upload spans
179202

180-
span_processor.force_flush()
203+
self.assertTrue(span_processor.force_flush())
181204
self.assertEqual(len(spans_names_list), 1024)
182205
span_processor.shutdown()
183206

0 commit comments

Comments
 (0)