Skip to content

Commit 161d76e

Browse files
committed
Add proper handling of the send, tracking if data was actually sent in fluent#77
Use TCP_NODELAY to detect disconnect early Fix connection shutdown cleanup in fluent#103 Make locking less verbose via with idiom fixes fluent#77, fluent#103
1 parent af4fd66 commit 161d76e

File tree

4 files changed

+56
-47
lines changed

4 files changed

+56
-47
lines changed

fluent/asynchandler.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
from fluent import asyncsender
44
from fluent import handler
5-
from fluent.handler import FluentRecordFormatter
65

76

87
class FluentHandler(handler.FluentHandler):
@@ -14,5 +13,7 @@ def getSenderClass(self):
1413
return asyncsender.FluentSender
1514

1615
def close(self):
17-
self.sender.close()
18-
super(FluentHandler, self).close()
16+
try:
17+
self.sender.close()
18+
finally:
19+
super(FluentHandler, self).close()

fluent/asyncsender.py

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
# -*- coding: utf-8 -*-
22

33
from __future__ import print_function
4+
45
import threading
56
import time
7+
68
try:
79
from queue import Queue, Full, Empty
810
except ImportError:
@@ -11,28 +13,30 @@
1113
from fluent import sender
1214
from fluent.sender import EventTime
1315

16+
__all__ = ["EventTime", "FluentSender"]
17+
1418
_global_sender = None
1519

1620
DEFAULT_QUEUE_TIMEOUT = 0.05
1721

1822

19-
def _set_global_sender(sender):
23+
def _set_global_sender(sender): # pragma: no cover
2024
""" [For testing] Function to set global sender directly
2125
"""
2226
global _global_sender
2327
_global_sender = sender
2428

2529

26-
def setup(tag, **kwargs):
30+
def setup(tag, **kwargs): # pragma: no cover
2731
global _global_sender
2832
_global_sender = FluentSender(tag, **kwargs)
2933

3034

31-
def get_global_sender():
35+
def get_global_sender(): # pragma: no cover
3236
return _global_sender
3337

3438

35-
def close():
39+
def close(): # pragma: no cover
3640
get_global_sender().close()
3741

3842

@@ -69,9 +73,8 @@ def run(self):
6973
bytes_ = self._queue.get(block=True, timeout=self._queue_timeout)
7074
except Empty:
7175
continue
72-
self._conn_close_lock.acquire()
73-
self._sender._send(bytes_)
74-
self._conn_close_lock.release()
76+
with self._conn_close_lock:
77+
self._sender._send(bytes_)
7578

7679
def close(self, flush=True, discard=True):
7780
if discard:
@@ -86,14 +89,8 @@ def close(self, flush=True, discard=True):
8689
self._sender.close()
8790

8891
def _close(self):
89-
self._conn_close_lock.acquire()
90-
# self._sender.lock.acquire()
91-
try:
92+
with self._conn_close_lock:
9293
self._sender._close()
93-
finally:
94-
# self._sender.lock.release()
95-
self._conn_close_lock.release()
96-
pass
9794

9895
@property
9996
def last_error(self):
@@ -103,7 +100,7 @@ def last_error(self):
103100
def last_error(self, err):
104101
self._sender.last_error = err
105102

106-
def clear_last_error(self, _thread_id = None):
103+
def clear_last_error(self, _thread_id=None):
107104
self._sender.clear_last_error(_thread_id=_thread_id)
108105

109106
@property
@@ -133,14 +130,15 @@ def __init__(self,
133130
nanosecond_precision=False,
134131
msgpack_kwargs=None,
135132
queue_timeout=DEFAULT_QUEUE_TIMEOUT,
136-
**kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version.
133+
**kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version.
137134
super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
138135
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
139136
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs,
140137
**kwargs)
141138
self._communicator = CommunicatorThread(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
142139
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
143-
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs,
140+
nanosecond_precision=nanosecond_precision,
141+
msgpack_kwargs=msgpack_kwargs,
144142
queue_timeout=queue_timeout)
145143
self._communicator.start()
146144

@@ -175,7 +173,7 @@ def last_error(self):
175173
def last_error(self, err):
176174
self._communicator.last_error = err
177175

178-
def clear_last_error(self, _thread_id = None):
176+
def clear_last_error(self, _thread_id=None):
179177
self._communicator.clear_last_error(_thread_id=_thread_id)
180178

181179
@property

fluent/sender.py

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,35 @@
11
# -*- coding: utf-8 -*-
22

33
from __future__ import print_function
4-
import struct
4+
55
import socket
6+
import struct
67
import threading
78
import time
89
import traceback
910

1011
import msgpack
1112

12-
1313
_global_sender = None
1414

1515

16-
def _set_global_sender(sender):
16+
def _set_global_sender(sender): # pragma: no cover
1717
""" [For testing] Function to set global sender directly
1818
"""
1919
global _global_sender
2020
_global_sender = sender
2121

2222

23-
def setup(tag, **kwargs):
23+
def setup(tag, **kwargs): # pragma: no cover
2424
global _global_sender
2525
_global_sender = FluentSender(tag, **kwargs)
2626

2727

28-
def get_global_sender():
28+
def get_global_sender(): # pragma: no cover
2929
return _global_sender
3030

31-
def close():
31+
32+
def close(): # pragma: no cover
3233
get_global_sender().close()
3334

3435

@@ -54,7 +55,7 @@ def __init__(self,
5455
buffer_overflow_handler=None,
5556
nanosecond_precision=False,
5657
msgpack_kwargs=None,
57-
**kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version.
58+
**kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version.
5859

5960
self.tag = tag
6061
self.host = host
@@ -98,8 +99,7 @@ def emit_with_time(self, label, timestamp, data):
9899
return self._send(bytes_)
99100

100101
def close(self):
101-
self.lock.acquire()
102-
try:
102+
with self.lock:
103103
if self.pendings:
104104
try:
105105
self._send_data(self.pendings)
@@ -108,8 +108,6 @@ def close(self):
108108

109109
self._close()
110110
self.pendings = None
111-
finally:
112-
self.lock.release()
113111

114112
def _make_packet(self, label, timestamp, data):
115113
if label:
@@ -122,11 +120,8 @@ def _make_packet(self, label, timestamp, data):
122120
return msgpack.packb(packet, **self.msgpack_kwargs)
123121

124122
def _send(self, bytes_):
125-
self.lock.acquire()
126-
try:
123+
with self.lock:
127124
return self._send_internal(bytes_)
128-
finally:
129-
self.lock.release()
130125

131126
def _send_internal(self, bytes_):
132127
# buffering
@@ -142,7 +137,6 @@ def _send_internal(self, bytes_):
142137

143138
return True
144139
except socket.error as e:
145-
#except Exception as e:
146140
self.last_error = e
147141

148142
# close socket
@@ -161,7 +155,13 @@ def _send_data(self, bytes_):
161155
# reconnect if possible
162156
self._reconnect()
163157
# send message
164-
self.socket.sendall(bytes_)
158+
bytes_to_send = len(bytes_)
159+
bytes_sent = 0
160+
while bytes_sent < bytes_to_send:
161+
sent = self.socket.send(bytes_[bytes_sent:])
162+
if sent == 0:
163+
raise BrokenPipeError(32, 'broken pipe')
164+
bytes_sent += sent
165165

166166
def _reconnect(self):
167167
if not self.socket:
@@ -172,6 +172,8 @@ def _reconnect(self):
172172
else:
173173
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
174174
sock.settimeout(self.timeout)
175+
# This might be controversial and may need to be removed
176+
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
175177
sock.connect((self.host, self.port))
176178
self.socket = sock
177179

@@ -189,17 +191,22 @@ def last_error(self):
189191

190192
@last_error.setter
191193
def last_error(self, err):
192-
self._last_error_threadlocal.exception = err
194+
self._last_error_threadlocal.exception = err
193195

194-
def clear_last_error(self, _thread_id = None):
196+
def clear_last_error(self, _thread_id=None):
195197
if hasattr(self._last_error_threadlocal, 'exception'):
196198
delattr(self._last_error_threadlocal, 'exception')
197199

198200
def _close(self):
199-
if self.socket:
200-
self.socket.shutdown(socket.SHUT_RDWR)
201-
self.socket.close()
202-
self.socket = None
201+
try:
202+
sock = self.socket
203+
if sock:
204+
try:
205+
sock.shutdown(socket.SHUT_RDWR)
206+
finally:
207+
sock.close()
208+
finally:
209+
self.socket = None
203210

204211
def __enter__(self):
205212
return self

tests/test_sender.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
# -*- coding: utf-8 -*-
22

33
from __future__ import print_function
4-
import unittest
4+
55
import socket
6+
import unittest
7+
68
import msgpack
79

810
import fluent.sender
@@ -127,8 +129,9 @@ def test_clear_last_error(self):
127129

128130
self.assertEqual(self._sender.last_error, None)
129131

130-
@unittest.skip("This test failed with 'TypeError: catching classes that do not inherit from BaseException is not allowed' so skipped")
131-
#@patch('fluent.sender.socket')
132+
@unittest.skip(
133+
"This test failed with 'TypeError: catching classes that do not inherit from BaseException is not allowed' so skipped")
134+
# @patch('fluent.sender.socket')
132135
def test_connect_exception_during_sender_init(self, mock_socket):
133136
# Make the socket.socket().connect() call raise a custom exception
134137
mock_connect = mock_socket.socket.return_value.connect

0 commit comments

Comments
 (0)