Skip to content

Commit 0d10c47

Browse files
authored
Merge pull request fluent#95 from panta/feature/asynchronous-communication
Asynchronous threaded support for senders and handlers.
2 parents 66603f2 + 25ecbda commit 0d10c47

File tree

6 files changed

+775
-6
lines changed

6 files changed

+775
-6
lines changed

README.rst

+66
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,72 @@ A sample configuration ``logging.yaml`` would be:
282282
level: DEBUG
283283
propagate: False
284284
285+
Asynchronous Communication
286+
~~~~~~~~~~~~~~~~~~~~~~~~~~
287+
288+
Besides the regular interfaces - the event-based one provided by ``sender.FluentSender`` and the python logging one
289+
provided by ``handler.FluentHandler`` - there are also corresponding asynchronous versions in ``asyncsender`` and
290+
``asynchandler`` respectively. These versions use a separate thread to handle the communication with the remote fluentd
291+
server. In this way the client of the library won't be blocked during the logging of the events, and won't risk going
292+
into timeout if the fluentd server becomes unreachable. Also it won't be slowed down by the network overhead.
293+
294+
The interfaces in ``asyncsender`` and ``asynchandler`` are exactly the same as those in ``sender`` and ``handler``, so it's
295+
just a matter of importing from a different module.
296+
297+
For instance, for the event-based interface:
298+
299+
.. code:: python
300+
301+
from fluent import asyncsender as sender
302+
303+
# for local fluent
304+
sender.setup('app')
305+
306+
# for remote fluent
307+
sender.setup('app', host='host', port=24224)
308+
309+
# do your work
310+
...
311+
312+
# IMPORTANT: before program termination, close the sender
313+
sender.close()
314+
315+
or for the python logging interface:
316+
317+
.. code:: python
318+
319+
import logging
320+
from fluent import asynchandler as handler
321+
322+
custom_format = {
323+
'host': '%(hostname)s',
324+
'where': '%(module)s.%(funcName)s',
325+
'type': '%(levelname)s',
326+
'stack_trace': '%(exc_text)s'
327+
}
328+
329+
logging.basicConfig(level=logging.INFO)
330+
l = logging.getLogger('fluent.test')
331+
h = handler.FluentHandler('app.follow', host='host', port=24224, buffer_overflow_handler=overflow_handler)
332+
formatter = handler.FluentRecordFormatter(custom_format)
333+
h.setFormatter(formatter)
334+
l.addHandler(h)
335+
l.info({
336+
'from': 'userA',
337+
'to': 'userB'
338+
})
339+
l.info('{"from": "userC", "to": "userD"}')
340+
l.info("This log entry will be logged with the additional key: 'message'.")
341+
342+
...
343+
344+
# IMPORTANT: before program termination, close the handler
345+
h.close()
346+
347+
**NOTE**: please note that it's important to close the sender or the handler at program termination. This will make
348+
sure the communication thread terminates and it's joined correctly. Otherwise the program won't exit, waiting for
349+
the thread, unless forcibly killed.
350+
285351
Testing
286352
-------
287353

fluent/asynchandler.py

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# -*- coding: utf-8 -*-
2+
3+
from fluent import asyncsender
4+
from fluent import handler
5+
from fluent.handler import FluentRecordFormatter
6+
7+
8+
class FluentHandler(handler.FluentHandler):
9+
'''
10+
Asynchronous Logging Handler for fluent.
11+
'''
12+
13+
def getSenderClass(self):
14+
return asyncsender.FluentSender
15+
16+
def close(self):
17+
self.sender.close()
18+
super(FluentHandler, self).close()

fluent/asyncsender.py

+195
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
# -*- coding: utf-8 -*-
2+
3+
from __future__ import print_function
4+
import threading
5+
import time
6+
try:
7+
from queue import Queue, Full, Empty
8+
except ImportError:
9+
from Queue import Queue, Full, Empty
10+
11+
from fluent import sender
12+
from fluent.sender import EventTime
13+
14+
_global_sender = None
15+
16+
DEFAULT_QUEUE_TIMEOUT = 0.05
17+
18+
19+
def _set_global_sender(sender):
20+
""" [For testing] Function to set global sender directly
21+
"""
22+
global _global_sender
23+
_global_sender = sender
24+
25+
26+
def setup(tag, **kwargs):
27+
global _global_sender
28+
_global_sender = FluentSender(tag, **kwargs)
29+
30+
31+
def get_global_sender():
32+
return _global_sender
33+
34+
35+
def close():
36+
get_global_sender().close()
37+
38+
39+
class CommunicatorThread(threading.Thread):
40+
def __init__(self, tag,
41+
host='localhost',
42+
port=24224,
43+
bufmax=1 * 1024 * 1024,
44+
timeout=3.0,
45+
verbose=False,
46+
buffer_overflow_handler=None,
47+
nanosecond_precision=False,
48+
msgpack_kwargs=None,
49+
queue_timeout=DEFAULT_QUEUE_TIMEOUT, *args, **kwargs):
50+
super(CommunicatorThread, self).__init__(**kwargs)
51+
self._queue = Queue()
52+
self._do_run = True
53+
self._queue_timeout = queue_timeout
54+
self._conn_close_lock = threading.Lock()
55+
self._sender = sender.FluentSender(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
56+
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
57+
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs)
58+
59+
def send(self, bytes_):
60+
try:
61+
self._queue.put(bytes_)
62+
except Full:
63+
return False
64+
return True
65+
66+
def run(self):
67+
while self._do_run:
68+
try:
69+
bytes_ = self._queue.get(block=True, timeout=self._queue_timeout)
70+
except Empty:
71+
continue
72+
self._conn_close_lock.acquire()
73+
self._sender._send(bytes_)
74+
self._conn_close_lock.release()
75+
76+
def close(self, flush=True, discard=True):
77+
if discard:
78+
while not self._queue.empty():
79+
try:
80+
self._queue.get(block=False)
81+
except Empty:
82+
break
83+
while flush and (not self._queue.empty()):
84+
time.sleep(0.1)
85+
self._do_run = False
86+
self._sender.close()
87+
88+
def _close(self):
89+
self._conn_close_lock.acquire()
90+
# self._sender.lock.acquire()
91+
try:
92+
self._sender._close()
93+
finally:
94+
# self._sender.lock.release()
95+
self._conn_close_lock.release()
96+
pass
97+
98+
@property
99+
def last_error(self):
100+
return self._sender.last_error
101+
102+
@last_error.setter
103+
def last_error(self, err):
104+
self._sender.last_error = err
105+
106+
def clear_last_error(self, _thread_id = None):
107+
self._sender.clear_last_error(_thread_id=_thread_id)
108+
109+
@property
110+
def queue_timeout(self):
111+
return self._queue_timeout
112+
113+
@queue_timeout.setter
114+
def queue_timeout(self, value):
115+
self._queue_timeout = value
116+
117+
def __enter__(self):
118+
return self
119+
120+
def __exit__(self, typ, value, traceback):
121+
self.close()
122+
123+
124+
class FluentSender(sender.FluentSender):
125+
def __init__(self,
126+
tag,
127+
host='localhost',
128+
port=24224,
129+
bufmax=1 * 1024 * 1024,
130+
timeout=3.0,
131+
verbose=False,
132+
buffer_overflow_handler=None,
133+
nanosecond_precision=False,
134+
msgpack_kwargs=None,
135+
queue_timeout=DEFAULT_QUEUE_TIMEOUT,
136+
**kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version.
137+
super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
138+
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
139+
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs,
140+
**kwargs)
141+
self._communicator = CommunicatorThread(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
142+
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
143+
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs,
144+
queue_timeout=queue_timeout)
145+
self._communicator.start()
146+
147+
def _send(self, bytes_):
148+
return self._communicator.send(bytes_=bytes_)
149+
150+
def _close(self):
151+
# super(FluentSender, self)._close()
152+
self._communicator._close()
153+
154+
def _send_internal(self, bytes_):
155+
return
156+
157+
def _send_data(self, bytes_):
158+
return
159+
160+
# override reconnect, so we don't open a socket here (since it
161+
# will be opened by the CommunicatorThread)
162+
def _reconnect(self):
163+
return
164+
165+
def close(self):
166+
self._communicator.close(flush=True)
167+
self._communicator.join()
168+
return super(FluentSender, self).close()
169+
170+
@property
171+
def last_error(self):
172+
return self._communicator.last_error
173+
174+
@last_error.setter
175+
def last_error(self, err):
176+
self._communicator.last_error = err
177+
178+
def clear_last_error(self, _thread_id = None):
179+
self._communicator.clear_last_error(_thread_id=_thread_id)
180+
181+
@property
182+
def queue_timeout(self):
183+
return self._communicator.queue_timeout
184+
185+
@queue_timeout.setter
186+
def queue_timeout(self, value):
187+
self._communicator.queue_timeout = value
188+
189+
def __enter__(self):
190+
return self
191+
192+
def __exit__(self, typ, value, traceback):
193+
# give time to the comm. thread to send its queued messages
194+
time.sleep(0.2)
195+
self.close()

fluent/handler.py

+20-6
Original file line numberDiff line numberDiff line change
@@ -141,14 +141,28 @@ def __init__(self,
141141
nanosecond_precision=False):
142142

143143
self.tag = tag
144-
self.sender = sender.FluentSender(tag,
145-
host=host, port=port,
146-
timeout=timeout, verbose=verbose,
147-
buffer_overflow_handler=buffer_overflow_handler,
148-
msgpack_kwargs=msgpack_kwargs,
149-
nanosecond_precision=nanosecond_precision)
144+
self.sender = self.getSenderInstance(tag,
145+
host=host, port=port,
146+
timeout=timeout, verbose=verbose,
147+
buffer_overflow_handler=buffer_overflow_handler,
148+
msgpack_kwargs=msgpack_kwargs,
149+
nanosecond_precision=nanosecond_precision)
150150
logging.Handler.__init__(self)
151151

152+
def getSenderClass(self):
153+
return sender.FluentSender
154+
155+
def getSenderInstance(self, tag, host, port, timeout, verbose,
156+
buffer_overflow_handler, msgpack_kwargs,
157+
nanosecond_precision):
158+
sender_class = self.getSenderClass()
159+
return sender_class(tag,
160+
host=host, port=port,
161+
timeout=timeout, verbose=verbose,
162+
buffer_overflow_handler=buffer_overflow_handler,
163+
msgpack_kwargs=msgpack_kwargs,
164+
nanosecond_precision=nanosecond_precision)
165+
152166
def emit(self, record):
153167
data = self.format(record)
154168
return self.sender.emit(None, data)

0 commit comments

Comments
 (0)