Skip to content

Admin - Implement perform leader election #151

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 93 additions & 13 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
ACLResourcePatternType
from kafka.admin.leader_election_resources import ElectionType
from kafka.client_async import KafkaClient, selectors
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol
import kafka.errors as Errors
Expand All @@ -17,7 +18,7 @@
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
DeleteGroupsRequest, DescribeLogDirsRequest
DeleteGroupsRequest, ElectLeadersRequest, DescribeLogDirsRequest
)
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
Expand Down Expand Up @@ -391,27 +392,55 @@ def _send_request_to_controller(self, request):
# So this is a little brittle in that it assumes all responses have
# one of these attributes and that they always unpack into
# (topic, error_code) tuples.
topic_error_tuples = (response.topic_errors if hasattr(response, 'topic_errors')
else response.topic_error_codes)
# Also small py2/py3 compatibility -- py3 can ignore extra values
# during unpack via: for x, y, *rest in list_of_values. py2 cannot.
# So for now we have to map across the list and explicitly drop any
# extra values (usually the error_message)
for topic, error_code in map(lambda e: e[:2], topic_error_tuples):
topic_error_tuples = getattr(response, 'topic_errors', getattr(response, 'topic_error_codes', None))
if topic_error_tuples is not None:
success = self._parse_topic_request_response(topic_error_tuples, request, response, tries)
else:
# Leader Election request has a two layer error response (topic and partition)
success = self._parse_topic_partition_request_response(request, response, tries)

if success:
return response
raise RuntimeError("This should never happen, please file a bug with full stacktrace if encountered")

def _parse_topic_request_response(self, topic_error_tuples, request, response, tries):
    """Inspect the per-topic error codes of a controller response.

    :param topic_error_tuples: Iterable of tuples whose first two items
        are (topic, error_code); any further items are ignored.
    :param request: The request that was sent (used in the error text).
    :param response: The response received (used in the error text).
    :param tries: Truthy while a NotControllerError may still be retried.

    :return: False if a NotControllerError was seen while ``tries`` is
        truthy (after calling ``self._refresh_controller_id()``), True if
        every entry maps to NoError.
    :raises: The error class returned by ``Errors.for_code`` for any
        other error code.
    """
    for entry in topic_error_tuples:
        # py2 has no ``x, y, *rest`` unpacking, so slice off any extra
        # values (usually the error_message) before unpacking.
        _topic, error_code = entry[:2]
        error_type = Errors.for_code(error_code)
        if error_type is Errors.NoError:
            continue
        if tries and error_type is NotControllerError:
            # No need to inspect the rest of the errors for
            # non-retriable errors because NotControllerError should
            # either be thrown for all errors or no errors.
            self._refresh_controller_id()
            return False
        raise error_type(
            "Request '{}' failed with response '{}'."
            .format(request, response))
    return True

def _parse_topic_partition_request_response(self, request, response, tries):
    """Inspect the per-partition error codes of a two-layer response.

    The response is expected to expose ``replication_election_results``
    as an iterable of (topic, partition_results) pairs, where each
    partition result starts with (partition_id, error_code).

    :param request: The request that was sent (used in the error text).
    :param response: The response received.
    :param tries: Truthy while a NotControllerError may still be retried.

    :return: False if a NotControllerError was seen while ``tries`` is
        truthy (after calling ``self._refresh_controller_id()``), True if
        every partition reports NoError or ElectionNotNeeded.
    :raises: The error class returned by ``Errors.for_code`` for any
        other error code.
    """
    for _topic, partition_results in response.replication_election_results:
        for partition_result in partition_results:
            # py2 has no ``x, y, *rest`` unpacking, so slice off any extra
            # values (usually the error_message) before unpacking.
            _partition_id, error_code = partition_result[:2]
            error_type = Errors.for_code(error_code)
            if tries and error_type is NotControllerError:
                # No need to inspect the rest of the errors for
                # non-retriable errors because NotControllerError should
                # either be thrown for all errors or no errors.
                self._refresh_controller_id()
                return False
            elif error_type not in (Errors.NoError, Errors.ElectionNotNeeded):
                raise error_type(
                    "Request '{}' failed with response '{}'."
                    .format(request, response))
    return True

@staticmethod
def _convert_new_topic_request(new_topic):
Expand Down Expand Up @@ -1335,6 +1364,56 @@ def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id):
.format(version))
return self._send_request_to_node(group_coordinator_id, request)

@staticmethod
def _convert_topic_partitions(topic_partitions):
return [
(
topic,
partition_ids
)
for topic, partition_ids in topic_partitions.items()
]

def _get_all_topic_partitions(self):
return [
(
topic,
[partition_info.partition for partition_info in self._client.cluster._partitions[topic].values()]
)
for topic in self._client.cluster.topics()
]

def _get_topic_partitions(self, topic_partitions):
if topic_partitions is None:
return self._get_all_topic_partitions()
return self._convert_topic_partitions(topic_partitions)

def perform_leader_election(self, election_type, topic_partitions=None, timeout_ms=None):
    """Perform leader election on the topic partitions.

    :param election_type: Type of election to attempt. 0 for Preferred, 1 for Unclean
    :param topic_partitions: A map of topic name strings to partition ids list.
        By default, will run on all topic partitions
    :param timeout_ms: Milliseconds to wait for the leader election process to complete
        before the broker returns.

    :return: Appropriate version of ElectLeadersResponse class.
    :raises NotImplementedError: If the matched ElectLeaders API version
        is not 1.
    :raises ValueError: If ``election_type`` is not a valid ElectionType value.
    """
    version = self._matching_api_version(ElectLeadersRequest)
    timeout_ms = self._validate_timeout(timeout_ms)
    # Only v1 is accepted here; version 0 falls through to the error below.
    if not 0 < version <= 1:
        raise NotImplementedError(
            "Support for ElectLeaders v{} has not yet been added to KafkaAdminClient."
            .format(version))
    request = ElectLeadersRequest[version](
        election_type=ElectionType(election_type),
        topic_partitions=self._get_topic_partitions(topic_partitions),
        timeout=timeout_ms,
    )
    # TODO convert structs to a more pythonic interface
    return self._send_request_to_controller(request)

def _wait_for_futures(self, futures):
while not all(future.succeeded() for future in futures):
for future in futures:
Expand All @@ -1358,3 +1437,4 @@ def describe_log_dirs(self):
"Support for DescribeLogDirsRequest_v{} has not yet been added to KafkaAdminClient."
.format(version))
return future.value

15 changes: 15 additions & 0 deletions kafka/admin/leader_election_resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from __future__ import absolute_import

# enum in stdlib as of py3.4
try:
from enum import IntEnum # pylint: disable=import-error
except ImportError:
# vendored backport module
from kafka.vendor.enum34 import IntEnum

class ElectionType(IntEnum):
    """Leader election type."""

    # Plain int values: a trailing comma here would assign a 1-tuple and
    # only work by accident of how IntEnum builds its members.
    PREFERRED = 0
    UNCLEAN = 1
6 changes: 6 additions & 0 deletions kafka/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,12 @@ class GroupIdNotFoundError(BrokerResponseError):
description = 'The group id does not exist.'


class ElectionNotNeeded(BrokerResponseError):
    """Broker response error for code 84 (ELECTION_NOT_NEEDED)."""

    errno = 84
    message = 'ELECTION_NOT_NEEDED'
    description = 'Leader election not needed for topic partition.'


class KafkaUnavailableError(KafkaError):
    """KafkaError subclass that adds no attributes or behavior of its own."""

Expand Down
65 changes: 65 additions & 0 deletions kafka/protocol/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1092,3 +1092,68 @@ class ListPartitionReassignmentsRequest_v0(Request):
ListPartitionReassignmentsRequest = [ListPartitionReassignmentsRequest_v0]

ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0]


class ElectLeadersResponse_v0(Response):
    """ElectLeaders (API key 43) response, version 0.

    Version 0 of the protocol has no top-level ``error_code``; errors are
    reported only per partition.
    """
    API_KEY = 43
    API_VERSION = 0
    SCHEMA = Schema(
        ('throttle_time_ms', Int32),
        ('replication_election_results', Array(
            ('topic', String('utf-8')),
            ('partition_result', Array(
                ('partition_id', Int32),
                ('error_code', Int16),
                ('error_message', String('utf-8'))
            ))
        ))
    )


class ElectLeadersRequest_v0(Request):
    """ElectLeaders (API key 43) request, version 0.

    Version 0 of the protocol has no ``election_type`` field.
    """
    API_KEY = 43
    API_VERSION = 0
    RESPONSE_TYPE = ElectLeadersResponse_v0
    SCHEMA = Schema(
        ('topic_partitions', Array(
            ('topic', String('utf-8')),
            ('partition_ids', Array(Int32))
        )),
        ('timeout', Int32),
    )


class ElectLeadersResponse_v1(Response):
    """ElectLeaders (API key 43) response, version 1.

    Carries a top-level ``error_code`` plus, for each topic, a list of
    per-partition (partition_id, error_code, error_message) results.
    """
    API_KEY = 43
    API_VERSION = 1
    SCHEMA = Schema(
        ('throttle_time_ms', Int32),
        ('error_code', Int16),
        (
            'replication_election_results',
            Array(
                ('topic', String('utf-8')),
                (
                    'partition_result',
                    Array(
                        ('partition_id', Int32),
                        ('error_code', Int16),
                        ('error_message', String('utf-8')),
                    ),
                ),
            ),
        ),
    )

class ElectLeadersRequest_v1(Request):
    """ElectLeaders (API key 43) request, version 1.

    Fields: the election type, the (topic, partition_ids) pairs to run the
    election on, and a timeout.
    """
    API_KEY = 43
    API_VERSION = 1
    RESPONSE_TYPE = ElectLeadersResponse_v1
    SCHEMA = Schema(
        ('election_type', Int8),
        (
            'topic_partitions',
            Array(
                ('topic', String('utf-8')),
                ('partition_ids', Array(Int32)),
            ),
        ),
        ('timeout', Int32),
    )


# Indexed by API version: callers select a class with ElectLeadersRequest[version].
ElectLeadersRequest = [
    ElectLeadersRequest_v0,
    ElectLeadersRequest_v1,
]

ElectLeadersResponse = [
    ElectLeadersResponse_v0,
    ElectLeadersResponse_v1,
]
Loading