From bded1cb4080c4662a3f429b505897f527ff10ac6 Mon Sep 17 00:00:00 2001 From: Deric Degagne Date: Mon, 21 Mar 2022 13:21:05 +0000 Subject: [PATCH 1/3] Added support for FindCoordinatorRequest dynamic version --- .gitignore | 2 ++ kafka/admin/client.py | 19 ++++++++----------- kafka/admin/coordinator_type.py | 15 +++++++++++++++ kafka/cluster.py | 8 ++++---- kafka/conn.py | 4 ++-- kafka/coordinator/base.py | 4 ++-- kafka/protocol/commit.py | 27 +++++++++++++-------------- kafka/protocol/parser.py | 8 ++++---- test/test_protocol.py | 4 ++-- 9 files changed, 52 insertions(+), 39 deletions(-) create mode 100644 kafka/admin/coordinator_type.py diff --git a/.gitignore b/.gitignore index f3cd082fa..1f40a75b7 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,5 @@ docs/_build integration-test/ tests-env/ .pytest_cache/ +cli/ +.dccache \ No newline at end of file diff --git a/kafka/admin/client.py b/kafka/admin/client.py index fd4d66110..0767f8e98 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -22,7 +22,8 @@ ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest, DeleteGroupsRequest ) -from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest +from kafka.admin.coordinator_type import CoordinatorType +from kafka.protocol.commit import OffsetFetchRequest, FindCoordinatorRequest from kafka.protocol.metadata import MetadataRequest from kafka.protocol.types import Array from kafka.structs import TopicPartition, OffsetAndMetadata, MemberInformation, GroupInformation @@ -297,18 +298,14 @@ def _find_coordinator_id_send_request(self, group_id): name as a string. :return: A message future """ - # TODO add support for dynamically picking version of - # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest. 
- # When I experimented with this, the coordinator value returned in - # GroupCoordinatorResponse_v1 didn't match the value returned by - # GroupCoordinatorResponse_v0 and I couldn't figure out why. - version = 0 - # version = self._matching_api_version(GroupCoordinatorRequest) + version = self._matching_api_version(FindCoordinatorRequest) - if version <= 0: + if version == 0: - request = GroupCoordinatorRequest[version](group_id) + request = FindCoordinatorRequest[version](group_id) + elif version == 1: + request = FindCoordinatorRequest[version](group_id, CoordinatorType.GROUP) else: raise NotImplementedError( - "Support for GroupCoordinatorRequest_v{} has not yet been added to KafkaAdminClient." + "Support for FindCoordinatorRequest{} has not yet been added to KafkaAdminClient." .format(version)) return self._send_request_to_node(self._client.least_loaded_node(), request) @@ -328,7 +325,7 @@ def _find_coordinator_id_process_response(self, response): .format(response)) else: raise NotImplementedError( - "Support for FindCoordinatorRequest_v{} has not yet been added to KafkaAdminClient." + "Support for FindCoordinatorResponse_v{} has not yet been added to KafkaAdminClient."
.format(response.API_VERSION)) return response.coordinator_id diff --git a/kafka/admin/coordinator_type.py b/kafka/admin/coordinator_type.py new file mode 100644 index 000000000..6ece19a7b --- /dev/null +++ b/kafka/admin/coordinator_type.py @@ -0,0 +1,15 @@ +from __future__ import absolute_import + +# enum in stdlib as of py3.4 +try: + from enum import IntEnum # pylint: disable=import-error +except ImportError: + # vendored backport module + from kafka.vendor.enum34 import IntEnum + + +class CoordinatorType(IntEnum): + """Type of coordinator key type""" + + GROUP = 0 + TRANSACTION = 1 \ No newline at end of file diff --git a/kafka/cluster.py b/kafka/cluster.py index 438baf29d..616c2da2d 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -21,7 +21,7 @@ class ClusterMetadata(object): A class to manage kafka cluster metadata. This class does not perform any IO. It simply updates internal state - given API responses (MetadataResponse, GroupCoordinatorResponse). + given API responses (MetadataResponse, FindCoordinatorResponse). 
Keyword Arguments: retry_backoff_ms (int): Milliseconds to backoff when retrying on @@ -346,8 +346,8 @@ def add_group_coordinator(self, group, response): """Update with metadata for a group coordinator Arguments: - group (str): name of group from GroupCoordinatorRequest - response (GroupCoordinatorResponse): broker response + group (str): name of group from FindCoordinatorRequest + response (FindCoordinatorResponse): broker response Returns: string: coordinator node_id if metadata is updated, None on error @@ -355,7 +355,7 @@ def add_group_coordinator(self, group, response): log.debug("Updating coordinator for %s: %s", group, response) error_type = Errors.for_code(response.error_code) if error_type is not Errors.NoError: - log.error("GroupCoordinatorResponse error: %s", error_type) + log.error("FindCoordinatorResponse error: %s", error_type) self._groups[group] = -1 return diff --git a/kafka/conn.py b/kafka/conn.py index cac354875..a4e01626c 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -1224,13 +1224,13 @@ def reset_override_configs(): # request, both will be failed with a ConnectionError that wraps # socket.error (32, 54, or 104) from kafka.protocol.admin import ApiVersionRequest, ListGroupsRequest - from kafka.protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest + from kafka.protocol.commit import OffsetFetchRequest, FindCoordinatorRequest test_cases = [ # All cases starting from 0.10 will be based on ApiVersionResponse ((0, 10), ApiVersionRequest[0]()), ((0, 9), ListGroupsRequest[0]()), - ((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')), + ((0, 8, 2), FindCoordinatorRequest[0]('kafka-python-default-group')), ((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])), ((0, 8, 0), MetadataRequest[0](topics)), ] diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 5e41309df..bdc6ca7ad 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -14,7 +14,7 @@ from kafka.future import 
Future from kafka.metrics import AnonMeasurable from kafka.metrics.stats import Avg, Count, Max, Rate -from kafka.protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest +from kafka.protocol.commit import FindCoordinatorRequest, OffsetCommitRequest from kafka.protocol.group import (HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest) @@ -669,7 +669,7 @@ def _send_group_coordinator_request(self): log.debug("Sending group coordinator request for group %s to broker %s", self.group_id, node_id) - request = GroupCoordinatorRequest[0](self.group_id) + request = FindCoordinatorRequest[0](self.group_id) future = Future() _f = self._client.send(node_id, request) _f.add_callback(self._handle_group_coordinator_response, future) diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py index 31fc23707..a61696784 100644 --- a/kafka/protocol/commit.py +++ b/kafka/protocol/commit.py @@ -1,7 +1,7 @@ from __future__ import absolute_import from kafka.protocol.api import Request, Response from kafka.protocol.types import Array, Int8, Int16, Int32, Int64, Schema, String class OffsetCommitResponse_v0(Response): @@ -209,47 +209,46 @@ class OffsetFetchRequest_v3(Request): ] -class GroupCoordinatorResponse_v0(Response): +class FindCoordinatorResponse_v0(Response): API_KEY = 10 API_VERSION = 0 SCHEMA = Schema( ('error_code', Int16), ('coordinator_id', Int32), ('host', String('utf-8')), ('port', Int32) ) -class GroupCoordinatorResponse_v1(Response): +class FindCoordinatorResponse_v1(Response): API_KEY = 10 API_VERSION = 1 SCHEMA = Schema( ('error_code', Int16), ('error_message', String('utf-8')), ('coordinator_id', Int32), ('host', String('utf-8')), ('port', Int32) ) -class GroupCoordinatorRequest_v0(Request): +class FindCoordinatorRequest_v0(Request): API_KEY = 10 API_VERSION = 0
- RESPONSE_TYPE = GroupCoordinatorResponse_v0 + RESPONSE_TYPE = FindCoordinatorResponse_v0 SCHEMA = Schema( - ('consumer_group', String('utf-8')) + ('group_id', String('utf-8')) ) -class GroupCoordinatorRequest_v1(Request): +class FindCoordinatorRequest_v1(Request): API_KEY = 10 API_VERSION = 1 - RESPONSE_TYPE = GroupCoordinatorResponse_v1 + RESPONSE_TYPE = FindCoordinatorResponse_v1 SCHEMA = Schema( ('coordinator_key', String('utf-8')), ('coordinator_type', Int8) ) - -GroupCoordinatorRequest = [GroupCoordinatorRequest_v0, GroupCoordinatorRequest_v1] -GroupCoordinatorResponse = [GroupCoordinatorResponse_v0, GroupCoordinatorResponse_v1] +FindCoordinatorRequest = [FindCoordinatorRequest_v0, FindCoordinatorRequest_v1] +FindCoordinatorResponse = [FindCoordinatorResponse_v0, FindCoordinatorResponse_v1] diff --git a/kafka/protocol/parser.py b/kafka/protocol/parser.py index a9e767220..ef8a031eb 100644 --- a/kafka/protocol/parser.py +++ b/kafka/protocol/parser.py @@ -4,9 +4,9 @@ import logging import kafka.errors as Errors -from kafka.protocol.commit import GroupCoordinatorResponse +from kafka.protocol.commit import FindCoordinatorResponse from kafka.protocol.frame import KafkaBytes -from kafka.protocol.types import Int32, TaggedFields +from kafka.protocol.types import Int32 from kafka.version import __version__ log = logging.getLogger(__name__) @@ -142,9 +142,9 @@ def _process_response(self, read_buffer): # 0.8.2 quirk if (recv_correlation_id == 0 and correlation_id != 0 and - request.RESPONSE_TYPE is GroupCoordinatorResponse[0] and + request.RESPONSE_TYPE is FindCoordinatorResponse[0] and (self._api_version == (0, 8, 2) or self._api_version is None)): - log.warning('Kafka 0.8.2 quirk -- GroupCoordinatorResponse' + log.warning('Kafka 0.8.2 quirk -- FindCoordinatorResponse' ' Correlation ID does not match request. 
This' ' should go away once at least one topic has been' ' initialized on the broker.') diff --git a/test/test_protocol.py b/test/test_protocol.py index 6a77e19d6..58af72323 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -5,7 +5,7 @@ import pytest from kafka.protocol.api import RequestHeader -from kafka.protocol.commit import GroupCoordinatorRequest +from kafka.protocol.commit import FindCoordinatorRequest from kafka.protocol.fetch import FetchRequest, FetchResponse from kafka.protocol.message import Message, MessageSet, PartialMessage from kafka.protocol.metadata import MetadataRequest @@ -168,7 +168,7 @@ def test_encode_message_header(): b'client3', # ClientId ]) - req = GroupCoordinatorRequest[0]('foo') + req = FindCoordinatorRequest[0]('foo') header = RequestHeader(req, correlation_id=4, client_id='client3') assert header.encode() == expect From 3df7e6a8f8000895f88ebcb3016c7708c6df3972 Mon Sep 17 00:00:00 2001 From: Deric Degagne Date: Mon, 21 Mar 2022 19:54:10 +0000 Subject: [PATCH 2/3] fixed typo to include version # --- .gitignore | 2 +- kafka/admin/client.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 1f40a75b7..1ebba5952 100644 --- a/.gitignore +++ b/.gitignore @@ -15,5 +15,5 @@ docs/_build integration-test/ tests-env/ .pytest_cache/ -cli/ +settings.json .dccache \ No newline at end of file diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 0767f8e98..5b6648dd2 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -305,7 +305,7 @@ def _find_coordinator_id_send_request(self, group_id): request = FindCoordinatorRequest[version](group_id, CoordinatorType.GROUP) else: raise NotImplementedError( - "Support for FindCoordinatorRequest{} has not yet been added to KafkaAdminClient." + "Support for FindCoordinatorRequest_v{} has not yet been added to KafkaAdminClient." 
.format(version)) return self._send_request_to_node(self._client.least_loaded_node(), request) From c72ad6b1ba979da54685c97511f68ce3309219ef Mon Sep 17 00:00:00 2001 From: Deric Degagne Date: Tue, 22 Mar 2022 18:41:03 +0000 Subject: [PATCH 3/3] updated cluster metadata --- kafka/admin/client.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 5b6648dd2..ab98125ad 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -511,20 +511,19 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False): return future.value def list_topics(self): - metadata = self._get_cluster_metadata(topics=None) - obj = metadata.to_object() - return [t['topic'] for t in obj['topics']] + # Fetch fresh metadata on every call; a snapshot cached at __init__ would go stale. + metadata = self._get_cluster_metadata(topics=None).to_object() + return [t['topic'] for t in metadata['topics']] def describe_topics(self, topics=None): - metadata = self._get_cluster_metadata(topics=topics) - obj = metadata.to_object() - return obj['topics'] + # Must pass 'topics' through -- a cached snapshot cannot answer a filtered query. + metadata = self._get_cluster_metadata(topics=topics).to_object() + return metadata['topics'] def describe_cluster(self): - metadata = self._get_cluster_metadata() - obj = metadata.to_object() - obj.pop('topics') # We have 'describe_topics' for this - return obj + metadata = self._get_cluster_metadata().to_object() + metadata.pop('topics') # We have 'describe_topics' for this + return metadata @staticmethod def _convert_describe_acls_response_to_acls(describe_response):