Skip to content

Commit 4a663c6

Browse files
authored
Merge pull request fluent#104 from arcivanov/issue_77_103
Add proper handling of the send, tracking if data was actually sent in fluent#77
2 parents 3178185 + 03946f0 commit 4a663c6

File tree

4 files changed

+53
-45
lines changed

4 files changed

+53
-45
lines changed

fluent/asynchandler.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
from fluent import asyncsender
44
from fluent import handler
5-
from fluent.handler import FluentRecordFormatter
65

76

87
class FluentHandler(handler.FluentHandler):
@@ -14,5 +13,7 @@ def getSenderClass(self):
1413
return asyncsender.FluentSender
1514

1615
def close(self):
17-
self.sender.close()
18-
super(FluentHandler, self).close()
16+
try:
17+
self.sender.close()
18+
finally:
19+
super(FluentHandler, self).close()

fluent/asyncsender.py

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
# -*- coding: utf-8 -*-
22

33
from __future__ import print_function
4+
45
import threading
56
import time
7+
68
try:
79
from queue import Queue, Full, Empty
810
except ImportError:
@@ -11,30 +13,32 @@
1113
from fluent import sender
1214
from fluent.sender import EventTime
1315

16+
__all__ = ["EventTime", "FluentSender"]
17+
1418
_global_sender = None
1519

1620
DEFAULT_QUEUE_TIMEOUT = 0.05
1721
DEFAULT_QUEUE_MAXSIZE = 100
1822
DEFAULT_QUEUE_CIRCULAR = False
1923

2024

21-
def _set_global_sender(sender):
25+
def _set_global_sender(sender): # pragma: no cover
2226
""" [For testing] Function to set global sender directly
2327
"""
2428
global _global_sender
2529
_global_sender = sender
2630

2731

28-
def setup(tag, **kwargs):
32+
def setup(tag, **kwargs): # pragma: no cover
2933
global _global_sender
3034
_global_sender = FluentSender(tag, **kwargs)
3135

3236

33-
def get_global_sender():
37+
def get_global_sender(): # pragma: no cover
3438
return _global_sender
3539

3640

37-
def close():
41+
def close(): # pragma: no cover
3842
get_global_sender().close()
3943

4044

@@ -81,9 +85,8 @@ def run(self):
8185
bytes_ = self._queue.get(block=True, timeout=self._queue_timeout)
8286
except Empty:
8387
continue
84-
self._conn_close_lock.acquire()
85-
self._sender._send(bytes_)
86-
self._conn_close_lock.release()
88+
with self._conn_close_lock:
89+
self._sender._send(bytes_)
8790

8891
def close(self, flush=True, discard=True):
8992
if discard:
@@ -98,14 +101,8 @@ def close(self, flush=True, discard=True):
98101
self._sender.close()
99102

100103
def _close(self):
101-
self._conn_close_lock.acquire()
102-
# self._sender.lock.acquire()
103-
try:
104+
with self._conn_close_lock:
104105
self._sender._close()
105-
finally:
106-
# self._sender.lock.release()
107-
self._conn_close_lock.release()
108-
pass
109106

110107
@property
111108
def last_error(self):
@@ -115,7 +112,7 @@ def last_error(self):
115112
def last_error(self, err):
116113
self._sender.last_error = err
117114

118-
def clear_last_error(self, _thread_id = None):
115+
def clear_last_error(self, _thread_id=None):
119116
self._sender.clear_last_error(_thread_id=_thread_id)
120117

121118
@property
@@ -196,7 +193,7 @@ def last_error(self):
196193
def last_error(self, err):
197194
self._communicator.last_error = err
198195

199-
def clear_last_error(self, _thread_id = None):
196+
def clear_last_error(self, _thread_id=None):
200197
self._communicator.clear_last_error(_thread_id=_thread_id)
201198

202199
@property

fluent/sender.py

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,35 @@
11
# -*- coding: utf-8 -*-
22

33
from __future__ import print_function
4-
import struct
4+
55
import socket
6+
import struct
67
import threading
78
import time
89
import traceback
910

1011
import msgpack
1112

12-
1313
_global_sender = None
1414

1515

16-
def _set_global_sender(sender):
16+
def _set_global_sender(sender): # pragma: no cover
1717
""" [For testing] Function to set global sender directly
1818
"""
1919
global _global_sender
2020
_global_sender = sender
2121

2222

23-
def setup(tag, **kwargs):
23+
def setup(tag, **kwargs): # pragma: no cover
2424
global _global_sender
2525
_global_sender = FluentSender(tag, **kwargs)
2626

2727

28-
def get_global_sender():
28+
def get_global_sender(): # pragma: no cover
2929
return _global_sender
3030

31-
def close():
31+
32+
def close(): # pragma: no cover
3233
get_global_sender().close()
3334

3435

@@ -54,7 +55,7 @@ def __init__(self,
5455
buffer_overflow_handler=None,
5556
nanosecond_precision=False,
5657
msgpack_kwargs=None,
57-
**kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version.
58+
**kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version.
5859

5960
self.tag = tag
6061
self.host = host
@@ -98,8 +99,7 @@ def emit_with_time(self, label, timestamp, data):
9899
return self._send(bytes_)
99100

100101
def close(self):
101-
self.lock.acquire()
102-
try:
102+
with self.lock:
103103
if self.pendings:
104104
try:
105105
self._send_data(self.pendings)
@@ -108,8 +108,6 @@ def close(self):
108108

109109
self._close()
110110
self.pendings = None
111-
finally:
112-
self.lock.release()
113111

114112
def _make_packet(self, label, timestamp, data):
115113
if label:
@@ -122,11 +120,8 @@ def _make_packet(self, label, timestamp, data):
122120
return msgpack.packb(packet, **self.msgpack_kwargs)
123121

124122
def _send(self, bytes_):
125-
self.lock.acquire()
126-
try:
123+
with self.lock:
127124
return self._send_internal(bytes_)
128-
finally:
129-
self.lock.release()
130125

131126
def _send_internal(self, bytes_):
132127
# buffering
@@ -142,7 +137,6 @@ def _send_internal(self, bytes_):
142137

143138
return True
144139
except socket.error as e:
145-
#except Exception as e:
146140
self.last_error = e
147141

148142
# close socket
@@ -161,7 +155,13 @@ def _send_data(self, bytes_):
161155
# reconnect if possible
162156
self._reconnect()
163157
# send message
164-
self.socket.sendall(bytes_)
158+
bytes_to_send = len(bytes_)
159+
bytes_sent = 0
160+
while bytes_sent < bytes_to_send:
161+
sent = self.socket.send(bytes_[bytes_sent:])
162+
if sent == 0:
163+
raise BrokenPipeError(32, 'broken pipe')
164+
bytes_sent += sent
165165

166166
def _reconnect(self):
167167
if not self.socket:
@@ -172,6 +172,8 @@ def _reconnect(self):
172172
else:
173173
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
174174
sock.settimeout(self.timeout)
175+
# This might be controversial and may need to be removed
176+
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
175177
sock.connect((self.host, self.port))
176178
self.socket = sock
177179

@@ -189,17 +191,22 @@ def last_error(self):
189191

190192
@last_error.setter
191193
def last_error(self, err):
192-
self._last_error_threadlocal.exception = err
194+
self._last_error_threadlocal.exception = err
193195

194-
def clear_last_error(self, _thread_id = None):
196+
def clear_last_error(self, _thread_id=None):
195197
if hasattr(self._last_error_threadlocal, 'exception'):
196198
delattr(self._last_error_threadlocal, 'exception')
197199

198200
def _close(self):
199-
if self.socket:
200-
self.socket.shutdown(socket.SHUT_RDWR)
201-
self.socket.close()
202-
self.socket = None
201+
try:
202+
sock = self.socket
203+
if sock:
204+
try:
205+
sock.shutdown(socket.SHUT_RDWR)
206+
finally:
207+
sock.close()
208+
finally:
209+
self.socket = None
203210

204211
def __enter__(self):
205212
return self

tests/test_sender.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
# -*- coding: utf-8 -*-
22

33
from __future__ import print_function
4-
import unittest
4+
55
import socket
6+
import unittest
7+
68
import msgpack
79

810
import fluent.sender
@@ -127,8 +129,9 @@ def test_clear_last_error(self):
127129

128130
self.assertEqual(self._sender.last_error, None)
129131

130-
@unittest.skip("This test failed with 'TypeError: catching classes that do not inherit from BaseException is not allowed' so skipped")
131-
#@patch('fluent.sender.socket')
132+
@unittest.skip(
133+
"This test failed with 'TypeError: catching classes that do not inherit from BaseException is not allowed' so skipped")
134+
# @patch('fluent.sender.socket')
132135
def test_connect_exception_during_sender_init(self, mock_socket):
133136
# Make the socket.socket().connect() call raise a custom exception
134137
mock_connect = mock_socket.socket.return_value.connect

0 commit comments

Comments
 (0)