diff --git a/kafka/client_async.py b/kafka/client_async.py index b395dc5da..b46b879f9 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -59,6 +59,9 @@ class KafkaClient: rate. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between 20% below and 20% above the computed value. Default: 1000. + connection_timeout_ms (int): Connection timeout in milliseconds. + Default: None, which defaults it to the same value as + request_timeout_ms. request_timeout_ms (int): Client request timeout in milliseconds. Default: 30000. connections_max_idle_ms: Close idle connections after the number of @@ -145,6 +148,7 @@ class KafkaClient: 'bootstrap_servers': 'localhost', 'bootstrap_topics_filter': set(), 'client_id': 'kafka-python-' + __version__, + 'connection_timeout_ms': None, 'request_timeout_ms': 30000, 'wakeup_timeout_ms': 3000, 'connections_max_idle_ms': 9 * 60 * 1000, diff --git a/kafka/conn.py b/kafka/conn.py index 5a73ba429..745e4bca6 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -68,13 +68,6 @@ class SSLWantWriteError(Exception): gssapi = None GSSError = None -# needed for AWS_MSK_IAM authentication: -try: - from botocore.session import Session as BotoSession -except ImportError: - # no botocore available, will disable AWS_MSK_IAM mechanism - BotoSession = None - AFI_NAMES = { socket.AF_UNSPEC: "unspecified", socket.AF_INET: "IPv4", @@ -112,6 +105,9 @@ class BrokerConnection: rate. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between 20% below and 20% above the computed value. Default: 1000. + connection_timeout_ms (int): Connection timeout in milliseconds. + Default: None, which defaults it to the same value as + request_timeout_ms. request_timeout_ms (int): Client request timeout in milliseconds. Default: 30000. 
max_in_flight_requests_per_connection (int): Requests are pipelined @@ -188,6 +184,7 @@ class BrokerConnection: 'client_id': 'kafka-python-' + __version__, 'node_id': 0, 'request_timeout_ms': 30000, + 'connection_timeout_ms': None, 'reconnect_backoff_ms': 50, 'reconnect_backoff_max_ms': 1000, 'max_in_flight_requests_per_connection': 5, @@ -232,6 +229,9 @@ def __init__(self, host, port, afi, **configs): if key in configs: self.config[key] = configs[key] + if self.config['connection_timeout_ms'] is None: + self.config['connection_timeout_ms'] = self.config['request_timeout_ms'] + self.node_id = self.config.pop('node_id') if self.config['receive_buffer_bytes'] is not None: @@ -246,19 +246,15 @@ def __init__(self, host, port, afi, **configs): assert self.config['security_protocol'] in self.SECURITY_PROTOCOLS, ( 'security_protocol must be in ' + ', '.join(self.SECURITY_PROTOCOLS)) - if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): assert ssl_available, "Python wasn't built with SSL support" - if self.config['sasl_mechanism'] == 'AWS_MSK_IAM': - assert BotoSession is not None, 'AWS_MSK_IAM requires the "botocore" package' - assert self.config['security_protocol'] == 'SASL_SSL', 'AWS_MSK_IAM requires SASL_SSL' - if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'): assert self.config['sasl_mechanism'] in sasl.MECHANISMS, ( 'sasl_mechanism must be one of {}'.format(', '.join(sasl.MECHANISMS.keys())) ) sasl.MECHANISMS[self.config['sasl_mechanism']].validate_config(self) + # This is not a general lock / this class is not generally thread-safe yet # However, to avoid pushing responsibility for maintaining # per-connection locks to the upstream client, we will use this lock to @@ -284,7 +280,10 @@ def __init__(self, host, port, afi, **configs): if self.config['ssl_context'] is not None: self._ssl_context = self.config['ssl_context'] self._sasl_auth_future = None - self.last_attempt = 0 + self.last_activity = 0 + # This value is not used for internal 
state, but it is left to allow backwards-compatibility + # The variable last_activity is now used instead, but is updated more often and may therefore break compatibility with some hacks. + self.last_attempt= 0 self._gai = [] self._sensors = None if self.config['metrics']: @@ -362,6 +361,7 @@ def connect(self): self.config['state_change_callback'](self.node_id, self._sock, self) log.info('%s: connecting to %s:%d [%s %s]', self, self.host, self.port, self._sock_addr, AFI_NAMES[self._sock_afi]) + self.last_activity = time.time() if self.state is ConnectionStates.CONNECTING: # in non-blocking mode, use repeated calls to socket.connect_ex @@ -394,6 +394,7 @@ def connect(self): self.state = ConnectionStates.CONNECTED self._reset_reconnect_backoff() self.config['state_change_callback'](self.node_id, self._sock, self) + self.last_activity = time.time() # Connection failed # WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems @@ -419,6 +420,7 @@ def connect(self): self.state = ConnectionStates.CONNECTED self._reset_reconnect_backoff() self.config['state_change_callback'](self.node_id, self._sock, self) + self.last_activity = time.time() if self.state is ConnectionStates.AUTHENTICATING: assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL') @@ -429,12 +431,13 @@ def connect(self): self.state = ConnectionStates.CONNECTED self._reset_reconnect_backoff() self.config['state_change_callback'](self.node_id, self._sock, self) + self.last_activity = time.time() if self.state not in (ConnectionStates.CONNECTED, ConnectionStates.DISCONNECTED): # Connection timed out - request_timeout = self.config['request_timeout_ms'] / 1000.0 - if time.time() > request_timeout + self.last_attempt: + request_timeout = self.config['connection_timeout_ms'] / 1000.0 + if time.time() > request_timeout + self.last_activity: log.error('Connection attempt to %s timed out', self) self.close(Errors.KafkaConnectionError('timeout')) return self.state @@ -595,7 +598,7 @@
def blacked_out(self): re-establish a connection yet """ if self.state is ConnectionStates.DISCONNECTED: - if time.time() < self.last_attempt + self._reconnect_backoff: + if time.time() < self.last_activity + self._reconnect_backoff: return True return False @@ -606,7 +609,7 @@ def connection_delay(self): the reconnect backoff time. When connecting or connected, returns a very large number to handle slow/stalled connections. """ - time_waited = time.time() - (self.last_attempt or 0) + time_waited = time.time() - (self.last_activity or 0) if self.state is ConnectionStates.DISCONNECTED: return max(self._reconnect_backoff - time_waited, 0) * 1000 else: diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index f58221372..b9b2433d9 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -190,6 +190,9 @@ class KafkaProducer: brokers or partitions. Default: 300000 retry_backoff_ms (int): Milliseconds to backoff when retrying on errors. Default: 100. + connection_timeout_ms (int): Connection timeout in milliseconds. + Default: None, which defaults it to the same value as + request_timeout_ms. request_timeout_ms (int): Client request timeout in milliseconds. Default: 30000. receive_buffer_bytes (int): The size of the TCP receive buffer @@ -300,6 +303,7 @@ class KafkaProducer: 'max_request_size': 1048576, 'metadata_max_age_ms': 300000, 'retry_backoff_ms': 100, + 'connection_timeout_ms': None, 'request_timeout_ms': 30000, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, diff --git a/kafka/sasl/msk.py b/kafka/sasl/msk.py index ebea5dc5a..2ae88d326 100644 --- a/kafka/sasl/msk.py +++ b/kafka/sasl/msk.py @@ -10,10 +10,20 @@ from kafka.protocol.types import Int32 import kafka.errors as Errors -from botocore.session import Session as BotoSession # importing it in advance is not an option apparently... 
+# needed for AWS_MSK_IAM authentication: +try: + from botocore.session import Session as BotoSession +except ImportError: + # no botocore available, will disable AWS_MSK_IAM mechanism + BotoSession = None + from typing import Optional +def validate_config(conn): + assert BotoSession is not None, 'AWS_MSK_IAM requires the "botocore" package' + assert conn.config.get('security_protocol') == 'SASL_SSL', 'AWS_MSK_IAM requires SASL_SSL' + def try_authenticate(self, future): session = BotoSession() @@ -25,7 +35,7 @@ def try_authenticate(self, future): region=session.get_config_variable('region'), token=credentials.token, ) - + msg = client.first_message() size = Int32.encode(len(msg)) diff --git a/test/test_conn.py b/test/test_conn.py index d595fac3a..979f25e31 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -6,6 +6,7 @@ import socket import pytest +import time from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts from kafka.protocol.api import RequestHeader @@ -61,28 +62,99 @@ def test_connect_timeout(_socket, conn): # Initial connect returns EINPROGRESS # immediate inline connect returns EALREADY # second explicit connect returns EALREADY - # third explicit connect returns EALREADY and times out via last_attempt + # third explicit connect returns EALREADY and times out via last_activity _socket.connect_ex.side_effect = [EINPROGRESS, EALREADY, EALREADY, EALREADY] conn.connect() assert conn.state is ConnectionStates.CONNECTING conn.connect() assert conn.state is ConnectionStates.CONNECTING + conn.last_activity = 0 conn.last_attempt = 0 conn.connect() assert conn.state is ConnectionStates.DISCONNECTED +def test_connect_timeout_slowconn(_socket, conn, mocker): + # Same as test_connect_timeout, + # but we make the connection run longer than the timeout in order to test that + # BrokerConnection resets the timer whenever things happen during the connection + # See https://github.com/dpkp/kafka-python/issues/2386 + _socket.connect_ex.side_effect 
= [EINPROGRESS, EISCONN] + + # 0.8 = we guarantee that when testing with three intervals of this we are past the timeout + time_between_connect = (conn.config['connection_timeout_ms']/1000) * 0.8 + start = time.time() + + # Use plaintext auth for simplicity + last_activity = conn.last_activity + last_attempt = conn.last_attempt + conn.config['security_protocol'] = 'SASL_PLAINTEXT' + conn.connect() + assert conn.state is ConnectionStates.CONNECTING + # Ensure the last_activity counter was updated + # Last_attempt should also be updated + assert conn.last_activity > last_activity + assert conn.last_attempt > last_attempt + last_attempt = conn.last_attempt + last_activity = conn.last_activity + + # Simulate time being passed + # This shouldn't be enough time to time out the connection + conn._try_authenticate = mocker.Mock(side_effect=[False, False, True]) + with mock.patch("time.time", return_value=start+time_between_connect): + # This should trigger authentication + # Note that an authentication attempt isn't actually made until now. + # We simulate that authentication does not succeed at this point + # This is technically incorrect, but it lets us see what happens + # to the state machine when the state doesn't change for two function calls + conn.connect() + assert conn.last_activity > last_activity + # Last attempt is kept as a legacy variable, should not update + assert conn.last_attempt == last_attempt + last_activity = conn.last_activity + + assert conn.state is ConnectionStates.AUTHENTICATING + + + # This time around we should be way past timeout. + # Now we care about connect() not terminating the attempt, + # because connection state was progressed in the meantime. + with mock.patch("time.time", return_value=start+time_between_connect*2): + # Simulate this one not succeeding as well. 
This is so we can ensure things don't time out + conn.connect() + + # No state change = no activity change + assert conn.last_activity == last_activity + assert conn.last_attempt == last_attempt + + # If last_activity was not reset when the state transitioned to AUTHENTICATING, + # the connection state would be timed out now. + assert conn.state is ConnectionStates.AUTHENTICATING + + + # This time around, the connection should succeed. + with mock.patch("time.time", return_value=start+time_between_connect*3): + # This should finalize the connection + conn.connect() + + assert conn.last_activity > last_activity + assert conn.last_attempt == last_attempt + last_activity = conn.last_activity + + assert conn.state is ConnectionStates.CONNECTED + + def test_blacked_out(conn): with mock.patch("time.time", return_value=1000): - conn.last_attempt = 0 + conn.last_activity = 0 assert conn.blacked_out() is False - conn.last_attempt = 1000 + conn.last_activity = 1000 assert conn.blacked_out() is True def test_connection_delay(conn): with mock.patch("time.time", return_value=1000): - conn.last_attempt = 1000 + conn.last_activity = 1000 assert conn.connection_delay() == conn.config['reconnect_backoff_ms'] conn.state = ConnectionStates.CONNECTING assert conn.connection_delay() == float('inf') @@ -286,7 +358,7 @@ def test_lookup_on_connect(): ] with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m: - conn.last_attempt = 0 + conn.last_activity = 0 conn.connect() m.assert_called_once_with(hostname, port, 0, socket.SOCK_STREAM) assert conn._sock_afi == afi2 @@ -301,11 +373,10 @@ def test_relookup_on_failure(): assert conn.host == hostname mock_return1 = [] with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m: - last_attempt = conn.last_attempt + last_activity = conn.last_activity conn.connect() m.assert_called_once_with(hostname, port, 0, socket.SOCK_STREAM) assert conn.disconnected() - assert conn.last_attempt > last_attempt afi2 = socket.AF_INET 
sockaddr2 = ('127.0.0.2', 9092) @@ -314,12 +385,13 @@ def test_relookup_on_failure(): ] with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m: - conn.last_attempt = 0 + conn.last_activity = 0 conn.connect() m.assert_called_once_with(hostname, port, 0, socket.SOCK_STREAM) assert conn._sock_afi == afi2 assert conn._sock_addr == sockaddr2 conn.close() + assert conn.last_activity > last_activity def test_requests_timed_out(conn):