Skip to content

Only disable heartbeat thread once at beginning of join-group #2617

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 8, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 32 additions & 30 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from kafka.util import Timer

log = logging.getLogger('kafka.coordinator')
heartbeat_log = logging.getLogger('kafka.coordinator.heartbeat')


class MemberState(object):
Expand Down Expand Up @@ -449,11 +450,12 @@ def join_group(self, timeout_ms=None):
timeout_ms=timer.timeout_ms)
self.rejoining = True

# fence off the heartbeat thread explicitly so that it cannot
# interfere with the join group. # Note that this must come after
# the call to onJoinPrepare since we must be able to continue
# sending heartbeats if that callback takes some time.
self._disable_heartbeat_thread()
# fence off the heartbeat thread explicitly so that it cannot
# interfere with the join group. # Note that this must come after
# the call to onJoinPrepare since we must be able to continue
# sending heartbeats if that callback takes some time.
log.debug("Disabling heartbeat thread during join-group")
self._disable_heartbeat_thread()

# ensure that there are no pending requests to the coordinator.
# This is important in particular to avoid resending a pending
Expand Down Expand Up @@ -779,7 +781,7 @@ def _handle_group_coordinator_response(self, future, response):
future.failure(error)
else:
error = error_type()
log.error("Group coordinator lookup for group %s failed: %s",
log.error("Group Coordinator lookup for group %s failed: %s",
self.group_id, error)
future.failure(error)

Expand Down Expand Up @@ -815,11 +817,11 @@ def _start_heartbeat_thread(self):
raise Errors.UnsupportedVersionError('Heartbeat APIs require 0.9+ broker')
with self._lock:
if self._heartbeat_thread is None:
log.info('Starting new heartbeat thread')
heartbeat_log.info('Starting new heartbeat thread')
self._heartbeat_thread = HeartbeatThread(weakref.proxy(self))
self._heartbeat_thread.daemon = True
self._heartbeat_thread.start()
log.debug("Started heartbeat thread %s", self._heartbeat_thread.ident)
heartbeat_log.debug("Started heartbeat thread %s", self._heartbeat_thread.ident)

def _disable_heartbeat_thread(self):
with self._lock:
Expand All @@ -829,7 +831,7 @@ def _disable_heartbeat_thread(self):
def _close_heartbeat_thread(self, timeout_ms=None):
with self._lock:
if self._heartbeat_thread is not None:
log.info('Stopping heartbeat thread')
heartbeat_log.info('Stopping heartbeat thread')
try:
self._heartbeat_thread.close(timeout_ms=timeout_ms)
except ReferenceError:
Expand Down Expand Up @@ -893,7 +895,7 @@ def _send_heartbeat_request(self):
request = HeartbeatRequest[version](self.group_id,
self._generation.generation_id,
self._generation.member_id)
log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member
heartbeat_log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member
future = Future()
_f = self._client.send(self.coordinator_id, request)
_f.add_callback(self._handle_heartbeat_response, future, time.time())
Expand All @@ -906,38 +908,38 @@ def _handle_heartbeat_response(self, future, send_time, response):
self._sensors.heartbeat_latency.record((time.time() - send_time) * 1000)
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
log.debug("Received successful heartbeat response for group %s",
heartbeat_log.debug("Received successful heartbeat response for group %s",
self.group_id)
future.success(None)
elif error_type in (Errors.CoordinatorNotAvailableError,
Errors.NotCoordinatorError):
log.warning("Heartbeat failed for group %s: coordinator (node %s)"
heartbeat_log.warning("Heartbeat failed for group %s: coordinator (node %s)"
" is either not started or not valid", self.group_id,
self.coordinator())
self.coordinator_dead(error_type())
future.failure(error_type())
elif error_type is Errors.RebalanceInProgressError:
log.warning("Heartbeat failed for group %s because it is"
heartbeat_log.warning("Heartbeat failed for group %s because it is"
" rebalancing", self.group_id)
self.request_rejoin()
future.failure(error_type())
elif error_type is Errors.IllegalGenerationError:
log.warning("Heartbeat failed for group %s: generation id is not "
heartbeat_log.warning("Heartbeat failed for group %s: generation id is not "
" current.", self.group_id)
self.reset_generation()
future.failure(error_type())
elif error_type is Errors.UnknownMemberIdError:
log.warning("Heartbeat: local member_id was not recognized;"
heartbeat_log.warning("Heartbeat: local member_id was not recognized;"
" this consumer needs to re-join")
self.reset_generation()
future.failure(error_type)
elif error_type is Errors.GroupAuthorizationFailedError:
error = error_type(self.group_id)
log.error("Heartbeat failed: authorization error: %s", error)
heartbeat_log.error("Heartbeat failed: authorization error: %s", error)
future.failure(error)
else:
error = error_type()
log.error("Heartbeat failed: Unhandled error: %s", error)
heartbeat_log.error("Heartbeat failed: Unhandled error: %s", error)
future.failure(error)


Expand Down Expand Up @@ -1003,14 +1005,14 @@ def __init__(self, coordinator):

def enable(self):
with self.coordinator._lock:
log.debug('Enabling heartbeat thread')
heartbeat_log.debug('Enabling heartbeat thread')
self.enabled = True
self.coordinator.heartbeat.reset_timeouts()
self.coordinator._lock.notify()

def disable(self):
with self.coordinator._lock:
log.debug('Disabling heartbeat thread')
heartbeat_log.debug('Disabling heartbeat thread')
self.enabled = False

def close(self, timeout_ms=None):
Expand All @@ -1032,24 +1034,24 @@ def close(self, timeout_ms=None):
timeout_ms = self.coordinator.config['heartbeat_interval_ms']
self.join(timeout_ms / 1000)
if self.is_alive():
log.warning("Heartbeat thread did not fully terminate during close")
heartbeat_log.warning("Heartbeat thread did not fully terminate during close")

def run(self):
try:
log.debug('Heartbeat thread started')
heartbeat_log.debug('Heartbeat thread started')
while not self.closed:
self._run_once()

except ReferenceError:
log.debug('Heartbeat thread closed due to coordinator gc')
heartbeat_log.debug('Heartbeat thread closed due to coordinator gc')

except RuntimeError as e:
log.error("Heartbeat thread for group %s failed due to unexpected error: %s",
heartbeat_log.error("Heartbeat thread for group %s failed due to unexpected error: %s",
self.coordinator.group_id, e)
self.failed = e

finally:
log.debug('Heartbeat thread closed')
heartbeat_log.debug('Heartbeat thread closed')

def _run_once(self):
with self.coordinator._client._lock, self.coordinator._lock:
Expand All @@ -1063,16 +1065,16 @@ def _run_once(self):

with self.coordinator._lock:
if not self.enabled:
log.debug('Heartbeat disabled. Waiting')
heartbeat_log.debug('Heartbeat disabled. Waiting')
self.coordinator._lock.wait()
log.debug('Heartbeat re-enabled.')
heartbeat_log.debug('Heartbeat re-enabled.')
return

if self.coordinator.state is not MemberState.STABLE:
# the group is not stable (perhaps because we left the
# group or because the coordinator kicked us out), so
# disable heartbeats and wait for the main thread to rejoin.
log.debug('Group state is not stable, disabling heartbeats')
heartbeat_log.debug('Group state is not stable, disabling heartbeats')
self.disable()
return

Expand All @@ -1088,14 +1090,14 @@ def _run_once(self):
# the session timeout has expired without seeing a
# successful heartbeat, so we should probably make sure
# the coordinator is still healthy.
log.warning('Heartbeat session expired, marking coordinator dead')
heartbeat_log.warning('Heartbeat session expired, marking coordinator dead')
self.coordinator.coordinator_dead('Heartbeat session expired')

elif self.coordinator.heartbeat.poll_timeout_expired():
# the poll timeout has expired, which means that the
# foreground thread has stalled in between calls to
# poll(), so we explicitly leave the group.
log.warning('Heartbeat poll expired, leaving group')
heartbeat_log.warning('Heartbeat poll expired, leaving group')
### XXX
# maybe_leave_group acquires client + coordinator lock;
# if we hold coordinator lock before calling, we risk deadlock
Expand All @@ -1106,7 +1108,7 @@ def _run_once(self):
elif not self.coordinator.heartbeat.should_heartbeat():
# poll again after waiting for the retry backoff in case
# the heartbeat failed or the coordinator disconnected
log.log(0, 'Not ready to heartbeat, waiting')
heartbeat_log.log(0, 'Not ready to heartbeat, waiting')
self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)

else:
Expand Down
Loading