Skip to content

Commit 97b602f

Browse files
authored
Fix BatchExportSpanProcessor not resetting timeout on worker loop (open-telemetry#1218)
1 parent 82356a8 commit 97b602f

File tree

4 files changed

+77
-2
lines changed

4 files changed

+77
-2
lines changed

exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/spanprocessor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ def worker(self):
119119
with self.condition:
120120
self.condition.wait(timeout)
121121
if not self.check_traces_queue:
122-
# spurious notification, let's wait again
122+
# spurious notification, let's wait again, reset timeout
123+
timeout = self.schedule_delay_millis / 1e3
123124
continue
124125
if self.done:
125126
# missing spans will be sent when calling flush

exporter/opentelemetry-exporter-datadog/tests/test_datadog_exporter.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,42 @@ def test_span_processor_scheduled_delay(self):
483483

484484
tracer_provider.shutdown()
485485

486+
def test_batch_span_processor_reset_timeout(self):
487+
"""Test that the scheduled timeout is reset on cycles without spans"""
488+
delay = 50
489+
# pylint: disable=protected-access
490+
exporter = MockDatadogSpanExporter()
491+
exporter._agent_writer.write.side_effect = lambda spans: time.sleep(
492+
0.05
493+
)
494+
span_processor = datadog.DatadogExportSpanProcessor(
495+
exporter, schedule_delay_millis=delay
496+
)
497+
tracer_provider = trace.TracerProvider()
498+
tracer_provider.add_span_processor(span_processor)
499+
tracer = tracer_provider.get_tracer(__name__)
500+
with mock.patch.object(span_processor.condition, "wait") as mock_wait:
501+
with tracer.start_span("foo"):
502+
pass
503+
504+
# give some time for exporter to loop
505+
# since wait is mocked it should return immediately
506+
time.sleep(0.1)
507+
mock_wait_calls = list(mock_wait.mock_calls)
508+
509+
# find the index of the call that processed the singular span
510+
for idx, wait_call in enumerate(mock_wait_calls):
511+
_, args, __ = wait_call
512+
if args[0] <= 0:
513+
after_calls = mock_wait_calls[idx + 1 :]
514+
break
515+
516+
self.assertTrue(
517+
all(args[0] >= 0.05 for _, args, __ in after_calls)
518+
)
519+
520+
span_processor.shutdown()
521+
486522
def test_span_processor_accepts_parent_context(self):
487523
span_processor = mock.Mock(
488524
wraps=datadog.DatadogExportSpanProcessor(self.exporter)

opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,8 @@ def worker(self):
213213
self.condition.wait(timeout)
214214
flush_request = self._get_and_unset_flush_request()
215215
if not self.queue:
216-
# spurious notification, let's wait again
216+
# spurious notification, let's wait again, reset timeout
217+
timeout = self.schedule_delay_millis / 1e3
217218
self._notify_flush_request_finished(flush_request)
218219
flush_request = None
219220
continue

opentelemetry-sdk/tests/trace/export/test_export.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,43 @@ def test_batch_span_processor_scheduled_delay(self):
381381

382382
span_processor.shutdown()
383383

384+
def test_batch_span_processor_reset_timeout(self):
385+
"""Test that the scheduled timeout is reset on cycles without spans"""
386+
spans_names_list = []
387+
388+
export_event = threading.Event()
389+
my_exporter = MySpanExporter(
390+
destination=spans_names_list,
391+
export_event=export_event,
392+
export_timeout_millis=50,
393+
)
394+
395+
span_processor = export.BatchExportSpanProcessor(
396+
my_exporter, schedule_delay_millis=50,
397+
)
398+
399+
with mock.patch.object(span_processor.condition, "wait") as mock_wait:
400+
_create_start_and_end_span("foo", span_processor)
401+
self.assertTrue(export_event.wait(2))
402+
403+
# give some time for exporter to loop
404+
# since wait is mocked it should return immediately
405+
time.sleep(0.05)
406+
mock_wait_calls = list(mock_wait.mock_calls)
407+
408+
# find the index of the call that processed the singular span
409+
for idx, wait_call in enumerate(mock_wait_calls):
410+
_, args, __ = wait_call
411+
if args[0] <= 0:
412+
after_calls = mock_wait_calls[idx + 1 :]
413+
break
414+
415+
self.assertTrue(
416+
all(args[0] >= 0.05 for _, args, __ in after_calls)
417+
)
418+
419+
span_processor.shutdown()
420+
384421
def test_batch_span_processor_parameters(self):
385422
# zero max_queue_size
386423
self.assertRaises(

0 commit comments

Comments
 (0)