Skip to content

KIP-345 Static membership implementation #137

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Mar 20, 2024
Merged
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# 2.0.3 (under development)

Consumer
* KIP-345: Implement static membership support

# 2.0.2 (Sep 29, 2020)

Consumer
Expand Down
6 changes: 5 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,12 @@ that expose basic message attributes: topic, partition, offset, key, and value:

.. code-block:: python

# join a consumer group for dynamic partition assignment and offset commits
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic')
consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
# or as a static member with a fixed group member name
# consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group',
# group_instance_id='consumer-1', leave_group_on_close=False)
for msg in consumer:
print (msg)

Expand Down
7 changes: 7 additions & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
Changelog
=========

2.2.0
####################

Consumer
--------
* KIP-345: Implement static membership support


2.0.2 (Sep 29, 2020)
####################
Expand Down
12 changes: 12 additions & 0 deletions docs/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ KafkaConsumer
group_id='my-group',
bootstrap_servers='my.server.com')

# Use multiple static consumers w/ 2.3.0 kafka brokers
consumer1 = KafkaConsumer('my-topic',
group_id='my-group',
group_instance_id='process-1',
leave_group_on_close=False,
bootstrap_servers='my.server.com')
consumer2 = KafkaConsumer('my-topic',
group_id='my-group',
group_instance_id='process-2',
leave_group_on_close=False,
bootstrap_servers='my.server.com')


There are many configuration options for the consumer class. See
:class:`~kafka.KafkaConsumer` API documentation for more details.
Expand Down
12 changes: 11 additions & 1 deletion kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ class KafkaConsumer:
committing offsets. If None, auto-partition assignment (via
group coordinator) and offset commits are disabled.
Default: None
group_instance_id (str): the unique identifier to distinguish
each client instance. If set and leave_group_on_close is
False consumer group rebalancing won't be triggered until
sessiont_timeout_ms is met. Requires 2.3.0+.
leave_group_on_close (bool or None): whether to leave a consumer
group or not on consumer shutdown.
key_deserializer (callable): Any callable that takes a
raw message key and returns a deserialized key.
value_deserializer (callable): Any callable that takes a
Expand Down Expand Up @@ -241,6 +247,7 @@ class KafkaConsumer:
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None
kafka_client (callable): Custom class / callable for creating KafkaClient instances
coordinator (callable): Custom class / callable for creating ConsumerCoordinator instances

Note:
Configuration parameters are described in more detail at
Expand All @@ -250,6 +257,8 @@ class KafkaConsumer:
'bootstrap_servers': 'localhost',
'client_id': 'kafka-python-' + __version__,
'group_id': None,
'group_instance_id': '',
'leave_group_on_close': None,
'key_deserializer': None,
'value_deserializer': None,
'fetch_max_wait_ms': 500,
Expand Down Expand Up @@ -304,6 +313,7 @@ class KafkaConsumer:
'sasl_oauth_token_provider': None,
'legacy_iterator': False, # enable to revert to < 1.4.7 iterator
'kafka_client': KafkaClient,
'coordinator': ConsumerCoordinator,
}
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000

Expand Down Expand Up @@ -379,7 +389,7 @@ def __init__(self, *topics, **configs):
self._subscription = SubscriptionState(self.config['auto_offset_reset'])
self._fetcher = Fetcher(
self._client, self._subscription, self._metrics, **self.config)
self._coordinator = ConsumerCoordinator(
self._coordinator = self.config['coordinator'](
self._client, self._subscription, self._metrics,
assignors=self.config['partition_assignment_strategy'],
**self.config)
Expand Down
140 changes: 111 additions & 29 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class BaseCoordinator:

DEFAULT_CONFIG = {
'group_id': 'kafka-python-default-group',
'group_instance_id': '',
'leave_group_on_close': None,
'session_timeout_ms': 10000,
'heartbeat_interval_ms': 3000,
'max_poll_interval_ms': 300000,
Expand All @@ -92,6 +94,12 @@ def __init__(self, client, metrics, **configs):
group_id (str): name of the consumer group to join for dynamic
partition assignment (if enabled), and to use for fetching and
committing offsets. Default: 'kafka-python-default-group'
group_instance_id (str): the unique identifier to distinguish
each client instance. If set and leave_group_on_close is
False consumer group rebalancing won't be triggered until
sessiont_timeout_ms is met. Requires 2.3.0+.
leave_group_on_close (bool or None): whether to leave a consumer
group or not on consumer shutdown.
session_timeout_ms (int): The timeout used to detect failures when
using Kafka's group management facilities. Default: 30000
heartbeat_interval_ms (int): The expected time in milliseconds
Expand All @@ -117,6 +125,11 @@ def __init__(self, client, metrics, **configs):
"different values for max_poll_interval_ms "
"and session_timeout_ms")

if self.config['group_instance_id'] and self.config['api_version'] < (2, 3, 0):
raise Errors.KafkaConfigurationError(
'Broker version %s does not support static membership' % (self.config['api_version'],),
)

self._client = client
self.group_id = self.config['group_id']
self.heartbeat = Heartbeat(**self.config)
Expand Down Expand Up @@ -451,30 +464,48 @@ def _send_join_group_request(self):
if self.config['api_version'] < (0, 9):
raise Errors.KafkaError('JoinGroupRequest api requires 0.9+ brokers')
elif (0, 9) <= self.config['api_version'] < (0, 10, 1):
request = JoinGroupRequest[0](
version = 0
args = (
self.group_id,
self.config['session_timeout_ms'],
self._generation.member_id,
self.protocol_type(),
member_metadata)
member_metadata,
)
elif (0, 10, 1) <= self.config['api_version'] < (0, 11, 0):
request = JoinGroupRequest[1](
version = 1
args = (
self.group_id,
self.config['session_timeout_ms'],
self.config['max_poll_interval_ms'],
self._generation.member_id,
self.protocol_type(),
member_metadata)
member_metadata,
)
elif self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']:
version = 5
args = (
self.group_id,
self.config['session_timeout_ms'],
self.config['max_poll_interval_ms'],
self._generation.member_id,
self.config['group_instance_id'],
self.protocol_type(),
member_metadata,
)
else:
request = JoinGroupRequest[2](
version = 2
args = (
self.group_id,
self.config['session_timeout_ms'],
self.config['max_poll_interval_ms'],
self._generation.member_id,
self.protocol_type(),
member_metadata)
member_metadata,
)

# create the request for the coordinator
request = JoinGroupRequest[version](*args)
log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id)
future = Future()
_f = self._client.send(self.coordinator_id, request)
Expand Down Expand Up @@ -558,12 +589,25 @@ def _handle_join_group_response(self, future, send_time, response):

def _on_join_follower(self):
# send follower's sync group with an empty assignment
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
request = SyncGroupRequest[version](
self.group_id,
self._generation.generation_id,
self._generation.member_id,
{})
if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']:
version = 3
args = (
self.group_id,
self._generation.generation_id,
self._generation.member_id,
self.config['group_instance_id'],
{},
)
else:
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
args = (
self.group_id,
self._generation.generation_id,
self._generation.member_id,
{},
)

request = SyncGroupRequest[version](*args)
log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s",
self.group_id, self.coordinator_id, request)
return self._send_sync_group_request(request)
Expand All @@ -586,15 +630,30 @@ def _on_join_leader(self, response):
except Exception as e:
return Future().failure(e)

version = 0 if self.config['api_version'] < (0, 11, 0) else 1
request = SyncGroupRequest[version](
self.group_id,
self._generation.generation_id,
self._generation.member_id,
[(member_id,
assignment if isinstance(assignment, bytes) else assignment.encode())
for member_id, assignment in group_assignment.items()])
group_assignment = [
(member_id, assignment if isinstance(assignment, bytes) else assignment.encode())
for member_id, assignment in group_assignment.items()
]

if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']:
version = 3
args = (
self.group_id,
self._generation.generation_id,
self._generation.member_id,
self.config['group_instance_id'],
group_assignment,
)
else:
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
args = (
self.group_id,
self._generation.generation_id,
self._generation.member_id,
group_assignment,
)

request = SyncGroupRequest[version](*args)
log.debug("Sending leader SyncGroup for group %s to coordinator %s: %s",
self.group_id, self.coordinator_id, request)
return self._send_sync_group_request(request)
Expand Down Expand Up @@ -760,15 +819,22 @@ def close(self):
def maybe_leave_group(self):
"""Leave the current group and reset local generation/memberId."""
with self._client._lock, self._lock:
if (not self.coordinator_unknown()
if (
not self.coordinator_unknown()
and self.state is not MemberState.UNJOINED
and self._generation is not Generation.NO_GENERATION):

and self._generation is not Generation.NO_GENERATION
and self._leave_group_on_close()
):
# this is a minimal effort attempt to leave the group. we do not
# attempt any resending if the request fails or times out.
log.info('Leaving consumer group (%s).', self.group_id)
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
request = LeaveGroupRequest[version](self.group_id, self._generation.member_id)
if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']:
version = 3
args = (self.group_id, [(self._generation.member_id, self.config['group_instance_id'])])
else:
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
args = self.group_id, self._generation.member_id
request = LeaveGroupRequest[version](*args)
future = self._client.send(self.coordinator_id, request)
future.add_callback(self._handle_leave_group_response)
future.add_errback(log.error, "LeaveGroup request failed: %s")
Expand All @@ -795,10 +861,23 @@ def _send_heartbeat_request(self):
e = Errors.NodeNotReadyError(self.coordinator_id)
return Future().failure(e)

version = 0 if self.config['api_version'] < (0, 11, 0) else 1
request = HeartbeatRequest[version](self.group_id,
self._generation.generation_id,
self._generation.member_id)
if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']:
version = 2
args = (
self.group_id,
self._generation.generation_id,
self._generation.member_id,
self.config['group_instance_id'],
)
else:
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
args = (
self.group_id,
self._generation.generation_id,
self._generation.member_id,
)

request = HeartbeatRequest[version](*args)
log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member
future = Future()
_f = self._client.send(self.coordinator_id, request)
Expand Down Expand Up @@ -845,6 +924,9 @@ def _handle_heartbeat_response(self, future, send_time, response):
log.error("Heartbeat failed: Unhandled error: %s", error)
future.failure(error)

def _leave_group_on_close(self):
return self.config['leave_group_on_close'] is None or self.config['leave_group_on_close']


class GroupCoordinatorMetrics:
def __init__(self, heartbeat, metrics, prefix, tags=None):
Expand Down
17 changes: 15 additions & 2 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class ConsumerCoordinator(BaseCoordinator):
"""This class manages the coordination process with the consumer coordinator."""
DEFAULT_CONFIG = {
'group_id': 'kafka-python-default-group',
'group_instance_id': '',
'leave_group_on_close': None,
'enable_auto_commit': True,
'auto_commit_interval_ms': 5000,
'default_offset_commit_callback': None,
Expand All @@ -45,6 +47,12 @@ def __init__(self, client, subscription, metrics, **configs):
group_id (str): name of the consumer group to join for dynamic
partition assignment (if enabled), and to use for fetching and
committing offsets. Default: 'kafka-python-default-group'
group_instance_id (str): the unique identifier to distinguish
each client instance. If set and leave_group_on_close is
False consumer group rebalancing won't be triggered until
sessiont_timeout_ms is met. Requires 2.3.0+.
leave_group_on_close (bool or None): whether to leave a consumer
group or not on consumer shutdown.
enable_auto_commit (bool): If true the consumer's offset will be
periodically committed in the background. Default: True.
auto_commit_interval_ms (int): milliseconds between automatic
Expand Down Expand Up @@ -304,10 +312,15 @@ def _perform_assignment(self, leader_id, assignment_strategy, members):
assert assignor, f'Invalid assignment protocol: {assignment_strategy}'
member_metadata = {}
all_subscribed_topics = set()
for member_id, metadata_bytes in members:

for member in members:
if len(member) == 3:
member_id, group_instance_id, metadata_bytes = member
else:
member_id, metadata_bytes = member
metadata = ConsumerProtocol.METADATA.decode(metadata_bytes)
member_metadata[member_id] = metadata
all_subscribed_topics.update(metadata.subscription) # pylint: disable-msg=no-member
all_subscribed_topics.update(metadata.subscription) # pylint: disable-msg=no-member

# the leader will begin watching for changes to any of the topics
# the group is interested in, which ensures that all metadata changes
Expand Down
Loading
Loading