From d6278ae0fc2452aad2459550758fbce3ac08365d Mon Sep 17 00:00:00 2001 From: Ruslan Date: Tue, 15 Jun 2021 16:12:58 +0300 Subject: [PATCH] Add delete records method support for kafka admin api (#1) --- kafka/admin/client.py | 59 ++++++++++++++++++++++++++++++++++++++++- kafka/protocol/admin.py | 33 ++++++++++++++++++++++- 2 files changed, 90 insertions(+), 2 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index d0fa84560..a17ee7304 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -16,7 +16,8 @@ from kafka.metrics import MetricConfig, Metrics from kafka.protocol.admin import ( CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest, - ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest) + ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest, \ + DeleteRecordsRequest) from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest from kafka.structs import TopicPartition, OffsetAndMetadata @@ -948,6 +949,62 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal .format(version)) return self._send_request_to_controller(request) + def delete_records(self, records_to_delete, timeout_ms=None): + """Delete records whose offset is smaller than the given offset of the corresponding partition. + + :param records_to_delete: ``{TopicPartition: int}``: The earliest available offsets for the + given partitions. + + :return: List of DeleteRecordsResponse + """ + timeout_ms = self._validate_timeout(timeout_ms) + version = self._matching_api_version(MetadataRequest) + + topics = set() + + for topic2partition in records_to_delete: + topics.add(topic2partition.topic) + + request = MetadataRequest[version]( + topics=list(topics), + allow_auto_topic_creation=False + ) + + future = self._send_request_to_node(self._client.least_loaded_node(), request) + + self._wait_for_futures([future]) + response = future.value + + version = self._matching_api_version(DeleteRecordsRequest) + + PARTITIONS_INFO = 3 + NAME = 1 + PARTITION_INDEX = 1 + LEADER = 2 + + partition2leader = dict() + + for topic in response.topics: + for partition in topic[PARTITIONS_INFO]: + t2p = TopicPartition(topic=topic[NAME], partition=partition[PARTITION_INDEX]) + partition2leader[t2p] = partition[LEADER] + + responses = [] + + for topic2partition in records_to_delete: + request = DeleteRecordsRequest[version]( + topics=[(topic2partition.topic, [(topic2partition.partition, records_to_delete[topic2partition])])], + timeout_ms=timeout_ms + ) + # Sending separate request for each partition leader + future = self._send_request_to_node(partition2leader[topic2partition], request) + self._wait_for_futures([future]) + + response = future.value + responses.append(response) + + return responses + # delete records protocol not yet implemented # Note: send the request to the partition leaders diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index af88ea473..f6bdc8c16 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -239,6 +239,38 @@ class DeleteTopicsRequest_v3(Request): ] +class DeleteRecordsResponse_v0(Response): + API_KEY = 21 + API_VERSION = 0 + SCHEMA = Schema( + ('topics', Array( + ('name', String('utf-8')), + ('partitions', Array( + ('partition_index', Int32), + ('low_watermark', Int64), + ('error_code', Int16))))), + ('throttle_time_ms', Int32) + ) + + +class DeleteRecordsRequest_v0(Request): + API_KEY = 21 + API_VERSION = 0 + RESPONSE_TYPE = DeleteRecordsResponse_v0 + SCHEMA = Schema( + ('topics', Array( + ('name', String('utf-8')), + ('partitions', Array( + ('partition_index', Int32), + ('offset', Int64))))), + ('timeout_ms', Int32) + ) + + +DeleteRecordsResponse = [DeleteRecordsResponse_v0] +DeleteRecordsRequest = [DeleteRecordsRequest_v0] + + class ListGroupsResponse_v0(Response): API_KEY = 16 API_VERSION = 0 @@ -881,4 +913,3 @@ class CreatePartitionsRequest_v1(Request): CreatePartitionsResponse = [ CreatePartitionsResponse_v0, CreatePartitionsResponse_v1, ] -