From bc4291535423b2adc762cbea1d65d09a6610358e Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 31 Mar 2019 12:46:27 -0700 Subject: [PATCH 1/4] Do not call conn state_change_callback with lock * Remove DISCONNECTING connection state * do not call state_change_callback with lock * Pass node_id, socket, and connection object to callback --- kafka/client_async.py | 17 +++++++++-------- kafka/conn.py | 32 +++++++++++++++++++------------- test/test_client_async.py | 21 +++++++++++---------- 3 files changed, 39 insertions(+), 31 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index a86ab556d..6eaa3dbe4 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -260,16 +260,17 @@ def _can_connect(self, node_id): conn = self._conns[node_id] return conn.disconnected() and not conn.blacked_out() - def _conn_state_change(self, node_id, conn): + def _conn_state_change(self, node_id, sock, conn): + close_conns = [] with self._lock: if conn.connecting(): # SSL connections can enter this state 2x (second during Handshake) if node_id not in self._connecting: self._connecting.add(node_id) try: - self._selector.register(conn._sock, selectors.EVENT_WRITE) + key_selector = self._selector.register(sock, selectors.EVENT_WRITE) except KeyError: - self._selector.modify(conn._sock, selectors.EVENT_WRITE) + key_selector = self._selector.modify(sock, selectors.EVENT_WRITE) if self.cluster.is_bootstrap(node_id): self._last_bootstrap = time.time() @@ -280,9 +281,9 @@ def _conn_state_change(self, node_id, conn): self._connecting.remove(node_id) try: - self._selector.modify(conn._sock, selectors.EVENT_READ, conn) + self._selector.modify(sock, selectors.EVENT_READ, conn) except KeyError: - self._selector.register(conn._sock, selectors.EVENT_READ, conn) + self._selector.register(sock, selectors.EVENT_READ, conn) if self._sensors: self._sensors.connection_created.record() @@ -298,11 +299,11 @@ def _conn_state_change(self, node_id, conn): self._conns.pop(node_id).close() # Connection failures imply that our metadata is stale, so let's refresh - elif conn.state is ConnectionStates.DISCONNECTING: + elif conn.state is ConnectionStates.DISCONNECTED: if node_id in self._connecting: self._connecting.remove(node_id) try: - self._selector.unregister(conn._sock) + self._selector.unregister(sock) except KeyError: pass @@ -369,7 +370,7 @@ def _maybe_connect(self, node_id): log.debug("Initiating connection to node %s at %s:%s", node_id, broker.host, broker.port) host, port, afi = get_ip_port_afi(broker.host) - cb = functools.partial(WeakMethod(self._conn_state_change), node_id) + cb = WeakMethod(self._conn_state_change) conn = BrokerConnection(host, broker.port, afi, state_change_callback=cb, node_id=node_id, diff --git a/kafka/conn.py b/kafka/conn.py index a00206f5c..29eebf747 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -212,7 +212,7 @@ class BrokerConnection(object): 'ssl_ciphers': None, 'api_version': (0, 8, 2), # default to most restrictive 'selector': selectors.DefaultSelector, - 'state_change_callback': lambda conn: True, + 'state_change_callback': lambda node_id, sock, conn: True, 'metrics': None, 'metric_group_prefix': '', 'sasl_mechanism': None, @@ -357,6 +357,7 @@ def connect(self): return self.state else: log.debug('%s: creating new socket', self) + assert self._sock is None self._sock_afi, self._sock_addr = next_lookup self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM) @@ -366,7 +367,7 @@ def connect(self): self._sock.setblocking(False) self.state = ConnectionStates.CONNECTING - self.config['state_change_callback'](self) + self.config['state_change_callback'](self.node_id, self._sock, self) log.info('%s: connecting to %s:%d [%s %s]', self, self.host, self.port, self._sock_addr, AFI_NAMES[self._sock_afi]) @@ -386,21 +387,21 @@ def connect(self): if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): log.debug('%s: initiating SSL handshake', self) self.state = ConnectionStates.HANDSHAKE - self.config['state_change_callback'](self) + self.config['state_change_callback'](self.node_id, self._sock, self) # _wrap_ssl can alter the connection state -- disconnects on failure self._wrap_ssl() elif self.config['security_protocol'] == 'SASL_PLAINTEXT': log.debug('%s: initiating SASL authentication', self) self.state = ConnectionStates.AUTHENTICATING - self.config['state_change_callback'](self) + self.config['state_change_callback'](self.node_id, self._sock, self) else: # security_protocol PLAINTEXT log.info('%s: Connection complete.', self) self.state = ConnectionStates.CONNECTED self._reset_reconnect_backoff() - self.config['state_change_callback'](self) + self.config['state_change_callback'](self.node_id, self._sock, self) # Connection failed # WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems @@ -425,7 +426,7 @@ def connect(self): log.info('%s: Connection complete.', self) self.state = ConnectionStates.CONNECTED self._reset_reconnect_backoff() - self.config['state_change_callback'](self) + self.config['state_change_callback'](self.node_id, self._sock, self) if self.state is ConnectionStates.AUTHENTICATING: assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL') @@ -435,7 +436,7 @@ def connect(self): log.info('%s: Connection complete.', self) self.state = ConnectionStates.CONNECTED self._reset_reconnect_backoff() - self.config['state_change_callback'](self) + self.config['state_change_callback'](self.node_id, self._sock, self) if self.state not in (ConnectionStates.CONNECTED, ConnectionStates.DISCONNECTED): @@ -806,11 +807,7 @@ def close(self, error=None): if self.state is ConnectionStates.DISCONNECTED: return log.info('%s: Closing connection. %s', self, error or '') - self.state = ConnectionStates.DISCONNECTING - self.config['state_change_callback'](self) self._update_reconnect_backoff() - self._close_socket() - self.state = ConnectionStates.DISCONNECTED self._sasl_auth_future = None self._protocol = KafkaProtocol( client_id=self.config['client_id'], @@ -819,9 +816,18 @@ def close(self, error=None): error = Errors.Cancelled(str(self)) ifrs = list(self.in_flight_requests.items()) self.in_flight_requests.clear() - self.config['state_change_callback'](self) + self.state = ConnectionStates.DISCONNECTED + # To avoid race conditions and/or deadlocks + # keep a reference to the socket but leave it + # open until after the state_change_callback + # This should give clients a change to deregister + # the socket fd from selectors cleanly. + sock = self._sock + self._sock = None - # drop lock before processing futures + # drop lock before state change callback and processing futures + self.config['state_change_callback'](self.node_id, sock, self) + sock.close() for (_correlation_id, (future, _timestamp)) in ifrs: future.failure(error) diff --git a/test/test_client_async.py b/test/test_client_async.py index 0951cb414..2132c8e4c 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -95,28 +95,29 @@ def test_conn_state_change(mocker, cli, conn): node_id = 0 cli._conns[node_id] = conn conn.state = ConnectionStates.CONNECTING - cli._conn_state_change(node_id, conn) + sock = conn._sock + cli._conn_state_change(node_id, sock, conn) assert node_id in cli._connecting - sel.register.assert_called_with(conn._sock, selectors.EVENT_WRITE) + sel.register.assert_called_with(sock, selectors.EVENT_WRITE) conn.state = ConnectionStates.CONNECTED - cli._conn_state_change(node_id, conn) + cli._conn_state_change(node_id, sock, conn) assert node_id not in cli._connecting - sel.modify.assert_called_with(conn._sock, selectors.EVENT_READ, conn) + sel.modify.assert_called_with(sock, selectors.EVENT_READ, conn) # Failure to connect should trigger metadata update assert cli.cluster._need_update is False - conn.state = ConnectionStates.DISCONNECTING - cli._conn_state_change(node_id, conn) + conn.state = ConnectionStates.DISCONNECTED + cli._conn_state_change(node_id, sock, conn) assert node_id not in cli._connecting assert cli.cluster._need_update is True - sel.unregister.assert_called_with(conn._sock) + sel.unregister.assert_called_with(sock) conn.state = ConnectionStates.CONNECTING - cli._conn_state_change(node_id, conn) + cli._conn_state_change(node_id, sock, conn) assert node_id in cli._connecting - conn.state = ConnectionStates.DISCONNECTING - cli._conn_state_change(node_id, conn) + conn.state = ConnectionStates.DISCONNECTED + cli._conn_state_change(node_id, sock, conn) assert node_id not in cli._connecting From 1a532ff0f80c0d8244bb3f1a2ccf9f963f9db8c5 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 1 Apr 2019 20:03:27 -0700 Subject: [PATCH 2/4] drop unused key_selector assignment --- kafka/client_async.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 6eaa3dbe4..eff8a2bd7 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -268,9 +268,9 @@ def _conn_state_change(self, node_id, sock, conn): if node_id not in self._connecting: self._connecting.add(node_id) try: - key_selector = self._selector.register(sock, selectors.EVENT_WRITE) + self._selector.register(sock, selectors.EVENT_WRITE) except KeyError: - key_selector = self._selector.modify(sock, selectors.EVENT_WRITE) + self._selector.modify(sock, selectors.EVENT_WRITE) if self.cluster.is_bootstrap(node_id): self._last_bootstrap = time.time() From 9df0789ef8f5170b6b4da176134adbced2fc56cb Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 1 Apr 2019 20:20:06 -0700 Subject: [PATCH 3/4] Short-circuit lock acquire in conn.close() --- kafka/conn.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kafka/conn.py b/kafka/conn.py index 29eebf747..044d2d5d6 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -803,6 +803,8 @@ def close(self, error=None): will be failed with this exception. Default: kafka.errors.KafkaConnectionError. """ + if self.state is ConnectionStates.DISCONNECTED: + return with self._lock: if self.state is ConnectionStates.DISCONNECTED: return From 809f35feaef539b4efcd5def1d8141a93d85f7fb Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 1 Apr 2019 20:32:54 -0700 Subject: [PATCH 4/4] Remove close_conns --- kafka/client_async.py | 1 - 1 file changed, 1 deletion(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index eff8a2bd7..77efac869 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -261,7 +261,6 @@ def _can_connect(self, node_id): return conn.disconnected() and not conn.blacked_out() def _conn_state_change(self, node_id, sock, conn): - close_conns = [] with self._lock: if conn.connecting(): # SSL connections can enter this state 2x (second during Handshake)