@@ -135,6 +135,7 @@ class KafkaClient(object):
135
135
'bootstrap_servers' : 'localhost' ,
136
136
'client_id' : 'kafka-python-' + __version__ ,
137
137
'request_timeout_ms' : 40000 ,
138
+ 'connections_max_idle_ms' : 9 * 60 * 1000 ,
138
139
'reconnect_backoff_ms' : 50 ,
139
140
'max_in_flight_requests_per_connection' : 5 ,
140
141
'receive_buffer_bytes' : None ,
@@ -194,6 +195,7 @@ def __init__(self, **configs):
194
195
self ._wake_r .setblocking (False )
195
196
self ._wake_lock = threading .Lock ()
196
197
self ._selector .register (self ._wake_r , selectors .EVENT_READ )
198
+ self ._idle_expiry_manager = IdleConnectionManager (self .config ['connections_max_idle_ms' ])
197
199
self ._closed = False
198
200
self ._sensors = None
199
201
if self .config ['metrics' ]:
@@ -291,6 +293,8 @@ def _conn_state_change(self, node_id, conn):
291
293
if self ._sensors :
292
294
self ._sensors .connection_created .record ()
293
295
296
+ self ._idle_expiry_manager .update (node_id )
297
+
294
298
if 'bootstrap' in self ._conns and node_id != 'bootstrap' :
295
299
bootstrap = self ._conns .pop ('bootstrap' )
296
300
# XXX: make conn.close() require error to cause refresh
@@ -308,7 +312,13 @@ def _conn_state_change(self, node_id, conn):
308
312
pass
309
313
if self ._sensors :
310
314
self ._sensors .connection_closed .record ()
311
- if self ._refresh_on_disconnects and not self ._closed :
315
+
316
+ idle_disconnect = False
317
+ if self ._idle_expiry_manager .is_expired (node_id ):
318
+ idle_disconnect = True
319
+ self ._idle_expiry_manager .remove (node_id )
320
+
321
+ if self ._refresh_on_disconnects and not self ._closed and not idle_disconnect :
312
322
log .warning ("Node %s connection failed -- refreshing metadata" , node_id )
313
323
self .cluster .request_update ()
314
324
@@ -514,10 +524,12 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):
514
524
if future and future .is_done :
515
525
timeout = 0
516
526
else :
527
+ idle_connection_timeout_ms = self ._idle_expiry_manager .next_check_ms ()
517
528
timeout = min (
518
529
timeout_ms ,
519
530
metadata_timeout_ms ,
520
531
self ._delayed_tasks .next_at () * 1000 ,
532
+ idle_connection_timeout_ms ,
521
533
self .config ['request_timeout_ms' ])
522
534
timeout = max (0 , timeout / 1000.0 ) # avoid negative timeouts
523
535
@@ -572,6 +584,8 @@ def _poll(self, timeout, sleep=True):
572
584
conn .close (Errors .ConnectionError ('Socket EVENT_READ without in-flight-requests' ))
573
585
continue
574
586
587
+ self ._idle_expiry_manager .update (conn .node_id )
588
+
575
589
# Accumulate as many responses as the connection has pending
576
590
while conn .in_flight_requests :
577
591
response = conn .recv () # Note: conn.recv runs callbacks / errbacks
@@ -601,6 +615,7 @@ def _poll(self, timeout, sleep=True):
601
615
602
616
if self ._sensors :
603
617
self ._sensors .io_time .record ((time .time () - end_select ) * 1000000000 )
618
+ self ._maybe_close_oldest_connection ()
604
619
return responses
605
620
606
621
def in_flight_request_count (self , node_id = None ):
@@ -846,6 +861,14 @@ def _clear_wake_fd(self):
846
861
except socket .error :
847
862
break
848
863
864
+ def _maybe_close_oldest_connection (self ):
865
+ expired_connection = self ._idle_expiry_manager .poll_expired_connection ()
866
+ if expired_connection :
867
+ conn_id , ts = expired_connection
868
+ idle_ms = (time .time () - ts ) * 1000
869
+ log .info ('Closing idle connection %s, last active %d ms ago' , conn_id , idle_ms )
870
+ self .close (node_id = conn_id )
871
+
849
872
850
873
class DelayedTaskQueue (object ):
851
874
# see https://docs.python.org/2/library/heapq.html
@@ -920,6 +943,76 @@ def pop_ready(self):
920
943
return ready_tasks
921
944
922
945
946
+ # OrderedDict requires python2.7+
947
+ try :
948
+ from collections import OrderedDict
949
+ except ImportError :
950
+ # If we dont have OrderedDict, we'll fallback to dict with O(n) priority reads
951
+ OrderedDict = dict
952
+
953
+
954
+ class IdleConnectionManager (object ):
955
+ def __init__ (self , connections_max_idle_ms ):
956
+ if connections_max_idle_ms > 0 :
957
+ self .connections_max_idle = connections_max_idle_ms / 1000
958
+ else :
959
+ self .connections_max_idle = float ('inf' )
960
+ self .next_idle_close_check_time = None
961
+ self .update_next_idle_close_check_time (time .time ())
962
+ self .lru_connections = OrderedDict ()
963
+
964
+ def update (self , conn_id ):
965
+ # order should reflect last-update
966
+ if conn_id in self .lru_connections :
967
+ del self .lru_connections [conn_id ]
968
+ self .lru_connections [conn_id ] = time .time ()
969
+
970
+ def remove (self , conn_id ):
971
+ if conn_id in self .lru_connections :
972
+ del self .lru_connections [conn_id ]
973
+
974
+ def is_expired (self , conn_id ):
975
+ if conn_id not in self .lru_connections :
976
+ return None
977
+ return time .time () >= self .lru_connections [conn_id ] + self .connections_max_idle
978
+
979
+ def next_check_ms (self ):
980
+ now = time .time ()
981
+ if not self .lru_connections :
982
+ return float ('inf' )
983
+ elif self .next_idle_close_check_time <= now :
984
+ return 0
985
+ else :
986
+ return int ((self .next_idle_close_check_time - now ) * 1000 )
987
+
988
+ def update_next_idle_close_check_time (self , ts ):
989
+ self .next_idle_close_check_time = ts + self .connections_max_idle
990
+
991
+ def poll_expired_connection (self ):
992
+ if time .time () < self .next_idle_close_check_time :
993
+ return None
994
+
995
+ if not len (self .lru_connections ):
996
+ return None
997
+
998
+ oldest_conn_id = None
999
+ oldest_ts = None
1000
+ if OrderedDict is dict :
1001
+ for conn_id , ts in self .lru_connections .items ():
1002
+ if oldest_conn_id is None or ts < oldest_ts :
1003
+ oldest_conn_id = conn_id
1004
+ oldest_ts = ts
1005
+ else :
1006
+ (oldest_conn_id , oldest_ts ) = next (iter (self .lru_connections .items ()))
1007
+
1008
+ self .update_next_idle_close_check_time (oldest_ts )
1009
+
1010
+ if time .time () >= oldest_ts + self .connections_max_idle :
1011
+ return (oldest_conn_id , oldest_ts )
1012
+ else :
1013
+ return None
1014
+
1015
+
923
1016
class KafkaClientMetrics (object ):
924
1017
def __init__ (self , metrics , metric_group_prefix , conns ):
925
1018
self .metrics = metrics
0 commit comments