@@ -20,8 +20,7 @@
 import typing
 from enum import Enum
 
-from opentelemetry.context import attach, detach, get_current, set_value
-from opentelemetry.trace import DefaultSpan
+from opentelemetry.context import attach, detach, set_value
 from opentelemetry.util import time_ns
 
 from .. import Span, SpanProcessor
@@ -91,15 +90,23 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
         return True
 
 
+class _FlushRequest:
+    """Represents a request for the BatchExportSpanProcessor to flush spans."""
+
+    __slots__ = ["event", "num_spans"]
+
+    def __init__(self):
+        self.event = threading.Event()
+        self.num_spans = 0
+
+
 class BatchExportSpanProcessor(SpanProcessor):
     """Batch span processor implementation.
 
     BatchExportSpanProcessor is an implementation of `SpanProcessor` that
     batches ended spans and pushes them to the configured `SpanExporter`.
     """
 
-    _FLUSH_TOKEN_SPAN = DefaultSpan(context=None)
-
     def __init__(
         self,
         span_exporter: SpanExporter,
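The `_FlushRequest` added above is essentially a `threading.Event` paired with a span count: `force_flush()` waits on the event while the worker thread sets it once the requested spans have been exported. Below is a minimal, self-contained sketch of that handshake pattern; the `FlushHandshake` and `worker` names are illustrative only and not part of the SDK.

```python
import threading
import time


class FlushHandshake:
    """Illustrative stand-in for the _FlushRequest handshake (not SDK code)."""

    def __init__(self):
        self.event = threading.Event()
        self.num_spans = 0


def worker(request):
    # Stand-in for the export work the processor's worker thread performs,
    # followed by notifying whoever requested the flush.
    time.sleep(0.1)
    request.event.set()


request = FlushHandshake()
threading.Thread(target=worker, args=(request,), daemon=True).start()

# The caller blocks with a timeout, mirroring force_flush(timeout_millis).
flushed = request.event.wait(timeout=30.0)
print("flush finished:", flushed)
```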
@@ -129,9 +136,7 @@ def __init__(
         )  # type: typing.Deque[Span]
         self.worker_thread = threading.Thread(target=self.worker, daemon=True)
         self.condition = threading.Condition(threading.Lock())
-        self.flush_condition = threading.Condition(threading.Lock())
-        # flag to indicate that there is a flush operation on progress
-        self._flushing = False
+        self._flush_request = None  # type: typing.Optional[_FlushRequest]
         self.schedule_delay_millis = schedule_delay_millis
         self.max_export_batch_size = max_export_batch_size
         self.max_queue_size = max_queue_size
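This hunk collapses the previous two synchronization primitives (`flush_condition` plus the `_flushing` flag) into a single optional `_flush_request` slot that is only read or swapped while a lock is held. The sketch below illustrates that get-or-create / get-and-unset pattern under a lock; the `RequestSlot` and `_Request` names are illustrative, and in the processor itself these calls run under `self.condition` rather than a dedicated lock.

```python
import threading
import typing


class _Request:
    """Illustrative flush-request object (not SDK code)."""


class RequestSlot:
    """Single shared request slot guarded by a lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._request = None  # type: typing.Optional[_Request]

    def get_or_create(self):
        # Caller side: concurrent flush callers share one in-flight request.
        with self._lock:
            if self._request is None:
                self._request = _Request()
            return self._request

    def get_and_unset(self):
        # Worker side: take ownership of the pending request, if any.
        with self._lock:
            request, self._request = self._request, None
            return request
```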
@@ -164,60 +169,128 @@ def on_end(self, span: Span) -> None:
 
     def worker(self):
         timeout = self.schedule_delay_millis / 1e3
+        flush_request = None  # type: typing.Optional[_FlushRequest]
         while not self.done:
-            if (
-                len(self.queue) < self.max_export_batch_size
-                and not self._flushing
-            ):
-                with self.condition:
+            with self.condition:
+                if self.done:
+                    # done flag may have changed, avoid waiting
+                    break
+                flush_request = self._get_and_unset_flush_request()
+                if (
+                    len(self.queue) < self.max_export_batch_size
+                    and flush_request is None
+                ):
+
                     self.condition.wait(timeout)
+                    flush_request = self._get_and_unset_flush_request()
                     if not self.queue:
                         # spurious notification, let's wait again
+                        self._notify_flush_request_finished(flush_request)
+                        flush_request = None
                         continue
                     if self.done:
                         # missing spans will be sent when calling flush
                         break
 
-            # substract the duration of this export call to the next timeout
+            # subtract the duration of this export call to the next timeout
             start = time_ns()
-            self.export()
+            self._export(flush_request)
             end = time_ns()
             duration = (end - start) / 1e9
             timeout = self.schedule_delay_millis / 1e3 - duration
 
+            self._notify_flush_request_finished(flush_request)
+            flush_request = None
+
+        # there might have been a new flush request while export was running
+        # and before the done flag switched to true
+        with self.condition:
+            shutdown_flush_request = self._get_and_unset_flush_request()
+
         # be sure that all spans are sent
         self._drain_queue()
+        self._notify_flush_request_finished(flush_request)
+        self._notify_flush_request_finished(shutdown_flush_request)
+
+    def _get_and_unset_flush_request(self,) -> typing.Optional[_FlushRequest]:
+        """Returns the current flush request and makes it invisible to the
+        worker thread for subsequent calls.
+        """
+        flush_request = self._flush_request
+        self._flush_request = None
+        if flush_request is not None:
+            flush_request.num_spans = len(self.queue)
+        return flush_request
+
+    @staticmethod
+    def _notify_flush_request_finished(
+        flush_request: typing.Optional[_FlushRequest],
+    ):
+        """Notifies the flush initiator(s) waiting on the given request/event
+        that the flush operation has finished.
+        """
+        if flush_request is not None:
+            flush_request.event.set()
+
+    def _get_or_create_flush_request(self) -> _FlushRequest:
+        """Either returns the current active flush event or creates a new one.
 
-    def export(self) -> None:
-        """Exports at most max_export_batch_size spans."""
+        The flush event will be visible to and read by the worker thread before
+        an export operation starts. Callers of a flush operation may wait on the
+        returned event to be notified when the flush/export operation has
+        finished.
+
+        This method is not thread-safe, i.e. callers need to take care of
+        synchronization/locking.
+        """
+        if self._flush_request is None:
+            self._flush_request = _FlushRequest()
+        return self._flush_request
+
+    def _export(self, flush_request: typing.Optional[_FlushRequest]):
+        """Exports spans considering the given flush_request.
+
+        If a flush_request is given, spans are exported in batches until the
+        number of exported spans reaches or exceeds the number of spans in the
+        flush request.
+        If no flush_request is given, at most max_export_batch_size spans are
+        exported.
+        """
+        if not flush_request:
+            self._export_batch()
+            return
+
+        num_spans = flush_request.num_spans
+        while self.queue:
+            num_exported = self._export_batch()
+            num_spans -= num_exported
+
+            if num_spans <= 0:
+                break
+
+    def _export_batch(self) -> int:
+        """Exports at most max_export_batch_size spans and returns the number of
+        exported spans.
+        """
         idx = 0
-        notify_flush = False
         # currently only a single thread acts as consumer, so queue.pop() will
         # not raise an exception
         while idx < self.max_export_batch_size and self.queue:
-            span = self.queue.pop()
-            if span is self._FLUSH_TOKEN_SPAN:
-                notify_flush = True
-            else:
-                self.spans_list[idx] = span
-                idx += 1
+            self.spans_list[idx] = self.queue.pop()
+            idx += 1
         token = attach(set_value("suppress_instrumentation", True))
         try:
             # Ignore type b/c the Optional[None]+slicing is too "clever"
             # for mypy
             self.span_exporter.export(self.spans_list[:idx])  # type: ignore
-        # pylint: disable=broad-except
-        except Exception:
+        except Exception:  # pylint: disable=broad-except
             logger.exception("Exception while exporting Span batch.")
         detach(token)
 
-        if notify_flush:
-            with self.flush_condition:
-                self.flush_condition.notify()
-
         # clean up list
         for index in range(idx):
             self.spans_list[index] = None
+        return idx
 
     def _drain_queue(self):
         """"Export all elements until queue is empty.
@@ -226,26 +299,20 @@ def _drain_queue(self):
         `export` that is not thread safe.
         """
         while self.queue:
-            self.export()
+            self._export_batch()
 
     def force_flush(self, timeout_millis: int = 30000) -> bool:
         if self.done:
             logger.warning("Already shutdown, ignoring call to force_flush().")
             return True
 
-        self._flushing = True
-        self.queue.appendleft(self._FLUSH_TOKEN_SPAN)
-
-        # wake up worker thread
         with self.condition:
+            flush_request = self._get_or_create_flush_request()
+            # signal the worker thread to flush and wait for it to finish
             self.condition.notify_all()
 
         # wait for token to be processed
-        with self.flush_condition:
-            ret = self.flush_condition.wait(timeout_millis / 1e3)
-
-        self._flushing = False
-
+        ret = flush_request.event.wait(timeout_millis / 1e3)
         if not ret:
             logger.warning("Timeout was exceeded in force_flush().")
         return ret
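For reference, here is a hedged usage sketch showing how the new flush path is typically exercised from application code. The wiring (`TracerProvider`, `ConsoleSpanExporter`, `add_span_processor`) assumes the SDK surface around the time of this change; adjust imports to your installed version.

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchExportSpanProcessor,
    ConsoleSpanExporter,
)

# Wire a batch processor into the tracer pipeline.
provider = TracerProvider()
processor = BatchExportSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("example-span"):
    pass

# Blocks until the worker thread has exported the queued spans or the
# default 30 s timeout expires; returns False on timeout.
flushed = processor.force_flush()
print("flushed:", flushed)
```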