Skip to content

Commit bb8efb8

Browse files
committed
clear queue when socket.gaierror occurs
1 parent ace80f4 commit bb8efb8

File tree

1 file changed

+23
-6
lines changed

1 file changed

+23
-6
lines changed

fluent/asyncsender.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# -*- coding: utf-8 -*-
22

3+
import socket
34
import threading
45
from queue import Queue, Full, Empty
56

@@ -81,11 +82,7 @@ def close(self, flush=True):
8182
return
8283
self._closed = True
8384
if not flush:
84-
while True:
85-
try:
86-
self._queue.get(block=False)
87-
except Empty:
88-
break
85+
self._clear_queue()
8986
self._queue.put(_TOMBSTONE)
9087
self._send_thread.join()
9188

@@ -101,6 +98,13 @@ def queue_blocking(self):
10198
def queue_circular(self):
10299
return self._queue_circular
103100

101+
def _clear_queue(self):
102+
while True:
103+
try:
104+
self._queue.get(block=False)
105+
except Empty:
106+
break
107+
104108
def _send(self, bytes_):
105109
with self.lock:
106110
if self._closed:
@@ -120,8 +124,21 @@ def _send(self, bytes_):
120124

121125
return True
122126

127+
def _send_internal(self, bytes_):
128+
send_internal_result = super(FluentSender, self)._send_internal(bytes_)
129+
if send_internal_result is False:
130+
# when send_result is False, super() caught socket.error
131+
# and assigned the error to self.last_error
132+
if isinstance(self.last_error, socket.gaierror):
133+
# clear the queue to avoid blocking and print the log
134+
self._clear_queue()
135+
self._queue.put(_TOMBSTONE)
136+
print("%s. Please check address: (%s, %s)" % (str(self.last_error), self.host, self.port))
137+
138+
return send_internal_result
139+
123140
def _send_loop(self):
124-
send_internal = super(FluentSender, self)._send_internal
141+
send_internal = self._send_internal
125142

126143
try:
127144
while True:

0 commit comments

Comments
 (0)