From 67e787be82e4f2bf84e2aac9c2cb6bf49bb8788b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 12 Mar 2025 20:46:12 -0700 Subject: [PATCH] Grab client lock before coordinator lock when sending from heartbeat thread --- kafka/coordinator/base.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 0d4aedb88..3f063684d 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -1010,6 +1010,7 @@ def _run_once(self): # properly in the case that no brokers are available # to connect to (and the future is automatically failed). self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000) + return elif self.coordinator.heartbeat.session_timeout_expired(): # the session timeout has expired without seeing a @@ -1017,6 +1018,7 @@ def _run_once(self): # the coordinator is still healthy. log.warning('Heartbeat session expired, marking coordinator dead') self.coordinator.coordinator_dead('Heartbeat session expired') + return elif self.coordinator.heartbeat.poll_timeout_expired(): # the poll timeout has expired, which means that the @@ -1029,18 +1031,20 @@ def _run_once(self): # release() is safe here because this is the last code in the current context self.coordinator._lock.release() self.coordinator.maybe_leave_group() + return elif not self.coordinator.heartbeat.should_heartbeat(): # poll again after waiting for the retry backoff in case # the heartbeat failed or the coordinator disconnected log.log(0, 'Not ready to heartbeat, waiting') self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000) + return - else: - self.coordinator.heartbeat.sent_heartbeat() - future = self.coordinator._send_heartbeat_request() - future.add_callback(self._handle_heartbeat_success) - future.add_errback(self._handle_heartbeat_failure) + with self.coordinator._client._lock, self.coordinator._lock: + self.coordinator.heartbeat.sent_heartbeat() + future = self.coordinator._send_heartbeat_request() + future.add_callback(self._handle_heartbeat_success) + future.add_errback(self._handle_heartbeat_failure) def _handle_heartbeat_success(self, result): with self.coordinator._lock: