From a6298c712a7a3c866f15c471413a73e9d1e250c0 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 31 Mar 2019 10:10:32 -0700 Subject: [PATCH] Dont call conn.close() with client _lock in state change callback --- kafka/client_async.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index b6adb775b..88d2099f6 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -261,6 +261,7 @@ def _can_connect(self, node_id): return conn.disconnected() and not conn.blacked_out() def _conn_state_change(self, node_id, conn): + close_conns = [] with self._lock: if conn.connecting(): # SSL connections can enter this state 2x (second during Handshake) @@ -295,7 +296,7 @@ def _conn_state_change(self, node_id, conn): else: for node_id in list(self._conns.keys()): if self.cluster.is_bootstrap(node_id): - self._conns.pop(node_id).close() + close_conns.append(self._conns.pop(node_id)) # Connection failures imply that our metadata is stale, so let's refresh elif conn.state is ConnectionStates.DISCONNECTING: @@ -321,6 +322,9 @@ def _conn_state_change(self, node_id, conn): log.warning("Node %s connection failed -- refreshing metadata", node_id) self.cluster.request_update() + for conn in close_conns: + conn.close() + def maybe_connect(self, node_id, wakeup=True): """Queues a node for asynchronous connection during the next .poll()""" if self._can_connect(node_id):