Skip to content

Commit b7254b9

Browse files
authored
Merge pull request fluent#107 from arcivanov/issue_105
Almost complete rewrite of async sender and async handler
2 parents f357a2d + fad318b commit b7254b9

File tree

10 files changed

+748
-643
lines changed

10 files changed

+748
-643
lines changed

fluent/asynchandler.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,17 @@ def getSenderClass(self):
1313
return asyncsender.FluentSender
1414

1515
def close(self):
16+
self.acquire()
1617
try:
17-
self.sender.close()
18+
try:
19+
self.sender.close()
20+
finally:
21+
super(FluentHandler, self).close()
1822
finally:
19-
super(FluentHandler, self).close()
23+
self.release()
24+
25+
def __enter__(self):
26+
return self
27+
28+
def __exit__(self, exc_type, exc_val, exc_tb):
29+
self.close()

fluent/asyncsender.py

Lines changed: 63 additions & 152 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from __future__ import print_function
44

55
import threading
6-
import time
76

87
try:
98
from queue import Queue, Full, Empty
@@ -15,12 +14,13 @@
1514

1615
__all__ = ["EventTime", "FluentSender"]
1716

18-
_global_sender = None
19-
20-
DEFAULT_QUEUE_TIMEOUT = 0.05
2117
DEFAULT_QUEUE_MAXSIZE = 100
2218
DEFAULT_QUEUE_CIRCULAR = False
2319

20+
_TOMBSTONE = object()
21+
22+
_global_sender = None
23+
2424

2525
def _set_global_sender(sender): # pragma: no cover
2626
""" [For testing] Function to set global sender directly
@@ -42,8 +42,9 @@ def close(): # pragma: no cover
4242
get_global_sender().close()
4343

4444

45-
class CommunicatorThread(threading.Thread):
46-
def __init__(self, tag,
45+
class FluentSender(sender.FluentSender):
46+
def __init__(self,
47+
tag,
4748
host='localhost',
4849
port=24224,
4950
bufmax=1 * 1024 * 1024,
@@ -52,76 +53,42 @@ def __init__(self, tag,
5253
buffer_overflow_handler=None,
5354
nanosecond_precision=False,
5455
msgpack_kwargs=None,
55-
queue_timeout=DEFAULT_QUEUE_TIMEOUT,
5656
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
57-
queue_circular=DEFAULT_QUEUE_CIRCULAR, *args, **kwargs):
58-
super(CommunicatorThread, self).__init__(**kwargs)
59-
self._queue = Queue(maxsize=queue_maxsize)
60-
self._do_run = True
61-
self._queue_timeout = queue_timeout
57+
queue_circular=DEFAULT_QUEUE_CIRCULAR,
58+
**kwargs):
59+
"""
60+
:param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version.
61+
"""
62+
super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
63+
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
64+
nanosecond_precision=nanosecond_precision,
65+
msgpack_kwargs=msgpack_kwargs,
66+
**kwargs)
6267
self._queue_maxsize = queue_maxsize
6368
self._queue_circular = queue_circular
64-
self._conn_close_lock = threading.Lock()
65-
self._sender = sender.FluentSender(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
66-
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
67-
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs)
6869

69-
def send(self, bytes_):
70-
if self._queue_circular and self._queue.full():
71-
# discard oldest
72-
try:
73-
self._queue.get(block=False)
74-
except Empty: # pragma: no cover
75-
pass
76-
try:
77-
self._queue.put(bytes_, block=(not self._queue_circular))
78-
except Full:
79-
return False
80-
return True
70+
self._thread_guard = threading.Event() # This ensures visibility across all variables
71+
self._closed = False
8172

82-
def run(self):
83-
while self._do_run:
84-
try:
85-
bytes_ = self._queue.get(block=True, timeout=self._queue_timeout)
86-
except Empty:
87-
continue
88-
with self._conn_close_lock:
89-
self._sender._send(bytes_)
90-
91-
def close(self, flush=True, discard=True):
92-
if discard:
93-
while not self._queue.empty():
94-
try:
95-
self._queue.get(block=False)
96-
except Empty:
97-
break
98-
while flush and (not self._queue.empty()):
99-
time.sleep(0.1)
100-
self._do_run = False
101-
self._sender.close()
102-
103-
def _close(self):
104-
with self._conn_close_lock:
105-
self._sender._close()
106-
107-
@property
108-
def last_error(self):
109-
return self._sender.last_error
110-
111-
@last_error.setter
112-
def last_error(self, err):
113-
self._sender.last_error = err
114-
115-
def clear_last_error(self, _thread_id=None):
116-
self._sender.clear_last_error(_thread_id=_thread_id)
117-
118-
@property
119-
def queue_timeout(self):
120-
return self._queue_timeout
121-
122-
@queue_timeout.setter
123-
def queue_timeout(self, value):
124-
self._queue_timeout = value
73+
self._queue = Queue(maxsize=queue_maxsize)
74+
self._send_thread = threading.Thread(target=self._send_loop,
75+
name="AsyncFluentSender %d" % id(self))
76+
self._send_thread.daemon = True
77+
self._send_thread.start()
78+
79+
def close(self, flush=True):
80+
with self.lock:
81+
if self._closed:
82+
return
83+
self._closed = True
84+
if not flush:
85+
while True:
86+
try:
87+
self._queue.get(block=False)
88+
except Empty:
89+
break
90+
self._queue.put(_TOMBSTONE)
91+
self._send_thread.join()
12592

12693
@property
12794
def queue_maxsize(self):
@@ -135,91 +102,35 @@ def queue_blocking(self):
135102
def queue_circular(self):
136103
return self._queue_circular
137104

138-
139-
class FluentSender(sender.FluentSender):
140-
def __init__(self,
141-
tag,
142-
host='localhost',
143-
port=24224,
144-
bufmax=1 * 1024 * 1024,
145-
timeout=3.0,
146-
verbose=False,
147-
buffer_overflow_handler=None,
148-
nanosecond_precision=False,
149-
msgpack_kwargs=None,
150-
queue_timeout=DEFAULT_QUEUE_TIMEOUT,
151-
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
152-
queue_circular=DEFAULT_QUEUE_CIRCULAR,
153-
**kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version.
154-
super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
155-
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
156-
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs,
157-
**kwargs)
158-
self._communicator = CommunicatorThread(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
159-
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
160-
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs,
161-
queue_timeout=queue_timeout, queue_maxsize=queue_maxsize,
162-
queue_circular=queue_circular)
163-
self._communicator.start()
164-
165105
def _send(self, bytes_):
166-
return self._communicator.send(bytes_=bytes_)
167-
168-
def _close(self):
169-
# super(FluentSender, self)._close()
170-
self._communicator._close()
171-
172-
def _send_internal(self, bytes_):
173-
assert False # pragma: no cover
174-
175-
def _send_data(self, bytes_):
176-
assert False # pragma: no cover
177-
178-
# override reconnect, so we don't open a socket here (since it
179-
# will be opened by the CommunicatorThread)
180-
def _reconnect(self):
181-
return
182-
183-
def close(self):
184-
self._communicator.close(flush=True)
185-
self._communicator.join()
186-
return super(FluentSender, self).close()
187-
188-
@property
189-
def last_error(self):
190-
return self._communicator.last_error
191-
192-
@last_error.setter
193-
def last_error(self, err):
194-
self._communicator.last_error = err
195-
196-
def clear_last_error(self, _thread_id=None):
197-
self._communicator.clear_last_error(_thread_id=_thread_id)
106+
with self.lock:
107+
if self._closed:
108+
return False
109+
if self._queue_circular and self._queue.full():
110+
# discard oldest
111+
try:
112+
self._queue.get(block=False)
113+
except Empty: # pragma: no cover
114+
pass
115+
try:
116+
self._queue.put(bytes_, block=(not self._queue_circular))
117+
except Full: # pragma: no cover
118+
return False # this actually can't happen
198119

199-
@property
200-
def queue_timeout(self):
201-
return self._communicator.queue_timeout
120+
return True
202121

203-
@queue_timeout.setter
204-
def queue_timeout(self, value):
205-
self._communicator.queue_timeout = value
122+
def _send_loop(self):
123+
send_internal = super(FluentSender, self)._send_internal
206124

207-
@property
208-
def queue_maxsize(self):
209-
return self._communicator.queue_maxsize
210-
211-
@property
212-
def queue_blocking(self):
213-
return self._communicator.queue_blocking
214-
215-
@property
216-
def queue_circular(self):
217-
return self._communicator.queue_circular
125+
try:
126+
while True:
127+
bytes_ = self._queue.get(block=True)
128+
if bytes_ is _TOMBSTONE:
129+
break
218130

219-
def __enter__(self):
220-
return self
131+
send_internal(bytes_)
132+
finally:
133+
self._close()
221134

222-
def __exit__(self, typ, value, traceback):
223-
# give time to the comm. thread to send its queued messages
224-
time.sleep(0.2)
135+
def __exit__(self, exc_type, exc_val, exc_tb):
225136
self.close()

fluent/handler.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ def __init__(self, fmt=None, datefmt=None, style='%', fill_missing_fmt_key=False
8181

8282
def format(self, record):
8383
# Only needed for python2.6
84-
if sys.version_info[0:2] <= (2, 6) and self.usesTime():
84+
if sys.version_info[0:2] <= (2, 6) and self.usesTime(): # pragma: no cover
8585
record.asctime = self.formatTime(record, self.datefmt)
8686

8787
# Compute attributes handled by parent class.
@@ -116,8 +116,11 @@ def usesTime(self):
116116
if self._exc_attrs is not None:
117117
return super(FluentRecordFormatter, self).usesTime()
118118
else:
119-
return any([value.find('%(asctime)') >= 0
120-
for value in self._fmt_dict.values()])
119+
if self.__style:
120+
search = self.__style.asctime_search
121+
else:
122+
search = "%(asctime)"
123+
return any([value.find(search) >= 0 for value in self._fmt_dict.values()])
121124

122125
def _structuring(self, data, record):
123126
""" Melds `msg` into `data`.
@@ -209,7 +212,15 @@ def emit(self, record):
209212
def close(self):
210213
self.acquire()
211214
try:
212-
self.sender._close()
213-
logging.Handler.close(self)
215+
try:
216+
self.sender.close()
217+
finally:
218+
super(FluentHandler, self).close()
214219
finally:
215220
self.release()
221+
222+
def __enter__(self):
223+
return self
224+
225+
def __exit__(self, exc_type, exc_val, exc_tb):
226+
self.close()

0 commit comments

Comments
 (0)