Skip to content

Commit f71cfc4

Browse files
authored
Always check for request timeouts (dpkp#887)
* Check for requests that timeout without causing a socket read/write event
1 parent 57ea7e8 commit f71cfc4

File tree

2 files changed

+12
-4
lines changed

2 files changed

+12
-4
lines changed

kafka/client_async.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,14 @@ def _poll(self, timeout, sleep=True):
578578
if response:
579579
responses.append(response)
580580

581+
for conn in six.itervalues(self._conns):
582+
if conn.requests_timed_out():
583+
log.warning('%s timed out after %s ms. Closing connection.',
584+
conn, conn.config['request_timeout_ms'])
585+
conn.close(error=Errors.RequestTimedOutError(
586+
'Request timed out after %s ms' %
587+
conn.config['request_timeout_ms']))
588+
581589
if self._sensors:
582590
self._sensors.io_time.record((time.time() - end_select) * 1000000000)
583591
return responses

kafka/conn.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -575,15 +575,15 @@ def recv(self):
575575
log.warning('%s: No in-flight-requests to recv', self)
576576
return None
577577

578-
elif self._requests_timed_out():
578+
response = self._recv()
579+
if not response and self.requests_timed_out():
579580
log.warning('%s timed out after %s ms. Closing connection.',
580581
self, self.config['request_timeout_ms'])
581582
self.close(error=Errors.RequestTimedOutError(
582583
'Request timed out after %s ms' %
583584
self.config['request_timeout_ms']))
584585
return None
585-
586-
return self._recv()
586+
return response
587587

588588
def _recv(self):
589589
# Not receiving is the state of reading the payload header
@@ -719,7 +719,7 @@ def _process_response(self, read_buffer):
719719
self._processing = False
720720
return response
721721

722-
def _requests_timed_out(self):
722+
def requests_timed_out(self):
723723
if self.in_flight_requests:
724724
oldest_at = self.in_flight_requests[0].timestamp
725725
timeout = self.config['request_timeout_ms'] / 1000.0

0 commit comments

Comments
 (0)