From 62b2a839fefce23a3a2cc02931f5f7331f2d67cf Mon Sep 17 00:00:00 2001 From: Denis Kazakov Date: Fri, 21 Oct 2022 23:32:24 +0300 Subject: [PATCH 01/12] KIP-345 Add static consumer membership support --- kafka/consumer/group.py | 7 +- kafka/coordinator/base.py | 127 ++++++++++++++++++++++++++-------- kafka/coordinator/consumer.py | 12 +++- kafka/protocol/group.py | 119 ++++++++++++++++++++++++++++--- 4 files changed, 225 insertions(+), 40 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index a1d1dfa37..e82d755c5 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -56,6 +56,8 @@ class KafkaConsumer(six.Iterator): committing offsets. If None, auto-partition assignment (via group coordinator) and offset commits are disabled. Default: None + group_instance_id (str or None): the unique identifier defined by + user to distinguish each client instance key_deserializer (callable): Any callable that takes a raw message key and returns a deserialized key. value_deserializer (callable): Any callable that takes a @@ -245,6 +247,7 @@ class KafkaConsumer(six.Iterator): sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None kafka_client (callable): Custom class / callable for creating KafkaClient instances + coordinator (callable): Custom class / callable for creating ConsumerCoordinator instances Note: Configuration parameters are described in more detail at @@ -254,6 +257,7 @@ class KafkaConsumer(six.Iterator): 'bootstrap_servers': 'localhost', 'client_id': 'kafka-python-' + __version__, 'group_id': None, + 'group_instance_id': None, 'key_deserializer': None, 'value_deserializer': None, 'fetch_max_wait_ms': 500, @@ -308,6 +312,7 @@ class KafkaConsumer(six.Iterator): 'sasl_oauth_token_provider': None, 'legacy_iterator': False, # enable to revert to < 1.4.7 iterator 'kafka_client': KafkaClient, + 'coordinator': ConsumerCoordinator, } DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000 @@ -383,7 +388,7 @@ def __init__(self, *topics, **configs): self._subscription = SubscriptionState(self.config['auto_offset_reset']) self._fetcher = Fetcher( self._client, self._subscription, self._metrics, **self.config) - self._coordinator = ConsumerCoordinator( + self._coordinator = self.config['coordinator']( self._client, self._subscription, self._metrics, assignors=self.config['partition_assignment_strategy'], **self.config) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 5e41309df..154153e27 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -82,6 +82,7 @@ class BaseCoordinator(object): DEFAULT_CONFIG = { 'group_id': 'kafka-python-default-group', + 'group_instance_id': None, 'session_timeout_ms': 10000, 'heartbeat_interval_ms': 3000, 'max_poll_interval_ms': 300000, @@ -96,6 +97,8 @@ def __init__(self, client, metrics, **configs): group_id (str): name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. Default: 'kafka-python-default-group' + group_instance_id (str or None): the unique identifier defined by + user to distinguish each client instance session_timeout_ms (int): The timeout used to detect failures when using Kafka's group management facilities. Default: 30000 heartbeat_interval_ms (int): The expected time in milliseconds @@ -455,30 +458,48 @@ def _send_join_group_request(self): if self.config['api_version'] < (0, 9): raise Errors.KafkaError('JoinGroupRequest api requires 0.9+ brokers') elif (0, 9) <= self.config['api_version'] < (0, 10, 1): - request = JoinGroupRequest[0]( + version = 0 + args = ( self.group_id, self.config['session_timeout_ms'], self._generation.member_id, self.protocol_type(), - member_metadata) + member_metadata, + ) elif (0, 10, 1) <= self.config['api_version'] < (0, 11, 0): - request = JoinGroupRequest[1]( + version = 1 + args = ( self.group_id, self.config['session_timeout_ms'], self.config['max_poll_interval_ms'], self._generation.member_id, self.protocol_type(), - member_metadata) + member_metadata, + ) + elif self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']: + version = 5 + args = ( + self.group_id, + self.config['session_timeout_ms'], + self.config['max_poll_interval_ms'], + self._generation.member_id, + self.config['group_instance_id'], + self.protocol_type(), + member_metadata, + ) else: - request = JoinGroupRequest[2]( + version = 2 + args = ( self.group_id, self.config['session_timeout_ms'], self.config['max_poll_interval_ms'], self._generation.member_id, self.protocol_type(), - member_metadata) + member_metadata, + ) # create the request for the coordinator + request = JoinGroupRequest[version](*args) log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id) future = Future() _f = self._client.send(self.coordinator_id, request) @@ -562,12 +583,25 @@ def _handle_join_group_response(self, future, send_time, response): def _on_join_follower(self): # send follower's sync group with an empty assignment - version = 0 if self.config['api_version'] < (0, 11, 0) else 1 - request = SyncGroupRequest[version]( - self.group_id, - self._generation.generation_id, - self._generation.member_id, - {}) + if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']: + version = 3 + args = ( + self.group_id, + self._generation.generation_id, + self._generation.member_id, + self.config['group_instance_id'], + {}, + ) + else: + version = 0 if self.config['api_version'] < (0, 11, 0) else 1 + args = ( + self.group_id, + self._generation.generation_id, + self._generation.member_id, + {}, + ) + + request = SyncGroupRequest[version](*args) log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s", self.group_id, self.coordinator_id, request) return self._send_sync_group_request(request) @@ -590,15 +624,30 @@ def _on_join_leader(self, response): except Exception as e: return Future().failure(e) - version = 0 if self.config['api_version'] < (0, 11, 0) else 1 - request = SyncGroupRequest[version]( - self.group_id, - self._generation.generation_id, - self._generation.member_id, - [(member_id, - assignment if isinstance(assignment, bytes) else assignment.encode()) - for member_id, assignment in six.iteritems(group_assignment)]) + group_assignment = [ + (member_id, assignment if isinstance(assignment, bytes) else assignment.encode()) + for member_id, assignment in six.iteritems(group_assignment) + ] + + if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']: + version = 3 + args = ( + self.group_id, + self._generation.generation_id, + self._generation.member_id, + self.config['group_instance_id'], + group_assignment, + ) + else: + version = 0 if self.config['api_version'] < (0, 11, 0) else 1 + args = ( + self.group_id, + self._generation.generation_id, + self._generation.member_id, + group_assignment, + ) + request = SyncGroupRequest[version](*args) log.debug("Sending leader SyncGroup for group %s to coordinator %s: %s", self.group_id, self.coordinator_id, request) return self._send_sync_group_request(request) @@ -764,15 +813,22 @@ def close(self): def maybe_leave_group(self): """Leave the current group and reset local generation/memberId.""" with self._client._lock, self._lock: - if (not self.coordinator_unknown() + if ( + not self.coordinator_unknown() and self.state is not MemberState.UNJOINED - and self._generation is not Generation.NO_GENERATION): - + and self._generation is not Generation.NO_GENERATION + and not self.config['group_instance_id'] + ): # this is a minimal effort attempt to leave the group. we do not # attempt any resending if the request fails or times out. log.info('Leaving consumer group (%s).', self.group_id) - version = 0 if self.config['api_version'] < (0, 11, 0) else 1 - request = LeaveGroupRequest[version](self.group_id, self._generation.member_id) + if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']: + version = 3 + args = (self.group_id, [(self._generation.member_id, self.config['group_instance_id'])]) + else: + version = 0 if self.config['api_version'] < (0, 11, 0) else 1 + args = self.group_id, self._generation.member_id + request = LeaveGroupRequest[version](*args) future = self._client.send(self.coordinator_id, request) future.add_callback(self._handle_leave_group_response) future.add_errback(log.error, "LeaveGroup request failed: %s") @@ -799,10 +855,23 @@ def _send_heartbeat_request(self): e = Errors.NodeNotReadyError(self.coordinator_id) return Future().failure(e) - version = 0 if self.config['api_version'] < (0, 11, 0) else 1 - request = HeartbeatRequest[version](self.group_id, - self._generation.generation_id, - self._generation.member_id) + if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']: + version = 2 + args = ( + self.group_id, + self._generation.generation_id, + self._generation.member_id, + self.config['group_instance_id'], + ) + else: + version = 0 if self.config['api_version'] < (0, 11, 0) else 1 + args = ( + self.group_id, + self._generation.generation_id, + self._generation.member_id, + ) + + request = HeartbeatRequest[version](*args) log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member future = Future() _f = self._client.send(self.coordinator_id, request) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 971f5e802..ae7535de6 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -29,6 +29,7 @@ class ConsumerCoordinator(BaseCoordinator): """This class manages the coordination process with the consumer coordinator.""" DEFAULT_CONFIG = { 'group_id': 'kafka-python-default-group', + 'group_instance_id': None, 'enable_auto_commit': True, 'auto_commit_interval_ms': 5000, 'default_offset_commit_callback': None, @@ -49,6 +50,8 @@ def __init__(self, client, subscription, metrics, **configs): group_id (str): name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. Default: 'kafka-python-default-group' + group_instance_id (str or None): the unique identifier defined by + user to distinguish each client instance enable_auto_commit (bool): If true the consumer's offset will be periodically committed in the background. Default: True. auto_commit_interval_ms (int): milliseconds between automatic @@ -308,10 +311,15 @@ def _perform_assignment(self, leader_id, assignment_strategy, members): assert assignor, 'Invalid assignment protocol: %s' % (assignment_strategy,) member_metadata = {} all_subscribed_topics = set() - for member_id, metadata_bytes in members: + + for member in members: + if len(member) == 3: + member_id, group_instance_id, metadata_bytes = member + else: + member_id, metadata_bytes = member metadata = ConsumerProtocol.METADATA.decode(metadata_bytes) member_metadata[member_id] = metadata - all_subscribed_topics.update(metadata.subscription) # pylint: disable-msg=no-member + all_subscribed_topics.update(metadata.subscription) # pylint: disable-msg=no-member # the leader will begin watching for changes to any of the topics # the group is interested in, which ensures that all metadata changes diff --git a/kafka/protocol/group.py b/kafka/protocol/group.py index bcb96553b..c6a40f78f 100644 --- a/kafka/protocol/group.py +++ b/kafka/protocol/group.py @@ -42,6 +42,23 @@ class JoinGroupResponse_v2(Response): ) +class JoinGroupResponse_v5(Response): + API_KEY = 11 + API_VERSION = 5 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16), + ('generation_id', Int32), + ('group_protocol', String('utf-8')), + ('leader_id', String('utf-8')), + ('member_id', String('utf-8')), + ('members', Array( + ('member_id', String('utf-8')), + ('group_instance_id', String('utf-8')), + ('member_metadata', Bytes))), + ) + + class JoinGroupRequest_v0(Request): API_KEY = 11 API_VERSION = 0 @@ -83,11 +100,30 @@ class JoinGroupRequest_v2(Request): UNKNOWN_MEMBER_ID = '' +class JoinGroupRequest_v5(Request): + API_KEY = 11 + API_VERSION = 5 + RESPONSE_TYPE = JoinGroupResponse_v5 + SCHEMA = Schema( + ('group', String('utf-8')), + ('session_timeout', Int32), + ('rebalance_timeout', Int32), + ('member_id', String('utf-8')), + ('group_instance_id', String('utf-8')), + ('protocol_type', String('utf-8')), + ('group_protocols', Array( + ('protocol_name', String('utf-8')), + ('protocol_metadata', Bytes))), + ) + UNKNOWN_MEMBER_ID = '' + + + JoinGroupRequest = [ - JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2 + JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2, None, None, JoinGroupRequest_v5, ] JoinGroupResponse = [ - JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v2 + JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v2, None, None, JoinGroupResponse_v5, ] @@ -118,6 +154,16 @@ class SyncGroupResponse_v1(Response): ) +class SyncGroupResponse_v3(Response): + API_KEY = 14 + API_VERSION = 3 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16), + ('member_assignment', Bytes) + ) + + class SyncGroupRequest_v0(Request): API_KEY = 14 API_VERSION = 0 @@ -139,8 +185,23 @@ class SyncGroupRequest_v1(Request): SCHEMA = SyncGroupRequest_v0.SCHEMA -SyncGroupRequest = [SyncGroupRequest_v0, SyncGroupRequest_v1] -SyncGroupResponse = [SyncGroupResponse_v0, SyncGroupResponse_v1] +class SyncGroupRequest_v3(Request): + API_KEY = 14 + API_VERSION = 3 + RESPONSE_TYPE = SyncGroupResponse_v3 + SCHEMA = Schema( + ('group', String('utf-8')), + ('generation_id', Int32), + ('member_id', String('utf-8')), + ('group_instance_id', String('utf-8')), + ('group_assignment', Array( + ('member_id', String('utf-8')), + ('member_metadata', Bytes))), + ) + + +SyncGroupRequest = [SyncGroupRequest_v0, SyncGroupRequest_v1, None, SyncGroupRequest_v3] +SyncGroupResponse = [SyncGroupResponse_v0, SyncGroupResponse_v1, None, SyncGroupResponse_v3] class MemberAssignment(Struct): @@ -188,8 +249,29 @@ class HeartbeatRequest_v1(Request): SCHEMA = HeartbeatRequest_v0.SCHEMA -HeartbeatRequest = [HeartbeatRequest_v0, HeartbeatRequest_v1] -HeartbeatResponse = [HeartbeatResponse_v0, HeartbeatResponse_v1] +class HeartbeatResponse_v2(Response): + API_KEY = 12 + API_VERSION = 2 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16) + ) + + +class HeartbeatRequest_v2(Request): + API_KEY = 12 + API_VERSION = 2 + RESPONSE_TYPE = HeartbeatResponse_v2 + SCHEMA = Schema( + ('group', String('utf-8')), + ('generation_id', Int32), + ('member_id', String('utf-8')), + ('group_instance_id', String('utf-8')), + ) + + +HeartbeatRequest = [HeartbeatRequest_v0, HeartbeatRequest_v1, HeartbeatRequest_v2] +HeartbeatResponse = [HeartbeatResponse_v0, HeartbeatResponse_v1, HeartbeatResponse_v2] class LeaveGroupResponse_v0(Response): @@ -209,6 +291,15 @@ class LeaveGroupResponse_v1(Response): ) +class LeaveGroupResponse_v3(Response): + API_KEY = 13 + API_VERSION = 3 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16) + ) + + class LeaveGroupRequest_v0(Request): API_KEY = 13 API_VERSION = 0 @@ -226,5 +317,17 @@ class LeaveGroupRequest_v1(Request): SCHEMA = LeaveGroupRequest_v0.SCHEMA -LeaveGroupRequest = [LeaveGroupRequest_v0, LeaveGroupRequest_v1] -LeaveGroupResponse = [LeaveGroupResponse_v0, LeaveGroupResponse_v1] +class LeaveGroupRequest_v3(Request): + API_KEY = 13 + API_VERSION = 3 + RESPONSE_TYPE = LeaveGroupResponse_v3 + SCHEMA = Schema( + ('group', String('utf-8')), + ('member_identity_list', Array( + ('member_id', String('utf-8')), + ('group_instance_id', String('utf-8')))), + ) + + +LeaveGroupRequest = [LeaveGroupRequest_v0, LeaveGroupRequest_v1, None, LeaveGroupRequest_v3] +LeaveGroupResponse = [LeaveGroupResponse_v0, LeaveGroupResponse_v1, None, LeaveGroupResponse_v3] From 6242c03b2c34016af324d5b701e72d20dc1ce60f Mon Sep 17 00:00:00 2001 From: Denis Kazakov Date: Sat, 22 Oct 2022 00:55:01 +0300 Subject: [PATCH 02/12] KIP-345 Add examples to docs --- CHANGES.md | 5 +++++ README.rst | 2 ++ docs/changelog.rst | 7 +++++++ docs/usage.rst | 10 ++++++++++ 4 files changed, 24 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 097c55db6..0add9c82f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,8 @@ +# 2.0.3 (under development) + +Consumer +* KIP-345: Implement static membership support + # 2.0.2 (Sep 29, 2020) Consumer diff --git a/README.rst b/README.rst index 5f834442c..fdaa027e5 100644 --- a/README.rst +++ b/README.rst @@ -56,6 +56,8 @@ that expose basic message attributes: topic, partition, offset, key, and value: >>> # join a consumer group for dynamic partition assignment and offset commits >>> from kafka import KafkaConsumer >>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group') +>>> # or as a static member with a fixed group member name +>>> # consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group', group_instance_id='consumer-1') >>> for msg in consumer: ... print (msg) diff --git a/docs/changelog.rst b/docs/changelog.rst index 446b29021..eae2b9954 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -1,6 +1,13 @@ Changelog ========= +2.0.3 (under development) +#################### + +Consumer +-------- +* KIP-345: Implement static membership support + 2.0.2 (Sep 29, 2020) #################### diff --git a/docs/usage.rst b/docs/usage.rst index 1cf1aa414..9a36ee059 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -45,6 +45,16 @@ KafkaConsumer group_id='my-group', bootstrap_servers='my.server.com') + # Use multiple static consumers w/ 2.3.0 kafka brokers + consumer1 = KafkaConsumer('my-topic', + group_id='my-group', + group_instance_id='process-1', + bootstrap_servers='my.server.com') + consumer2 = KafkaConsumer('my-topic', + group_id='my-group', + group_instance_id='process-2', + bootstrap_servers='my.server.com') + There are many configuration options for the consumer class. See :class:`~kafka.KafkaConsumer` API documentation for more details. From 0d75205609e690c9efa689d3382c7cf2adcdc101 Mon Sep 17 00:00:00 2001 From: Denis Kazakov Date: Mon, 24 Oct 2022 00:26:28 +0300 Subject: [PATCH 03/12] KIP-345 Add leave_group_on_close flag https://issues.apache.org/jira/browse/KAFKA-6995 --- kafka/consumer/group.py | 11 ++++++++--- kafka/coordinator/base.py | 21 +++++++++++++++++---- kafka/coordinator/consumer.py | 11 ++++++++--- 3 files changed, 33 insertions(+), 10 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index e82d755c5..60231d295 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -56,8 +56,12 @@ class KafkaConsumer(six.Iterator): committing offsets. If None, auto-partition assignment (via group coordinator) and offset commits are disabled. Default: None - group_instance_id (str or None): the unique identifier defined by - user to distinguish each client instance + group_instance_id (str): the unique identifier to distinguish + each client instance. If set and leave_group_on_close is + False consumer group rebalancing won't be triggered until + sessiont_timeout_ms is met. Requires 2.3.0+. + leave_group_on_close (bool or None): whether to leave a consumer + group or not on consumer shutdown. key_deserializer (callable): Any callable that takes a raw message key and returns a deserialized key. value_deserializer (callable): Any callable that takes a @@ -257,7 +261,8 @@ class KafkaConsumer(six.Iterator): 'bootstrap_servers': 'localhost', 'client_id': 'kafka-python-' + __version__, 'group_id': None, - 'group_instance_id': None, + 'group_instance_id': '', + 'leave_group_on_close': None, 'key_deserializer': None, 'value_deserializer': None, 'fetch_max_wait_ms': 500, diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 154153e27..466c04296 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -82,7 +82,8 @@ class BaseCoordinator(object): DEFAULT_CONFIG = { 'group_id': 'kafka-python-default-group', - 'group_instance_id': None, + 'group_instance_id': '', + 'leave_group_on_close': None, 'session_timeout_ms': 10000, 'heartbeat_interval_ms': 3000, 'max_poll_interval_ms': 300000, @@ -97,8 +98,12 @@ def __init__(self, client, metrics, **configs): group_id (str): name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. Default: 'kafka-python-default-group' - group_instance_id (str or None): the unique identifier defined by - user to distinguish each client instance + group_instance_id (str): the unique identifier to distinguish + each client instance. If set and leave_group_on_close is + False consumer group rebalancing won't be triggered until + sessiont_timeout_ms is met. Requires 2.3.0+. + leave_group_on_close (bool or None): whether to leave a consumer + group or not on consumer shutdown. session_timeout_ms (int): The timeout used to detect failures when using Kafka's group management facilities. Default: 30000 heartbeat_interval_ms (int): The expected time in milliseconds @@ -124,6 +129,11 @@ def __init__(self, client, metrics, **configs): "different values for max_poll_interval_ms " "and session_timeout_ms") + if self.config['group_instance_id'] and self.config['api_version'] < (2, 3, 0): + raise Errors.KafkaConfigurationError( + 'Broker version %s does not support static membership' % (self.config['api_version'],), + ) + self._client = client self.group_id = self.config['group_id'] self.heartbeat = Heartbeat(**self.config) @@ -817,7 +827,7 @@ def maybe_leave_group(self): not self.coordinator_unknown() and self.state is not MemberState.UNJOINED and self._generation is not Generation.NO_GENERATION - and not self.config['group_instance_id'] + and self._leave_group_on_close() ): # this is a minimal effort attempt to leave the group. we do not # attempt any resending if the request fails or times out. @@ -918,6 +928,9 @@ def _handle_heartbeat_response(self, future, send_time, response): log.error("Heartbeat failed: Unhandled error: %s", error) future.failure(error) + def _leave_group_on_close(self): + return self.config['leave_group_on_close'] is None or self.config['leave_group_on_close'] + class GroupCoordinatorMetrics(object): def __init__(self, heartbeat, metrics, prefix, tags=None): diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index ae7535de6..fd803b1c1 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -29,7 +29,8 @@ class ConsumerCoordinator(BaseCoordinator): """This class manages the coordination process with the consumer coordinator.""" DEFAULT_CONFIG = { 'group_id': 'kafka-python-default-group', - 'group_instance_id': None, + 'group_instance_id': '', + 'leave_group_on_close': None, 'enable_auto_commit': True, 'auto_commit_interval_ms': 5000, 'default_offset_commit_callback': None, @@ -50,8 +51,12 @@ def __init__(self, client, subscription, metrics, **configs): group_id (str): name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. Default: 'kafka-python-default-group' - group_instance_id (str or None): the unique identifier defined by - user to distinguish each client instance + group_instance_id (str): the unique identifier to distinguish + each client instance. If set and leave_group_on_close is + False consumer group rebalancing won't be triggered until + sessiont_timeout_ms is met. Requires 2.3.0+. + leave_group_on_close (bool or None): whether to leave a consumer + group or not on consumer shutdown. enable_auto_commit (bool): If true the consumer's offset will be periodically committed in the background. Default: True. auto_commit_interval_ms (int): milliseconds between automatic From 4e962e24628847174ecccc3effb81ac56d2706be Mon Sep 17 00:00:00 2001 From: Denis Kazakov Date: Mon, 24 Oct 2022 00:27:51 +0300 Subject: [PATCH 04/12] KIP-345 Add tests for static membership --- test/test_consumer.py | 5 +++++ test/test_consumer_group.py | 20 ++++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/test/test_consumer.py b/test/test_consumer.py index 436fe55c0..0c6110517 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -24,3 +24,8 @@ def test_subscription_copy(self): assert sub == set(['foo']) sub.add('fizz') assert consumer.subscription() == set(['foo']) + + def test_version_for_static_membership(self): + KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(2, 3, 0), group_instance_id='test') + with pytest.raises(KafkaConfigurationError): + KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(2, 2, 0), group_instance_id='test') diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index 58dc7ebf9..1da1c0e81 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -177,3 +177,23 @@ def test_heartbeat_thread(kafka_broker, topic): consumer.poll(timeout_ms=100) assert consumer._coordinator.heartbeat.last_poll > last_poll consumer.close() + + +@pytest.mark.skipif(env_kafka_version() < (2, 3, 0), reason="Requires KAFKA_VERSION >= 2.3.0") +@pytest.mark.parametrize('leave, result', [ + (False, True), + (True, False), +]) +def test_kafka_consumer_rebalance_for_static_members(kafka_consumer_factory, leave, result): + GROUP_ID = random_string(10) + + consumer1 = kafka_consumer_factory(group_id=GROUP_ID, group_instance_id=GROUP_ID, leave_group_on_close=leave) + consumer1.poll() + generation1 = consumer1._coordinator.generation().generation_id + consumer1.close() + + consumer2 = kafka_consumer_factory(group_id=GROUP_ID, group_instance_id=GROUP_ID, leave_group_on_close=leave) + consumer2.poll() + generation2 = consumer2._coordinator.generation().generation_id + consumer2.close() + assert (generation1 == generation2) is result From 7e474ab751dd1739b4d52b599e34f89bb0811e61 Mon Sep 17 00:00:00 2001 From: Denis Kazakov Date: Mon, 24 Oct 2022 23:21:03 +0300 Subject: [PATCH 05/12] KIP-345 Update docs for leave_group_on_close option --- README.rst | 3 ++- docs/usage.rst | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index fdaa027e5..36b3a6753 100644 --- a/README.rst +++ b/README.rst @@ -57,7 +57,8 @@ that expose basic message attributes: topic, partition, offset, key, and value: >>> from kafka import KafkaConsumer >>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group') >>> # or as a static member with a fixed group member name ->>> # consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group', group_instance_id='consumer-1') +>>> # consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group', +>>> # group_instance_id='consumer-1', leave_group_on_close=False) >>> for msg in consumer: ... print (msg) diff --git a/docs/usage.rst b/docs/usage.rst index 9a36ee059..10c7e8a6a 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -49,10 +49,12 @@ KafkaConsumer consumer1 = KafkaConsumer('my-topic', group_id='my-group', group_instance_id='process-1', + leave_group_on_close=False, bootstrap_servers='my.server.com') consumer2 = KafkaConsumer('my-topic', group_id='my-group', group_instance_id='process-2', + leave_group_on_close=False, bootstrap_servers='my.server.com') From a9d5e8bc81083bbbc799b5da7021016b2272c3ef Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sat, 9 Mar 2024 22:28:19 -0500 Subject: [PATCH 06/12] Update changelog.rst --- docs/changelog.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index a215da03d..d0a346b0f 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -1,7 +1,7 @@ Changelog ========= -2.0.3 (under development) +2.1.0 #################### Consumer From 33b2754d24be0c2a7dc46c3e5b07d9014a0e9fc9 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Tue, 19 Mar 2024 18:19:43 -0400 Subject: [PATCH 07/12] remove six from base.py --- kafka/coordinator/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 7117ae795..a982d55ce 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -632,7 +632,7 @@ def _on_join_leader(self, response): group_assignment = [ (member_id, assignment if isinstance(assignment, bytes) else assignment.encode()) - for member_id, assignment in six.iteritems(group_assignment) + for member_id, assignment in iteritems(group_assignment) ] if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']: From 52c0d208c93c09223df67a02612d729f326099dd Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Tue, 19 Mar 2024 18:26:24 -0400 Subject: [PATCH 08/12] Update base.py --- kafka/coordinator/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index a982d55ce..982b7f167 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -632,7 +632,7 @@ def _on_join_leader(self, response): group_assignment = [ (member_id, assignment if isinstance(assignment, bytes) else assignment.encode()) - for member_id, assignment in iteritems(group_assignment) + for member_id, assignment in group_assignment.items() ] if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']: From 984c996f8b65c1d4e91bb7fe7dab617dade1c27f Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Tue, 19 Mar 2024 18:38:45 -0400 Subject: [PATCH 09/12] Update base.py --- kafka/coordinator/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 982b7f167..a982d55ce 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -632,7 +632,7 @@ def _on_join_leader(self, response): group_assignment = [ (member_id, assignment if isinstance(assignment, bytes) else assignment.encode()) - for member_id, assignment in group_assignment.items() + for member_id, assignment in iteritems(group_assignment) ] if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']: From ecdabd6b3b04c74af5713f4f610993800e5180cf Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Tue, 19 Mar 2024 19:23:38 -0400 Subject: [PATCH 10/12] Update base.py --- kafka/coordinator/base.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index a982d55ce..d5ec4c720 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -632,7 +632,7 @@ def _on_join_leader(self, response): group_assignment = [ (member_id, assignment if isinstance(assignment, bytes) else assignment.encode()) - for member_id, assignment in iteritems(group_assignment) + for member_id, assignment in group_assignment.items() ] if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']: @@ -646,13 +646,12 @@ def _on_join_leader(self, response): ) else: version = 0 if self.config['api_version'] < (0, 11, 0) else 1 - request = SyncGroupRequest[version]( + args = ( self.group_id, self._generation.generation_id, self._generation.member_id, - [(member_id, - assignment if isinstance(assignment, bytes) else assignment.encode()) - for member_id, assignment in group_assignment.items()]) + group_assignment, + ) request = SyncGroupRequest[version](*args) log.debug("Sending leader SyncGroup for group %s to coordinator %s: %s", From f65921bacd68514945c9d8da80ab26957051480c Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Tue, 19 Mar 2024 20:45:21 -0400 Subject: [PATCH 11/12] Update changelog.rst --- docs/changelog.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index d0a346b0f..67013247b 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -1,7 +1,7 @@ Changelog ========= -2.1.0 +2.2.0 #################### Consumer From 1b5a10e748e569dbd714c7191682e5ff67c205a6 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Tue, 19 Mar 2024 22:02:50 -0400 Subject: [PATCH 12/12] Update README.rst --- README.rst | 1 - 1 file changed, 1 deletion(-) diff --git a/README.rst b/README.rst index 3d529114a..ce82c6d3b 100644 --- a/README.rst +++ b/README.rst @@ -72,7 +72,6 @@ that expose basic message attributes: topic, partition, offset, key, and value: # group_instance_id='consumer-1', leave_group_on_close=False) for msg in consumer: print (msg) - print (msg) .. code-block:: python