Skip to content

Commit 0429699

Browse files
authored
Timeout idle connections via connections_max_idle_ms (dpkp#1068)
1 parent 7c24135 commit 0429699

File tree

4 files changed

+135
-6
lines changed

4 files changed

+135
-6
lines changed

kafka/client_async.py

+94-1
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ class KafkaClient(object):
135135
'bootstrap_servers': 'localhost',
136136
'client_id': 'kafka-python-' + __version__,
137137
'request_timeout_ms': 40000,
138+
'connections_max_idle_ms': 9 * 60 * 1000,
138139
'reconnect_backoff_ms': 50,
139140
'max_in_flight_requests_per_connection': 5,
140141
'receive_buffer_bytes': None,
@@ -194,6 +195,7 @@ def __init__(self, **configs):
194195
self._wake_r.setblocking(False)
195196
self._wake_lock = threading.Lock()
196197
self._selector.register(self._wake_r, selectors.EVENT_READ)
198+
self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms'])
197199
self._closed = False
198200
self._sensors = None
199201
if self.config['metrics']:
@@ -291,6 +293,8 @@ def _conn_state_change(self, node_id, conn):
291293
if self._sensors:
292294
self._sensors.connection_created.record()
293295

296+
self._idle_expiry_manager.update(node_id)
297+
294298
if 'bootstrap' in self._conns and node_id != 'bootstrap':
295299
bootstrap = self._conns.pop('bootstrap')
296300
# XXX: make conn.close() require error to cause refresh
@@ -308,7 +312,13 @@ def _conn_state_change(self, node_id, conn):
308312
pass
309313
if self._sensors:
310314
self._sensors.connection_closed.record()
311-
if self._refresh_on_disconnects and not self._closed:
315+
316+
idle_disconnect = False
317+
if self._idle_expiry_manager.is_expired(node_id):
318+
idle_disconnect = True
319+
self._idle_expiry_manager.remove(node_id)
320+
321+
if self._refresh_on_disconnects and not self._closed and not idle_disconnect:
312322
log.warning("Node %s connection failed -- refreshing metadata", node_id)
313323
self.cluster.request_update()
314324

@@ -514,10 +524,12 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):
514524
if future and future.is_done:
515525
timeout = 0
516526
else:
527+
idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms()
517528
timeout = min(
518529
timeout_ms,
519530
metadata_timeout_ms,
520531
self._delayed_tasks.next_at() * 1000,
532+
idle_connection_timeout_ms,
521533
self.config['request_timeout_ms'])
522534
timeout = max(0, timeout / 1000.0) # avoid negative timeouts
523535

@@ -572,6 +584,8 @@ def _poll(self, timeout, sleep=True):
572584
conn.close(Errors.ConnectionError('Socket EVENT_READ without in-flight-requests'))
573585
continue
574586

587+
self._idle_expiry_manager.update(conn.node_id)
588+
575589
# Accumulate as many responses as the connection has pending
576590
while conn.in_flight_requests:
577591
response = conn.recv() # Note: conn.recv runs callbacks / errbacks
@@ -601,6 +615,7 @@ def _poll(self, timeout, sleep=True):
601615

602616
if self._sensors:
603617
self._sensors.io_time.record((time.time() - end_select) * 1000000000)
618+
self._maybe_close_oldest_connection()
604619
return responses
605620

606621
def in_flight_request_count(self, node_id=None):
@@ -846,6 +861,14 @@ def _clear_wake_fd(self):
846861
except socket.error:
847862
break
848863

864+
def _maybe_close_oldest_connection(self):
865+
expired_connection = self._idle_expiry_manager.poll_expired_connection()
866+
if expired_connection:
867+
conn_id, ts = expired_connection
868+
idle_ms = (time.time() - ts) * 1000
869+
log.info('Closing idle connection %s, last active %d ms ago', conn_id, idle_ms)
870+
self.close(node_id=conn_id)
871+
849872

850873
class DelayedTaskQueue(object):
851874
# see https://docs.python.org/2/library/heapq.html
@@ -920,6 +943,76 @@ def pop_ready(self):
920943
return ready_tasks
921944

922945

946+
# OrderedDict requires python2.7+
947+
try:
948+
from collections import OrderedDict
949+
except ImportError:
950+
# If we dont have OrderedDict, we'll fallback to dict with O(n) priority reads
951+
OrderedDict = dict
952+
953+
954+
class IdleConnectionManager(object):
955+
def __init__(self, connections_max_idle_ms):
956+
if connections_max_idle_ms > 0:
957+
self.connections_max_idle = connections_max_idle_ms / 1000
958+
else:
959+
self.connections_max_idle = float('inf')
960+
self.next_idle_close_check_time = None
961+
self.update_next_idle_close_check_time(time.time())
962+
self.lru_connections = OrderedDict()
963+
964+
def update(self, conn_id):
965+
# order should reflect last-update
966+
if conn_id in self.lru_connections:
967+
del self.lru_connections[conn_id]
968+
self.lru_connections[conn_id] = time.time()
969+
970+
def remove(self, conn_id):
971+
if conn_id in self.lru_connections:
972+
del self.lru_connections[conn_id]
973+
974+
def is_expired(self, conn_id):
975+
if conn_id not in self.lru_connections:
976+
return None
977+
return time.time() >= self.lru_connections[conn_id] + self.connections_max_idle
978+
979+
def next_check_ms(self):
980+
now = time.time()
981+
if not self.lru_connections:
982+
return float('inf')
983+
elif self.next_idle_close_check_time <= now:
984+
return 0
985+
else:
986+
return int((self.next_idle_close_check_time - now) * 1000)
987+
988+
def update_next_idle_close_check_time(self, ts):
989+
self.next_idle_close_check_time = ts + self.connections_max_idle
990+
991+
def poll_expired_connection(self):
992+
if time.time() < self.next_idle_close_check_time:
993+
return None
994+
995+
if not len(self.lru_connections):
996+
return None
997+
998+
oldest_conn_id = None
999+
oldest_ts = None
1000+
if OrderedDict is dict:
1001+
for conn_id, ts in self.lru_connections.items():
1002+
if oldest_conn_id is None or ts < oldest_ts:
1003+
oldest_conn_id = conn_id
1004+
oldest_ts = ts
1005+
else:
1006+
(oldest_conn_id, oldest_ts) = next(iter(self.lru_connections.items()))
1007+
1008+
self.update_next_idle_close_check_time(oldest_ts)
1009+
1010+
if time.time() >= oldest_ts + self.connections_max_idle:
1011+
return (oldest_conn_id, oldest_ts)
1012+
else:
1013+
return None
1014+
1015+
9231016
class KafkaClientMetrics(object):
9241017
def __init__(self, metrics, metric_group_prefix, conns):
9251018
self.metrics = metrics

kafka/conn.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,8 @@ def __init__(self, host, port, afi, **configs):
177177
if key in configs:
178178
self.config[key] = configs[key]
179179

180+
self.node_id = self.config.pop('node_id')
181+
180182
if self.config['receive_buffer_bytes'] is not None:
181183
self.config['socket_options'].append(
182184
(socket.SOL_SOCKET, socket.SO_RCVBUF,
@@ -214,7 +216,7 @@ def __init__(self, host, port, afi, **configs):
214216
if self.config['metrics']:
215217
self._sensors = BrokerConnectionMetrics(self.config['metrics'],
216218
self.config['metric_group_prefix'],
217-
self.config['node_id'])
219+
self.node_id)
218220

219221
def connect(self):
220222
"""Attempt to connect and return ConnectionState"""
@@ -904,7 +906,7 @@ def connect():
904906

905907
def __repr__(self):
906908
return "<BrokerConnection node_id=%s host=%s/%s port=%d>" % (
907-
self.config['node_id'], self.hostname, self.host, self.port)
909+
self.node_id, self.hostname, self.host, self.port)
908910

909911

910912
class BrokerConnectionMetrics(object):

kafka/producer/kafka.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ class KafkaProducer(object):
266266
'linger_ms': 0,
267267
'partitioner': DefaultPartitioner(),
268268
'buffer_memory': 33554432,
269-
'connections_max_idle_ms': 600000, # not implemented yet
269+
'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
270270
'max_block_ms': 60000,
271271
'max_request_size': 1048576,
272272
'metadata_max_age_ms': 300000,

test/test_client_async.py

+36-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from __future__ import absolute_import, division
2+
13
# selectors in stdlib as of py3.4
24
try:
35
import selectors # pylint: disable=import-error
@@ -10,7 +12,7 @@
1012

1113
import pytest
1214

13-
from kafka.client_async import KafkaClient
15+
from kafka.client_async import KafkaClient, IdleConnectionManager
1416
from kafka.conn import ConnectionStates
1517
import kafka.errors as Errors
1618
from kafka.future import Future
@@ -319,7 +321,10 @@ def client(mocker):
319321
mocker.patch.object(KafkaClient, '_bootstrap')
320322
_poll = mocker.patch.object(KafkaClient, '_poll')
321323

322-
cli = KafkaClient(request_timeout_ms=9999999, reconnect_backoff_ms=2222, api_version=(0, 9))
324+
cli = KafkaClient(request_timeout_ms=9999999,
325+
reconnect_backoff_ms=2222,
326+
connections_max_idle_ms=float('inf'),
327+
api_version=(0, 9))
323328

324329
tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
325330
tasks.return_value = 9999999
@@ -395,3 +400,32 @@ def test_schedule():
395400

396401
def test_unschedule():
397402
pass
403+
404+
405+
def test_idle_connection_manager(mocker):
406+
t = mocker.patch.object(time, 'time')
407+
t.return_value = 0
408+
409+
idle = IdleConnectionManager(100)
410+
assert idle.next_check_ms() == float('inf')
411+
412+
idle.update('foo')
413+
assert not idle.is_expired('foo')
414+
assert idle.poll_expired_connection() is None
415+
assert idle.next_check_ms() == 100
416+
417+
t.return_value = 90 / 1000
418+
assert not idle.is_expired('foo')
419+
assert idle.poll_expired_connection() is None
420+
assert idle.next_check_ms() == 10
421+
422+
t.return_value = 100 / 1000
423+
assert idle.is_expired('foo')
424+
assert idle.next_check_ms() == 0
425+
426+
conn_id, conn_ts = idle.poll_expired_connection()
427+
assert conn_id == 'foo'
428+
assert conn_ts == 0
429+
430+
idle.remove('foo')
431+
assert idle.next_check_ms() == float('inf')

0 commit comments

Comments
 (0)