From 290745fe56e3f86f1eee1e38354c6c6e789e12ce Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 8 May 2025 12:02:45 -0700 Subject: [PATCH 1/3] Add kafka.coordinator.heartbeat logger for heartbeat logs --- kafka/coordinator/base.py | 49 ++++++++++++++++++++------------------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 1592f9154..c0f18d5fc 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -19,6 +19,7 @@ from kafka.util import Timer log = logging.getLogger('kafka.coordinator') +heartbeat_log = logging.getLogger('kafka.coordinator.heartbeat') class MemberState(object): @@ -815,11 +816,11 @@ def _start_heartbeat_thread(self): raise Errors.UnsupportedVersionError('Heartbeat APIs require 0.9+ broker') with self._lock: if self._heartbeat_thread is None: - log.info('Starting new heartbeat thread') + heartbeat_log.info('Starting new heartbeat thread') self._heartbeat_thread = HeartbeatThread(weakref.proxy(self)) self._heartbeat_thread.daemon = True self._heartbeat_thread.start() - log.debug("Started heartbeat thread %s", self._heartbeat_thread.ident) + heartbeat_log.debug("Started heartbeat thread %s", self._heartbeat_thread.ident) def _disable_heartbeat_thread(self): with self._lock: @@ -829,7 +830,7 @@ def _disable_heartbeat_thread(self): def _close_heartbeat_thread(self, timeout_ms=None): with self._lock: if self._heartbeat_thread is not None: - log.info('Stopping heartbeat thread') + heartbeat_log.info('Stopping heartbeat thread') try: self._heartbeat_thread.close(timeout_ms=timeout_ms) except ReferenceError: @@ -893,7 +894,7 @@ def _send_heartbeat_request(self): request = HeartbeatRequest[version](self.group_id, self._generation.generation_id, self._generation.member_id) - log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member + heartbeat_log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member future = Future() _f = self._client.send(self.coordinator_id, request) _f.add_callback(self._handle_heartbeat_response, future, time.time()) @@ -906,38 +907,38 @@ def _handle_heartbeat_response(self, future, send_time, response): self._sensors.heartbeat_latency.record((time.time() - send_time) * 1000) error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: - log.debug("Received successful heartbeat response for group %s", + heartbeat_log.debug("Received successful heartbeat response for group %s", self.group_id) future.success(None) elif error_type in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError): - log.warning("Heartbeat failed for group %s: coordinator (node %s)" + heartbeat_log.warning("Heartbeat failed for group %s: coordinator (node %s)" " is either not started or not valid", self.group_id, self.coordinator()) self.coordinator_dead(error_type()) future.failure(error_type()) elif error_type is Errors.RebalanceInProgressError: - log.warning("Heartbeat failed for group %s because it is" + heartbeat_log.warning("Heartbeat failed for group %s because it is" " rebalancing", self.group_id) self.request_rejoin() future.failure(error_type()) elif error_type is Errors.IllegalGenerationError: - log.warning("Heartbeat failed for group %s: generation id is not " + heartbeat_log.warning("Heartbeat failed for group %s: generation id is not " " current.", self.group_id) self.reset_generation() future.failure(error_type()) elif error_type is Errors.UnknownMemberIdError: - log.warning("Heartbeat: local member_id was not recognized;" + heartbeat_log.warning("Heartbeat: local member_id was not recognized;" " this consumer needs to re-join") self.reset_generation() future.failure(error_type) elif error_type is Errors.GroupAuthorizationFailedError: error = error_type(self.group_id) - log.error("Heartbeat failed: authorization error: %s", error) + heartbeat_log.error("Heartbeat failed: authorization error: %s", error) future.failure(error) else: error = error_type() - log.error("Heartbeat failed: Unhandled error: %s", error) + heartbeat_log.error("Heartbeat failed: Unhandled error: %s", error) future.failure(error) @@ -1003,14 +1004,14 @@ def __init__(self, coordinator): def enable(self): with self.coordinator._lock: - log.debug('Enabling heartbeat thread') + heartbeat_log.debug('Enabling heartbeat thread') self.enabled = True self.coordinator.heartbeat.reset_timeouts() self.coordinator._lock.notify() def disable(self): with self.coordinator._lock: - log.debug('Disabling heartbeat thread') + heartbeat_log.debug('Disabling heartbeat thread') self.enabled = False def close(self, timeout_ms=None): @@ -1032,24 +1033,24 @@ def close(self, timeout_ms=None): timeout_ms = self.coordinator.config['heartbeat_interval_ms'] self.join(timeout_ms / 1000) if self.is_alive(): - log.warning("Heartbeat thread did not fully terminate during close") + heartbeat_log.warning("Heartbeat thread did not fully terminate during close") def run(self): try: - log.debug('Heartbeat thread started') + heartbeat_log.debug('Heartbeat thread started') while not self.closed: self._run_once() except ReferenceError: - log.debug('Heartbeat thread closed due to coordinator gc') + heartbeat_log.debug('Heartbeat thread closed due to coordinator gc') except RuntimeError as e: - log.error("Heartbeat thread for group %s failed due to unexpected error: %s", + heartbeat_log.error("Heartbeat thread for group %s failed due to unexpected error: %s", self.coordinator.group_id, e) self.failed = e finally: - log.debug('Heartbeat thread closed') + heartbeat_log.debug('Heartbeat thread closed') def _run_once(self): with self.coordinator._client._lock, self.coordinator._lock: @@ -1063,16 +1064,16 @@ def _run_once(self): with self.coordinator._lock: if not self.enabled: - log.debug('Heartbeat disabled. Waiting') + heartbeat_log.debug('Heartbeat disabled. Waiting') self.coordinator._lock.wait() - log.debug('Heartbeat re-enabled.') + heartbeat_log.debug('Heartbeat re-enabled.') return if self.coordinator.state is not MemberState.STABLE: # the group is not stable (perhaps because we left the # group or because the coordinator kicked us out), so # disable heartbeats and wait for the main thread to rejoin. - log.debug('Group state is not stable, disabling heartbeats') + heartbeat_log.debug('Group state is not stable, disabling heartbeats') self.disable() return @@ -1088,14 +1089,14 @@ def _run_once(self): # the session timeout has expired without seeing a # successful heartbeat, so we should probably make sure # the coordinator is still healthy. - log.warning('Heartbeat session expired, marking coordinator dead') + heartbeat_log.warning('Heartbeat session expired, marking coordinator dead') self.coordinator.coordinator_dead('Heartbeat session expired') elif self.coordinator.heartbeat.poll_timeout_expired(): # the poll timeout has expired, which means that the # foreground thread has stalled in between calls to # poll(), so we explicitly leave the group. - log.warning('Heartbeat poll expired, leaving group') + heartbeat_log.warning('Heartbeat poll expired, leaving group') ### XXX # maybe_leave_group acquires client + coordinator lock; # if we hold coordinator lock before calling, we risk deadlock @@ -1106,7 +1107,7 @@ def _run_once(self): elif not self.coordinator.heartbeat.should_heartbeat(): # poll again after waiting for the retry backoff in case # the heartbeat failed or the coordinator disconnected - log.log(0, 'Not ready to heartbeat, waiting') + heartbeat_log.log(0, 'Not ready to heartbeat, waiting') self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000) else: From 2944d884e8ffd4f60670303e51c9f69028c5a801 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 8 May 2025 12:03:05 -0700 Subject: [PATCH 2/3] minor log change --- kafka/coordinator/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index c0f18d5fc..2dc170686 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -780,7 +780,7 @@ def _handle_group_coordinator_response(self, future, response): future.failure(error) else: error = error_type() - log.error("Group coordinator lookup for group %s failed: %s", + log.error("Group Coordinator lookup for group %s failed: %s", self.group_id, error) future.failure(error) From 872a1b2a3c4bd297d6c18a736517cf0ec9a9412a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 8 May 2025 12:03:38 -0700 Subject: [PATCH 3/3] Only disable heartbeat thread once at beginning of join-group --- kafka/coordinator/base.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 2dc170686..448659e62 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -450,11 +450,12 @@ def join_group(self, timeout_ms=None): timeout_ms=timer.timeout_ms) self.rejoining = True - # fence off the heartbeat thread explicitly so that it cannot - # interfere with the join group. # Note that this must come after - # the call to onJoinPrepare since we must be able to continue - # sending heartbeats if that callback takes some time. - self._disable_heartbeat_thread() + # fence off the heartbeat thread explicitly so that it cannot + # interfere with the join group. # Note that this must come after + # the call to onJoinPrepare since we must be able to continue + # sending heartbeats if that callback takes some time. + log.debug("Disabling heartbeat thread during join-group") + self._disable_heartbeat_thread() # ensure that there are no pending requests to the coordinator. # This is important in particular to avoid resending a pending