
feat AdminClient: support delete_records #136


Open · wants to merge 14 commits into base: master
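This PR adds a `KafkaAdminClient.delete_records` method, which truncates partitions by deleting all records below a given offset, together with the `DeleteRecordsRequest`/`DeleteRecordsResponse` v0 protocol classes and integration tests. A minimal usage sketch, not part of the diff; the broker address, topic name, and offsets below are hypothetical:

from kafka import KafkaAdminClient
from kafka.structs import TopicPartition

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

# Delete every record in partition 0 of "my-topic" with an offset below 42.
result = admin.delete_records({TopicPartition("my-topic", 0): 42}, timeout_ms=1000)

# Each entry mirrors the broker response for that partition:
# partition_index, low_watermark, error_code.
print(result[TopicPartition("my-topic", 0)]["low_watermark"])

admin.close()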
119 changes: 114 additions & 5 deletions kafka/admin/client.py
@@ -11,13 +11,14 @@
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol
import kafka.errors as Errors
from kafka.errors import (
IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
UnrecognizedBrokerVersion, IllegalArgumentError)
IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError, UnknownTopicOrPartitionError,
UnrecognizedBrokerVersion, IllegalArgumentError
)
from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
DeleteGroupsRequest, DescribeLogDirsRequest
DeleteGroupsRequest, DescribeLogDirsRequest, DeleteRecordsRequest
)
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
@@ -959,8 +960,116 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
.format(version))
return self._send_request_to_controller(request)

# delete records protocol not yet implemented
# Note: send the request to the partition leaders
def _get_leader_for_partitions(self, partitions, timeout_ms=None):
"""Finds ID of the leader node for every given topic partition.

Raises UnknownTopicOrPartitionError if a leader cannot be found for one of the given partitions.

:param partitions: ``[TopicPartition]``: partitions for which to find leaders.
:param timeout_ms: ``float``: Timeout in milliseconds. If None (default), the value is read from
the client configuration.

:return: Dictionary with ``{leader_id -> [partitions]}``
"""
timeout_ms = self._validate_timeout(timeout_ms)

partitions = set(partitions)
topics = set(tp.topic for tp in partitions)

response = self._get_cluster_metadata(topics=topics).to_object()

leader2partitions = defaultdict(list)
valid_partitions = set()
for topic in response.get("topics", ()):
for partition in topic.get("partitions", ()):
t2p = TopicPartition(topic=topic["topic"], partition=partition["partition"])
if t2p in partitions:
leader2partitions[partition["leader"]].append(t2p)
valid_partitions.add(t2p)

if len(partitions) != len(valid_partitions):
unknown = set(partitions) - valid_partitions
raise UnknownTopicOrPartitionError(
"The following partitions are not known: %s"
% ", ".join(str(x) for x in unknown)
)

return leader2partitions

def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None):
"""Delete records whose offset is smaller than the given offset of the corresponding partition.

:param records_to_delete: ``{TopicPartition: int}``: For each partition, delete all records with an
offset lower than the given value; that offset becomes the partition's new earliest available offset.
:param timeout_ms: ``float``: Timeout in milliseconds. If None (default), the value is read from
the client configuration.
:param partition_leader_id: ``int``: If specified, all deletion requests are sent to this node.
No check is performed that it is actually the leader for all of the listed partitions:
use with caution.

:return: Dictionary ``{TopicPartition: metadata}``, where metadata is the broker response for that
partition. See DeleteRecordsResponse for the possible fields. error_code is guaranteed to be
zero for every returned partition; otherwise an exception is raised.
"""
timeout_ms = self._validate_timeout(timeout_ms)
responses = []
version = self._matching_api_version(DeleteRecordsRequest)

# We want to make as few requests as possible
# If a single node serves as a partition leader for multiple partitions (and/or
# topics), we can send all of those in a single request.
# For that we store {leader -> {partitions for leader}}, and do 1 request per leader
if partition_leader_id is None:
leader2partitions = self._get_leader_for_partitions(
set(records_to_delete), timeout_ms
)
else:
leader2partitions = {partition_leader_id: set(records_to_delete)}

for leader, partitions in leader2partitions.items():
topic2partitions = defaultdict(list)
for partition in partitions:
topic2partitions[partition.topic].append(partition)

request = DeleteRecordsRequest[version](
topics=[
(topic, [(tp.partition, records_to_delete[tp]) for tp in partitions])
for topic, partitions in topic2partitions.items()
],
timeout_ms=timeout_ms
)
future = self._send_request_to_node(leader, request)
self._wait_for_futures([future])

responses.append(future.value.to_object())

partition2result = {}
partition2error = {}
for response in responses:
for topic in response["topics"]:
for partition in topic["partitions"]:
tp = TopicPartition(topic["name"], partition["partition_index"])
partition2result[tp] = partition
if partition["error_code"] != 0:
partition2error[tp] = partition["error_code"]

if partition2error:
if len(partition2error) == 1:
key, error = next(iter(partition2error.items()))
raise Errors.for_code(error)(
"Error deleting records from topic %s partition %s" % (key.topic, key.partition)
)
else:
raise Errors.BrokerResponseError(
"The following errors occured when trying to delete records: " +
", ".join(
"%s(partition=%d): %s" %
(partition.topic, partition.partition, Errors.for_code(error).__name__)
for partition, error in partition2error.items()
)
)

return partition2result

# create delegation token protocol not yet implemented
# Note: send the request to the least_loaded_node()
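The new method surfaces broker-side failures as exceptions rather than returning partial results: an unknown partition fails during leader lookup, a single failing partition raises the matching error class, and multiple failing partitions raise BrokerResponseError. A hedged sketch of catching these, reusing the hypothetical admin client and topic from the example above:

from kafka.errors import UnknownTopicOrPartitionError, OffsetOutOfRangeError
from kafka.structs import TopicPartition

try:
    # Partition 9999 presumably does not exist, so no leader can be found for it.
    admin.delete_records({TopicPartition("my-topic", 9999): -1})
except UnknownTopicOrPartitionError:
    pass

try:
    # Requesting deletion past the high watermark is rejected by the broker.
    admin.delete_records({TopicPartition("my-topic", 0): 10**9})
except OffsetOutOfRangeError:
    pass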
32 changes: 32 additions & 0 deletions kafka/protocol/admin.py
@@ -237,6 +237,38 @@ class DeleteTopicsRequest_v3(Request):
]


class DeleteRecordsResponse_v0(Response):
API_KEY = 21
API_VERSION = 0
SCHEMA = Schema(
('throttle_time_ms', Int32),
('topics', Array(
('name', String('utf-8')),
('partitions', Array(
('partition_index', Int32),
('low_watermark', Int64),
('error_code', Int16))))),
)


class DeleteRecordsRequest_v0(Request):
API_KEY = 21
API_VERSION = 0
RESPONSE_TYPE = DeleteRecordsResponse_v0
SCHEMA = Schema(
('topics', Array(
('name', String('utf-8')),
('partitions', Array(
('partition_index', Int32),
('offset', Int64))))),
('timeout_ms', Int32)
)


DeleteRecordsResponse = [DeleteRecordsResponse_v0]
DeleteRecordsRequest = [DeleteRecordsRequest_v0]


class ListGroupsResponse_v0(Response):
API_KEY = 16
API_VERSION = 0
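The request/response classes added above follow the same conventions as the other admin protocol messages: indexing the versioned list selects the schema, and fields are passed by name. A sketch of building a v0 request directly (the topic name and values are illustrative; KafkaAdminClient.delete_records builds the same structure internally):

# DeleteRecordsRequest is added to kafka.protocol.admin by this PR.
from kafka.protocol.admin import DeleteRecordsRequest

request = DeleteRecordsRequest[0](
    # Each topic entry carries (partition_index, offset) pairs;
    # offset -1 means "delete up to the high watermark".
    topics=[("example-topic", [(0, 42)])],
    timeout_ms=1000,
)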
4 changes: 2 additions & 2 deletions test/conftest.py
@@ -61,11 +61,11 @@ def kafka_consumer_factory(kafka_broker, topic, request):
"""Return a KafkaConsumer factory fixture"""
_consumer = [None]

def factory(**kafka_consumer_params):
def factory(topics=(topic,), **kafka_consumer_params):
params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
params.setdefault('auto_offset_reset', 'earliest')
_consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params))
_consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=list(topics), **params))
return _consumer[0]

yield factory
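The fixture change above lets a test opt out of the automatic subscription by passing its own topics, which the new delete_records test uses to assign partitions manually. An illustrative call, where `topic` is the existing fixture value:

from kafka.structs import TopicPartition

consumer = kafka_consumer_factory(group_id=None, topics=())
consumer.assign([TopicPartition(topic, 0)])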
73 changes: 72 additions & 1 deletion test/test_admin_integration.py
@@ -7,9 +7,12 @@
from threading import Event, Thread
from time import time, sleep

from kafka.structs import TopicPartition
from kafka.admin import (
ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
from kafka.errors import (NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError, GroupIdNotFoundError)
from kafka.errors import (
BrokerResponseError, KafkaError, NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError,
GroupIdNotFoundError, OffsetOutOfRangeError, UnknownTopicOrPartitionError)


@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
@@ -317,3 +320,71 @@ def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_fa
assert group1 not in consumergroups
assert group2 in consumergroups
assert group3 not in consumergroups

@pytest.fixture(name="topic2")
def _topic2(kafka_broker, request):
"""Same as `topic` fixture, but a different name if you need to topics."""
topic_name = '%s_%s' % (request.node.name, random_string(10))
kafka_broker.create_topics([topic_name])
return topic_name

@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Delete records requires broker >=0.11.0")
def test_delete_records(kafka_admin_client, kafka_consumer_factory, send_messages, topic, topic2):
t0p0 = TopicPartition(topic, 0)
t0p1 = TopicPartition(topic, 1)
t0p2 = TopicPartition(topic, 2)
t1p0 = TopicPartition(topic2, 0)
t1p1 = TopicPartition(topic2, 1)
t1p2 = TopicPartition(topic2, 2)

partitions = (t0p0, t0p1, t0p2, t1p0, t1p1, t1p2)

for p in partitions:
send_messages(range(0, 100), partition=p.partition, topic=p.topic)

consumer1 = kafka_consumer_factory(group_id=None, topics=())
consumer1.assign(partitions)
for _ in range(600):
next(consumer1)

result = kafka_admin_client.delete_records({t0p0: -1, t0p1: 50, t1p0: 40, t1p2: 30}, timeout_ms=1000)
assert result[t0p0] == {"low_watermark": 100, "error_code": 0, "partition_index": t0p0.partition}
assert result[t0p1] == {"low_watermark": 50, "error_code": 0, "partition_index": t0p1.partition}
assert result[t1p0] == {"low_watermark": 40, "error_code": 0, "partition_index": t1p0.partition}
assert result[t1p2] == {"low_watermark": 30, "error_code": 0, "partition_index": t1p2.partition}

consumer2 = kafka_consumer_factory(group_id=None, topics=())
consumer2.assign(partitions)
all_messages = consumer2.poll(max_records=600, timeout_ms=2000)
assert sum(len(x) for x in all_messages.values()) == 600 - 100 - 50 - 40 - 30
assert not consumer2.poll(max_records=1, timeout_ms=1000) # ensure there are no delayed messages

assert not all_messages.get(t0p0, [])
assert [r.offset for r in all_messages[t0p1]] == list(range(50, 100))
assert [r.offset for r in all_messages[t0p2]] == list(range(100))

assert [r.offset for r in all_messages[t1p0]] == list(range(40, 100))
assert [r.offset for r in all_messages[t1p1]] == list(range(100))
assert [r.offset for r in all_messages[t1p2]] == list(range(30, 100))


@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Delete records requires broker >=0.11.0")
def test_delete_records_with_errors(kafka_admin_client, topic, send_messages):
sleep(1)  # the topic is sometimes not fully created yet; give the broker a moment
p0 = TopicPartition(topic, 0)
p1 = TopicPartition(topic, 1)
p2 = TopicPartition(topic, 2)
# verify that topic has been created
send_messages(range(0, 1), partition=p2.partition, topic=p2.topic)

with pytest.raises(UnknownTopicOrPartitionError):
kafka_admin_client.delete_records({TopicPartition(topic, 9999): -1})
with pytest.raises(UnknownTopicOrPartitionError):
kafka_admin_client.delete_records({TopicPartition("doesntexist", 0): -1})
with pytest.raises(OffsetOutOfRangeError):
kafka_admin_client.delete_records({p0: 1000})
with pytest.raises(BrokerResponseError):
kafka_admin_client.delete_records({p0: 1000, p1: 1000})


