Skip to content

Commit 50119c5

Browse files
author
Paweł Guz
committed
Add queue overflow handler in asyncsender.
1 parent a37f313 commit 50119c5

File tree

1 file changed

+12
-1
lines changed

1 file changed

+12
-1
lines changed

fluent/asyncsender.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ def __init__(self,
5555
msgpack_kwargs=None,
5656
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
5757
queue_circular=DEFAULT_QUEUE_CIRCULAR,
58+
queue_overflow_handler=None,
5859
**kwargs):
5960
"""
6061
:param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version.
@@ -66,6 +67,7 @@ def __init__(self,
6667
**kwargs)
6768
self._queue_maxsize = queue_maxsize
6869
self._queue_circular = queue_circular
70+
self._queue_overflow_handler = queue_overflow_handler
6971

7072
self._thread_guard = threading.Event() # This ensures visibility across all variables
7173
self._closed = False
@@ -109,7 +111,8 @@ def _send(self, bytes_):
109111
if self._queue_circular and self._queue.full():
110112
# discard oldest
111113
try:
112-
self._queue.get(block=False)
114+
discarded_bytes = self._queue.get(block=False)
115+
self._call_queue_overflow_handler(discarded_bytes)
113116
except Empty: # pragma: no cover
114117
pass
115118
try:
@@ -132,5 +135,13 @@ def _send_loop(self):
132135
finally:
133136
self._close()
134137

138+
def _call_queue_overflow_handler(self, discarded_bytes):
139+
try:
140+
if self._queue_overflow_handler:
141+
self._queue_overflow_handler(discarded_bytes)
142+
except Exception as e:
143+
# User should care any exception in handler
144+
pass
145+
135146
def __exit__(self, exc_type, exc_val, exc_tb):
136147
self.close()

0 commit comments

Comments
 (0)