Skip to content

Commit f6e0705

Browse files
authored
Improve BatchExportSpanProcessor (open-telemetry#1062)
* It was possible for force-flush calls to miss the flush-finished notification from the worker thread: if a flush token was added in the main thread, and the worker thread processed it and notified the flush condition before the main thread called wait on the flush condition, the wakeup was missed and the main thread had to wait out the full flush timeout. * Calls to force flush were not really thread safe, since whether a flush operation was in progress was indicated by a single boolean flag that was reset when the first force-flush call finished. * Instead of a boolean flag to indicate a flush request, use an Event. When force flush is called, it checks whether a flush-request event is currently pending, or creates a new one. The worker thread checks whether a flush-request event exists, unsets it, and uses a local reference to signal once the export operation has finished. Force-flush calls wait in the meantime on the flush-request event until they are signaled by the worker thread. This also makes calls to force flush thread safe, since multiple threads can wait on one event.
1 parent 63685b1 commit f6e0705

File tree

3 files changed

+159
-43
lines changed

3 files changed

+159
-43
lines changed

opentelemetry-sdk/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
([#1034](https://github.com/open-telemetry/opentelemetry-python/pull/1034))
99
- Remove lazy Event and Link API from Span interface
1010
([#1045](https://github.com/open-telemetry/opentelemetry-python/pull/1045))
11+
- Improve BatchExportSpanProcessor
12+
([#1062](https://github.com/open-telemetry/opentelemetry-python/pull/1062))
1113
- Populate resource attributes as per semantic conventions
1214
([#1053](https://github.com/open-telemetry/opentelemetry-python/pull/1053))
1315

opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py

Lines changed: 106 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@
2020
import typing
2121
from enum import Enum
2222

23-
from opentelemetry.context import attach, detach, get_current, set_value
24-
from opentelemetry.trace import DefaultSpan
23+
from opentelemetry.context import attach, detach, set_value
2524
from opentelemetry.util import time_ns
2625

2726
from .. import Span, SpanProcessor
@@ -91,15 +90,23 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
9190
return True
9291

9392

93+
class _FlushRequest:
94+
"""Represents a request for the BatchExportSpanProcessor to flush spans."""
95+
96+
__slots__ = ["event", "num_spans"]
97+
98+
def __init__(self):
99+
self.event = threading.Event()
100+
self.num_spans = 0
101+
102+
94103
class BatchExportSpanProcessor(SpanProcessor):
95104
"""Batch span processor implementation.
96105
97106
BatchExportSpanProcessor is an implementation of `SpanProcessor` that
98107
batches ended spans and pushes them to the configured `SpanExporter`.
99108
"""
100109

101-
_FLUSH_TOKEN_SPAN = DefaultSpan(context=None)
102-
103110
def __init__(
104111
self,
105112
span_exporter: SpanExporter,
@@ -129,9 +136,7 @@ def __init__(
129136
) # type: typing.Deque[Span]
130137
self.worker_thread = threading.Thread(target=self.worker, daemon=True)
131138
self.condition = threading.Condition(threading.Lock())
132-
self.flush_condition = threading.Condition(threading.Lock())
133-
# flag to indicate that there is a flush operation on progress
134-
self._flushing = False
139+
self._flush_request = None # type: typing.Optional[_FlushRequest]
135140
self.schedule_delay_millis = schedule_delay_millis
136141
self.max_export_batch_size = max_export_batch_size
137142
self.max_queue_size = max_queue_size
@@ -164,60 +169,128 @@ def on_end(self, span: Span) -> None:
164169

165170
def worker(self):
166171
timeout = self.schedule_delay_millis / 1e3
172+
flush_request = None # type: typing.Optional[_FlushRequest]
167173
while not self.done:
168-
if (
169-
len(self.queue) < self.max_export_batch_size
170-
and not self._flushing
171-
):
172-
with self.condition:
174+
with self.condition:
175+
if self.done:
176+
# done flag may have changed, avoid waiting
177+
break
178+
flush_request = self._get_and_unset_flush_request()
179+
if (
180+
len(self.queue) < self.max_export_batch_size
181+
and flush_request is None
182+
):
183+
173184
self.condition.wait(timeout)
185+
flush_request = self._get_and_unset_flush_request()
174186
if not self.queue:
175187
# spurious notification, let's wait again
188+
self._notify_flush_request_finished(flush_request)
189+
flush_request = None
176190
continue
177191
if self.done:
178192
# missing spans will be sent when calling flush
179193
break
180194

181-
# substract the duration of this export call to the next timeout
195+
# subtract the duration of this export call to the next timeout
182196
start = time_ns()
183-
self.export()
197+
self._export(flush_request)
184198
end = time_ns()
185199
duration = (end - start) / 1e9
186200
timeout = self.schedule_delay_millis / 1e3 - duration
187201

202+
self._notify_flush_request_finished(flush_request)
203+
flush_request = None
204+
205+
# there might have been a new flush request while export was running
206+
# and before the done flag switched to true
207+
with self.condition:
208+
shutdown_flush_request = self._get_and_unset_flush_request()
209+
188210
# be sure that all spans are sent
189211
self._drain_queue()
212+
self._notify_flush_request_finished(flush_request)
213+
self._notify_flush_request_finished(shutdown_flush_request)
214+
215+
def _get_and_unset_flush_request(self,) -> typing.Optional[_FlushRequest]:
216+
"""Returns the current flush request and makes it invisible to the
217+
worker thread for subsequent calls.
218+
"""
219+
flush_request = self._flush_request
220+
self._flush_request = None
221+
if flush_request is not None:
222+
flush_request.num_spans = len(self.queue)
223+
return flush_request
224+
225+
@staticmethod
226+
def _notify_flush_request_finished(
227+
flush_request: typing.Optional[_FlushRequest],
228+
):
229+
"""Notifies the flush initiator(s) waiting on the given request/event
230+
that the flush operation was finished.
231+
"""
232+
if flush_request is not None:
233+
flush_request.event.set()
234+
235+
def _get_or_create_flush_request(self) -> _FlushRequest:
236+
"""Either returns the current active flush event or creates a new one.
190237
191-
def export(self) -> None:
192-
"""Exports at most max_export_batch_size spans."""
238+
The flush event will be visible and read by the worker thread before an
239+
export operation starts. Callers of a flush operation may wait on the
240+
returned event to be notified when the flush/export operation was
241+
finished.
242+
243+
This method is not thread-safe, i.e. callers need to take care about
244+
synchronization/locking.
245+
"""
246+
if self._flush_request is None:
247+
self._flush_request = _FlushRequest()
248+
return self._flush_request
249+
250+
def _export(self, flush_request: typing.Optional[_FlushRequest]):
251+
"""Exports spans considering the given flush_request.
252+
253+
In case of a given flush_requests spans are exported in batches until
254+
the number of exported spans reached or exceeded the number of spans in
255+
the flush request.
256+
In no flush_request was given at most max_export_batch_size spans are
257+
exported.
258+
"""
259+
if not flush_request:
260+
self._export_batch()
261+
return
262+
263+
num_spans = flush_request.num_spans
264+
while self.queue:
265+
num_exported = self._export_batch()
266+
num_spans -= num_exported
267+
268+
if num_spans <= 0:
269+
break
270+
271+
def _export_batch(self) -> int:
272+
"""Exports at most max_export_batch_size spans and returns the number of
273+
exported spans.
274+
"""
193275
idx = 0
194-
notify_flush = False
195276
# currently only a single thread acts as consumer, so queue.pop() will
196277
# not raise an exception
197278
while idx < self.max_export_batch_size and self.queue:
198-
span = self.queue.pop()
199-
if span is self._FLUSH_TOKEN_SPAN:
200-
notify_flush = True
201-
else:
202-
self.spans_list[idx] = span
203-
idx += 1
279+
self.spans_list[idx] = self.queue.pop()
280+
idx += 1
204281
token = attach(set_value("suppress_instrumentation", True))
205282
try:
206283
# Ignore type b/c the Optional[None]+slicing is too "clever"
207284
# for mypy
208285
self.span_exporter.export(self.spans_list[:idx]) # type: ignore
209-
# pylint: disable=broad-except
210-
except Exception:
286+
except Exception: # pylint: disable=broad-except
211287
logger.exception("Exception while exporting Span batch.")
212288
detach(token)
213289

214-
if notify_flush:
215-
with self.flush_condition:
216-
self.flush_condition.notify()
217-
218290
# clean up list
219291
for index in range(idx):
220292
self.spans_list[index] = None
293+
return idx
221294

222295
def _drain_queue(self):
223296
""""Export all elements until queue is empty.
@@ -226,26 +299,20 @@ def _drain_queue(self):
226299
`export` that is not thread safe.
227300
"""
228301
while self.queue:
229-
self.export()
302+
self._export_batch()
230303

231304
def force_flush(self, timeout_millis: int = 30000) -> bool:
232305
if self.done:
233306
logger.warning("Already shutdown, ignoring call to force_flush().")
234307
return True
235308

236-
self._flushing = True
237-
self.queue.appendleft(self._FLUSH_TOKEN_SPAN)
238-
239-
# wake up worker thread
240309
with self.condition:
310+
flush_request = self._get_or_create_flush_request()
311+
# signal the worker thread to flush and wait for it to finish
241312
self.condition.notify_all()
242313

243314
# wait for token to be processed
244-
with self.flush_condition:
245-
ret = self.flush_condition.wait(timeout_millis / 1e3)
246-
247-
self._flushing = False
248-
315+
ret = flush_request.event.wait(timeout_millis / 1e3)
249316
if not ret:
250317
logger.warning("Timeout was exceeded in force_flush().")
251318
return ret

opentelemetry-sdk/tests/trace/export/test_export.py

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@
1313
# limitations under the License.
1414

1515
import os
16+
import threading
1617
import time
1718
import unittest
19+
from concurrent.futures import ThreadPoolExecutor
1820
from logging import WARNING
1921
from unittest import mock
2022

@@ -31,11 +33,13 @@ def __init__(
3133
destination,
3234
max_export_batch_size=None,
3335
export_timeout_millis=0.0,
36+
export_event: threading.Event = None,
3437
):
3538
self.destination = destination
3639
self.max_export_batch_size = max_export_batch_size
3740
self.is_shutdown = False
3841
self.export_timeout = export_timeout_millis / 1e3
42+
self.export_event = export_event
3943

4044
def export(self, spans: trace.Span) -> export.SpanExportResult:
4145
if (
@@ -45,6 +49,8 @@ def export(self, spans: trace.Span) -> export.SpanExportResult:
4549
raise ValueError("Batch is too big")
4650
time.sleep(self.export_timeout)
4751
self.destination.extend(span.name for span in spans)
52+
if self.export_event:
53+
self.export_event.set()
4854
return export.SpanExportResult.SUCCESS
4955

5056
def shutdown(self):
@@ -148,6 +154,42 @@ def test_flush(self):
148154

149155
span_processor.shutdown()
150156

157+
def test_flush_empty(self):
158+
spans_names_list = []
159+
160+
my_exporter = MySpanExporter(destination=spans_names_list)
161+
span_processor = export.BatchExportSpanProcessor(my_exporter)
162+
163+
self.assertTrue(span_processor.force_flush())
164+
165+
def test_flush_from_multiple_threads(self):
166+
num_threads = 50
167+
num_spans = 10
168+
169+
span_list = []
170+
171+
my_exporter = MySpanExporter(destination=span_list)
172+
span_processor = export.BatchExportSpanProcessor(
173+
my_exporter, max_queue_size=512, max_export_batch_size=128
174+
)
175+
176+
def create_spans_and_flush(tno: int):
177+
for span_idx in range(num_spans):
178+
_create_start_and_end_span(
179+
"Span {}-{}".format(tno, span_idx), span_processor
180+
)
181+
self.assertTrue(span_processor.force_flush())
182+
183+
with ThreadPoolExecutor(max_workers=num_threads) as executor:
184+
future_list = []
185+
for thread_no in range(num_threads):
186+
future = executor.submit(create_spans_and_flush, thread_no)
187+
future_list.append(future)
188+
189+
executor.shutdown()
190+
191+
self.assertEqual(num_threads * num_spans, len(span_list))
192+
151193
def test_flush_timeout(self):
152194
spans_names_list = []
153195

@@ -209,17 +251,22 @@ def test_batch_span_processor_scheduled_delay(self):
209251
"""Test that spans are exported each schedule_delay_millis"""
210252
spans_names_list = []
211253

212-
my_exporter = MySpanExporter(destination=spans_names_list)
254+
export_event = threading.Event()
255+
my_exporter = MySpanExporter(
256+
destination=spans_names_list, export_event=export_event
257+
)
213258
span_processor = export.BatchExportSpanProcessor(
214-
my_exporter, schedule_delay_millis=50
259+
my_exporter, schedule_delay_millis=50,
215260
)
216261

217262
# create single span
263+
start_time = time.time()
218264
_create_start_and_end_span("foo", span_processor)
219265

220-
time.sleep(0.05 + 0.02)
221-
# span should be already exported
266+
self.assertTrue(export_event.wait(2))
267+
export_time = time.time()
222268
self.assertEqual(len(spans_names_list), 1)
269+
self.assertGreaterEqual((export_time - start_time) * 1e3, 50)
223270

224271
span_processor.shutdown()
225272

0 commit comments

Comments
 (0)