Skip to content

Commit 73af98f

Browse files
authored
Merge pull request fluent#128 from arcivanov/verbose_trace
Perform a non-blocking read-side check before and after send
2 parents 425acf5 + 149701c commit 73af98f

File tree

4 files changed

+77
-5
lines changed

4 files changed

+77
-5
lines changed

fluent/sender.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,17 +166,34 @@ def _send_internal(self, bytes_):
166166

167167
return False
168168

169+
def _check_recv_side(self):
170+
try:
171+
self.socket.settimeout(0.0)
172+
try:
173+
recvd = self.socket.recv(4096, socket.MSG_DONTWAIT)
174+
except socket.error as recv_e:
175+
if recv_e.errno != errno.EWOULDBLOCK:
176+
raise
177+
return
178+
179+
if recvd == b'':
180+
raise socket.error(errno.EPIPE, "Broken pipe")
181+
finally:
182+
self.socket.settimeout(self.timeout)
183+
169184
def _send_data(self, bytes_):
170185
# reconnect if possible
171186
self._reconnect()
172187
# send message
173188
bytes_to_send = len(bytes_)
174189
bytes_sent = 0
190+
self._check_recv_side()
175191
while bytes_sent < bytes_to_send:
176192
sent = self.socket.send(bytes_[bytes_sent:])
177193
if sent == 0:
178194
raise socket.error(errno.EPIPE, "Broken pipe")
179195
bytes_sent += sent
196+
self._check_recv_side()
180197

181198
def _reconnect(self):
182199
if not self.socket:

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
setup(
1414
name='fluent-logger',
15-
version='0.9.0',
15+
version='0.9.9',
1616
description=desc,
1717
long_description=open(README).read(),
1818
package_dir={'fluent': 'fluent'},

tests/test_asynchandler.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import logging
44
import sys
5-
import time
65
import unittest
76

87
import fluent.asynchandler

tests/test_sender.py

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -218,20 +218,76 @@ def test_broken_conn(self):
218218
self.assertTrue(sender.socket)
219219

220220
class FakeSocket:
221+
def __init__(self):
222+
self.to = 123
223+
self.send_side_effects = [3, 0, 9]
224+
self.send_idx = 0
225+
self.recv_side_effects = [socket.error(errno.EWOULDBLOCK, "Blah"),
226+
b"this data is going to be ignored",
227+
b"",
228+
socket.error(errno.EWOULDBLOCK, "Blah"),
229+
socket.error(errno.EWOULDBLOCK, "Blah"),
230+
socket.error(errno.EACCES, "This error will never happen"),
231+
]
232+
self.recv_idx = 0
233+
221234
def send(self, bytes_):
222-
return 0
235+
try:
236+
v = self.send_side_effects[self.send_idx]
237+
if isinstance(v, Exception):
238+
raise v
239+
if isinstance(v, type) and issubclass(v, Exception):
240+
raise v()
241+
return v
242+
finally:
243+
self.send_idx += 1
223244

224245
def shutdown(self, mode):
225246
pass
226247

227248
def close(self):
228249
pass
229250

251+
def settimeout(self, to):
252+
self.to = to
253+
254+
def gettimeout(self):
255+
return self.to
256+
257+
def recv(self, bufsize, flags):
258+
try:
259+
v = self.recv_side_effects[self.recv_idx]
260+
if isinstance(v, Exception):
261+
raise v
262+
if isinstance(v, type) and issubclass(v, Exception):
263+
raise v()
264+
return v
265+
finally:
266+
self.recv_idx += 1
267+
230268
old_sock = self._sender.socket
231-
self._sender.socket = FakeSocket()
269+
sock = FakeSocket()
270+
232271
try:
272+
self._sender.socket = sock
273+
sender.last_error = None
274+
self.assertTrue(sender._send_internal(b"456"))
275+
self.assertFalse(sender.last_error)
276+
277+
self._sender.socket = sock
278+
sender.last_error = None
279+
self.assertFalse(sender._send_internal(b"456"))
280+
self.assertEqual(sender.last_error.errno, errno.EPIPE)
281+
282+
self._sender.socket = sock
283+
sender.last_error = None
284+
self.assertFalse(sender._send_internal(b"456"))
285+
self.assertEqual(sender.last_error.errno, errno.EPIPE)
286+
287+
self._sender.socket = sock
288+
sender.last_error = None
233289
self.assertFalse(sender._send_internal(b"456"))
234-
self.assertTrue(sender.last_error.errno, errno.EPIPE)
290+
self.assertEqual(sender.last_error.errno, errno.EACCES)
235291
finally:
236292
self._sender.socket = old_sock
237293

0 commit comments

Comments
 (0)