From 39f0e50b9441609e9dce4e60a1ab2c3f16680476 Mon Sep 17 00:00:00 2001 From: Taras Voinarovskiy Date: Sun, 30 Jul 2017 15:42:27 +0000 Subject: [PATCH 1/6] Added basic support for offsets_for_times API. Still needs to group by nodes and send in parallel. --- kafka/conn.py | 1 + kafka/consumer/fetcher.py | 94 +++++++++++++++++++++++++------ kafka/consumer/group.py | 42 +++++++++++++- kafka/protocol/offset.py | 4 +- kafka/structs.py | 3 + test/test_consumer_integration.py | 46 ++++++++++++++- 6 files changed, 169 insertions(+), 21 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index ac8bb3da3..d04230022 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -19,6 +19,7 @@ from kafka.protocol.admin import SaslHandShakeRequest from kafka.protocol.commit import GroupCoordinatorResponse, OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest +from kafka.protocol.fetch import FetchRequest from kafka.protocol.types import Int32 from kafka.version import __version__ diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 8db89a19b..cb80a6f51 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -14,9 +14,11 @@ from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.protocol.fetch import FetchRequest from kafka.protocol.message import PartialMessage -from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy +from kafka.protocol.offset import ( + OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET +) from kafka.serializer import Deserializer -from kafka.structs import TopicPartition +from kafka.structs import TopicPartition, OffsetAndTimestamp log = logging.getLogger(__name__) @@ -48,6 +50,7 @@ class Fetcher(six.Iterator): 'iterator_refetch_records': 1, # undocumented -- interface may change 'metric_group_prefix': 'consumer', 'api_version': (0, 8, 0), + 'retry_backoff_ms': 100 } def __init__(self, client, subscriptions, metrics, **configs): @@ -180,6 +183,14 @@ def update_fetch_positions(self, partitions): " offset %s", tp, committed) self._subscriptions.seek(tp, committed) + def get_offsets_by_times(self, timestamps, timeout_ms): + response = {} + for tp, timestamp in timestamps.items(): + timestamp = int(timestamp) + offset, tmst = self._offset(tp, timestamp, timeout_ms=timeout_ms) + response[tp] = OffsetAndTimestamp(offset, tmst) + return response + def _reset_offset(self, partition): """Reset offsets for the given partition using the offset reset strategy. @@ -199,14 +210,14 @@ def _reset_offset(self, partition): log.debug("Resetting offset for partition %s to %s offset.", partition, strategy) - offset = self._offset(partition, timestamp) + offset, _ = self._offset(partition, timestamp) # we might lose the assignment while fetching the offset, # so check it is still active if self._subscriptions.is_assigned(partition): self._subscriptions.seek(partition, offset) - def _offset(self, partition, timestamp): + def _offset(self, partition, timestamp, timeout_ms=None): """Fetch a single offset before the given timestamp for the partition. Blocks until offset is obtained, or a non-retriable exception is raised @@ -218,21 +229,37 @@ def _offset(self, partition, timestamp): is treated as epoch seconds. Returns: - int: message offset + (int, int): message offset and timestamp. None if not available """ + start_time = time.time() + remaining_ms = timeout_ms while True: future = self._send_offset_request(partition, timestamp) - self._client.poll(future=future) + self._client.poll(future=future, timeout_ms=remaining_ms) if future.succeeded(): return future.value - if not future.retriable(): raise future.exception # pylint: disable-msg=raising-bad-type + if timeout_ms is not None: + remaining_ms = timeout_ms - (time.time() - start_time) * 1000 + if remaining_ms < 0: + break + if future.exception.invalid_metadata: refresh_future = self._client.cluster.request_update() - self._client.poll(future=refresh_future, sleep=True) + self._client.poll( + future=refresh_future, sleep=True, timeout_ms=remaining_ms) + else: + time.sleep(self.config['retry_backoff_ms'] / 1000.0) + + if timeout_ms is not None: + remaining_ms = timeout_ms - (time.time() - start_time) * 1000 + + # Will only happen when timeout_ms != None + raise Errors.KafkaTimeoutError( + "Failed to get offsets by times in %s ms" % timeout_ms) def _raise_if_offset_out_of_range(self): """Check FetchResponses for offset out of range. @@ -596,9 +623,15 @@ def _send_offset_request(self, partition, timestamp): " wait for metadata refresh", partition) return Future().failure(Errors.LeaderNotAvailableError(partition)) - request = OffsetRequest[0]( - -1, [(partition.topic, [(partition.partition, timestamp, 1)])] - ) + if self.config['api_version'] >= (0, 10, 1): + request = OffsetRequest[1]( + -1, [(partition.topic, [(partition.partition, timestamp)])] + ) + else: + request = OffsetRequest[0]( + -1, [(partition.topic, [(partition.partition, timestamp, 1)])] + ) + # Client returns a future that only fails on network issues # so create a separate future and attach a callback to update it # based on response error codes @@ -623,22 +656,47 @@ def _handle_offset_response(self, partition, future, response): assert len(response.topics) == 1 and len(partition_info) == 1, ( 'OffsetResponse should only be for a single topic-partition') - part, error_code, offsets = partition_info[0] + partition_info = partition_info[0] + part, error_code = partition_info[:2] + assert topic == partition.topic and part == partition.partition, ( 'OffsetResponse partition does not match OffsetRequest partition') error_type = Errors.for_code(error_code) if error_type is Errors.NoError: - assert len(offsets) == 1, 'Expected OffsetResponse with one offset' - offset = offsets[0] - log.debug("Fetched offset %d for partition %s", offset, partition) - future.success(offset) - elif error_type in (Errors.NotLeaderForPartitionError, - Errors.UnknownTopicOrPartitionError): + if response.API_VERSION == 0: + offsets = partition_info[2] + assert len(offsets) == 1, 'Expected OffsetResponse with one offset' + offset = offsets[0] + log.debug("Handling v0 ListOffsetResponse response for %s. " + "Fetched offset %s", partition, offset) + future.success((offset, None)) + else: + timestamp, offset = partition_info[2:] + log.debug("Handling ListOffsetResponse response for %s. " + "Fetched offset %s, timestamp %s", + partition, offset, timestamp) + if offset != UNKNOWN_OFFSET: + future.success((offset, timestamp)) + else: + future.success((None, None)) + elif error_type is Errors.UnsupportedForMessageFormatError: + # The message format on the broker side is before 0.10.0, we simply + # put None in the response. + log.debug("Cannot search by timestamp for partition %s because the" + " message format version is before 0.10.0", partition) + future.success((None, None)) + elif error_type is Errors.NotLeaderForPartitionError: log.debug("Attempt to fetch offsets for partition %s failed due" " to obsolete leadership information, retrying.", partition) future.failure(error_type(partition)) + elif error_type is Errors.UnknownTopicOrPartitionError: + log.warn("Received unknown topic or partition error in ListOffset " + "request for partition %s. The topic/partition " + + "may not exist or the user may not have Describe access " + "to it.", partition) + future.failure(error_type(partition)) else: log.warning("Attempt to fetch offsets for partition %s failed due to:" " %s", partition, error_type) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 6adb154bc..f9b8f1682 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -6,7 +6,7 @@ import sys import time -from kafka.errors import KafkaConfigurationError +from kafka.errors import KafkaConfigurationError, UnsupportedVersionError from kafka.vendor import six @@ -861,6 +861,46 @@ def metrics(self, raw=False): metrics[k.group][k.name] = v.value() return metrics + def offsets_for_times(self, timestamps): + """ + Look up the offsets for the given partitions by timestamp. The returned + offset for each partition is the earliest offset whose timestamp is + greater than or equal to the given timestamp in the corresponding + partition. + + This is a blocking call. The consumer does not have to be assigned the + partitions. + + If the message format version in a partition is before 0.10.0, i.e. + the messages do not have timestamps, ``None`` will be returned for that + partition. + + Note: + Notice that this method may block indefinitely if the partition + does not exist. + + Arguments: + timestamps (dict): ``{TopicPartition: int}`` mapping from partition + to the timestamp to look up. + + Raises: + ValueError: if the target timestamp is negative + UnsupportedVersionError: if the broker does not support looking + up the offsets by timestamp. + KafkaTimeoutError: if fetch failed in request_timeout_ms + """ + if self.config['api_version'] <= (0, 10, 0): + raise UnsupportedVersionError( + "offsets_for_times API not supported for cluster version {}" + .format(self.config['api_version'])) + for tp, ts in timestamps.items(): + if ts < 0: + raise ValueError( + "The target time for partition {} is {}. The target time " + "cannot be negative.".format(tp, ts)) + return self._fetcher.get_offsets_by_times( + timestamps, self.config['request_timeout_ms']) + def _use_consumer_group(self): """Return True iff this consumer can/should join a broker-coordinated group.""" if self.config['api_version'] < (0, 9): diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py index 8353f8caa..517965836 100644 --- a/kafka/protocol/offset.py +++ b/kafka/protocol/offset.py @@ -3,6 +3,8 @@ from .api import Request, Response from .types import Array, Int8, Int16, Int32, Int64, Schema, String +UNKNOWN_OFFSET = -1 + class OffsetResetStrategy(object): LATEST = -1 @@ -91,7 +93,7 @@ class OffsetRequest_v2(Request): RESPONSE_TYPE = OffsetResponse_v2 SCHEMA = Schema( ('replica_id', Int32), - ('isolation_level', Int8), + ('isolation_level', Int8), # <- added isolation_level ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( diff --git a/kafka/structs.py b/kafka/structs.py index 48321e718..62f36dd4c 100644 --- a/kafka/structs.py +++ b/kafka/structs.py @@ -74,6 +74,9 @@ OffsetAndMetadata = namedtuple("OffsetAndMetadata", ["offset", "metadata"]) +OffsetAndTimestamp = namedtuple("OffsetAndTimestamp", + ["offset", "timestamp"]) + # Deprecated structs OffsetAndMessage = namedtuple("OffsetAndMessage", diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 193a57039..218ed2c36 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -1,12 +1,14 @@ import logging import os +import time from six.moves import xrange import six from . import unittest from kafka import ( - KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message, create_gzip_message + KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message, + create_gzip_message, KafkaProducer ) from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES from kafka.errors import ConsumerFetchSizeTooSmall, OffsetOutOfRangeError @@ -88,6 +90,12 @@ def kafka_consumer(self, **configs): **configs) return consumer + def kafka_producer(self, **configs): + brokers = '%s:%d' % (self.server.host, self.server.port) + producer = KafkaProducer( + bootstrap_servers=brokers, **configs) + return producer + def test_simple_consumer(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -624,3 +632,39 @@ def test_kafka_consumer_max_bytes_one_msg(self): fetched_msgs = [next(consumer) for i in range(10)] self.assertEqual(len(fetched_msgs), 10) + + @kafka_versions('>=0.10.1') + def test_kafka_consumer_offsets_for_time(self): + late_time = int(time.time()) + middle_time = late_time - 1 + early_time = late_time - 2 + tp = TopicPartition(self.topic, 0) + + kafka_producer = self.kafka_producer() + early_msg = kafka_producer.send( + self.topic, partition=0, value=b"first", + timestamp_ms=early_time).get() + late_msg = kafka_producer.send( + self.topic, partition=0, value=b"last", + timestamp_ms=late_time).get() + + consumer = self.kafka_consumer() + offsets = consumer.offsets_for_times({tp: early_time}) + self.assertEqual(offsets[tp].offset, early_msg.offset) + self.assertEqual(offsets[tp].timestamp, early_time) + + offsets = consumer.offsets_for_times({tp: middle_time}) + self.assertEqual(offsets[tp].offset, late_msg.offset) + self.assertEqual(offsets[tp].timestamp, late_time) + + offsets = consumer.offsets_for_times({tp: late_time}) + self.assertEqual(offsets[tp].offset, late_msg.offset) + self.assertEqual(offsets[tp].timestamp, late_time) + + @kafka_versions('<0.10.1') + def test_kafka_consumer_offsets_for_time_old(self): + consumer = self.kafka_consumer() + tp = TopicPartition(self.topic, 0) + + with self.assertRaises(): + consumer.offsets_for_times({tp: int(time.time())}) From f244e527a9674fa22b0bf9771585598cb758c8b1 Mon Sep 17 00:00:00 2001 From: Taras Voinarovskiy Date: Sun, 30 Jul 2017 20:27:10 +0000 Subject: [PATCH 2/6] Fix test for older brokers --- test/test_consumer_integration.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 218ed2c36..2169145c8 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -11,7 +11,9 @@ create_gzip_message, KafkaProducer ) from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES -from kafka.errors import ConsumerFetchSizeTooSmall, OffsetOutOfRangeError +from kafka.errors import ( + ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError +) from kafka.structs import ProduceRequestPayload, TopicPartition from test.fixtures import ZookeeperFixture, KafkaFixture @@ -666,5 +668,5 @@ def test_kafka_consumer_offsets_for_time_old(self): consumer = self.kafka_consumer() tp = TopicPartition(self.topic, 0) - with self.assertRaises(): + with self.assertRaises(UnsupportedVersionError): consumer.offsets_for_times({tp: int(time.time())}) From 63992f907aaabc4055d02de60f789443fcb4b54f Mon Sep 17 00:00:00 2001 From: Taras Voinarovskiy Date: Mon, 31 Jul 2017 12:41:53 +0000 Subject: [PATCH 3/6] Changed retrieve_offsets to allow fetching multiple offsets at once --- kafka/consumer/fetcher.py | 225 +++++++++++++++++------------- kafka/consumer/group.py | 4 +- test/test_consumer_integration.py | 45 +++++- 3 files changed, 174 insertions(+), 100 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index cb80a6f51..19982b195 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -184,12 +184,14 @@ def update_fetch_positions(self, partitions): self._subscriptions.seek(tp, committed) def get_offsets_by_times(self, timestamps, timeout_ms): - response = {} - for tp, timestamp in timestamps.items(): - timestamp = int(timestamp) - offset, tmst = self._offset(tp, timestamp, timeout_ms=timeout_ms) - response[tp] = OffsetAndTimestamp(offset, tmst) - return response + offsets = self._retrieve_offsets(timestamps, timeout_ms) + for tp in timestamps: + if tp not in offsets: + offsets[tp] = None + else: + offset, timestamp = offsets[tp] + offsets[tp] = OffsetAndTimestamp(offset, timestamp) + return offsets def _reset_offset(self, partition): """Reset offsets for the given partition using the offset reset strategy. @@ -210,31 +212,39 @@ def _reset_offset(self, partition): log.debug("Resetting offset for partition %s to %s offset.", partition, strategy) - offset, _ = self._offset(partition, timestamp) + offsets = self._retrieve_offsets({partition: timestamp}) + assert partition in offsets + offset = offsets[partition][0] # we might lose the assignment while fetching the offset, # so check it is still active if self._subscriptions.is_assigned(partition): self._subscriptions.seek(partition, offset) - def _offset(self, partition, timestamp, timeout_ms=None): - """Fetch a single offset before the given timestamp for the partition. + def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")): + """ Fetch offset for each partition passed in ``timestamps`` map. - Blocks until offset is obtained, or a non-retriable exception is raised + Blocks until offsets are obtained, a non-retriable exception is raised + or ``timeout_ms`` passed (if it's not ``None``). Arguments: - partition The partition that needs fetching offset. - timestamp (int): timestamp for fetching offset. -1 for the latest - available, -2 for the earliest available. Otherwise timestamp - is treated as epoch seconds. + timestamps: {TopicPartition: int} dict with timestamps to fetch + offsets by. -1 for the latest available, -2 for the earliest + available. Otherwise timestamp is treated as epoch miliseconds. Returns: - (int, int): message offset and timestamp. None if not available + {TopicPartition: (int, int)}: Mapping of partition to + retrieved offset and timestamp. If offset does not exist for + the provided timestamp, that partition will be missing from + this mapping. """ + if not timestamps: + return {} + start_time = time.time() remaining_ms = timeout_ms - while True: - future = self._send_offset_request(partition, timestamp) + while remaining_ms > 0: + future = self._send_offset_requests(timestamps) self._client.poll(future=future, timeout_ms=remaining_ms) if future.succeeded(): @@ -242,10 +252,10 @@ def _offset(self, partition, timestamp, timeout_ms=None): if not future.retriable(): raise future.exception # pylint: disable-msg=raising-bad-type - if timeout_ms is not None: - remaining_ms = timeout_ms - (time.time() - start_time) * 1000 - if remaining_ms < 0: - break + elapsed_ms = (time.time() - start_time) * 1000 + remaining_ms = timeout_ms - elapsed_ms + if remaining_ms < 0: + break if future.exception.invalid_metadata: refresh_future = self._client.cluster.request_update() @@ -254,10 +264,9 @@ def _offset(self, partition, timestamp, timeout_ms=None): else: time.sleep(self.config['retry_backoff_ms'] / 1000.0) - if timeout_ms is not None: - remaining_ms = timeout_ms - (time.time() - start_time) * 1000 + elapsed_ms = (time.time() - start_time) * 1000 + remaining_ms = timeout_ms - elapsed_ms - # Will only happen when timeout_ms != None raise Errors.KafkaTimeoutError( "Failed to get offsets by times in %s ms" % timeout_ms) @@ -603,104 +612,130 @@ def _deserialize(self, f, topic, bytes_): return f.deserialize(topic, bytes_) return f(bytes_) - def _send_offset_request(self, partition, timestamp): - """Fetch a single offset before the given timestamp for the partition. + def _send_offset_requests(self, timestamps): + """ Fetch offsets for each partition in timestamps dict. This may send + request to multiple nodes, based on who is Leader for partition. Arguments: - partition (TopicPartition): partition that needs fetching offset - timestamp (int): timestamp for fetching offset + timestamps (dict): {TopicPartition: int} mapping of fetching + timestamps. Returns: - Future: resolves to the corresponding offset + Future: resolves to a mapping of retrieved offsets """ - node_id = self._client.cluster.leader_for_partition(partition) - if node_id is None: - log.debug("Partition %s is unknown for fetching offset," - " wait for metadata refresh", partition) - return Future().failure(Errors.StaleMetadata(partition)) - elif node_id == -1: - log.debug("Leader for partition %s unavailable for fetching offset," - " wait for metadata refresh", partition) - return Future().failure(Errors.LeaderNotAvailableError(partition)) + timestamps_by_node = collections.defaultdict(dict) + for partition, timestamp in six.iteritems(timestamps): + node_id = self._client.cluster.leader_for_partition(partition) + if node_id is None: + self._client.add_topic(partition.topic) + log.debug("Partition %s is unknown for fetching offset," + " wait for metadata refresh", partition) + return Future().failure(Errors.StaleMetadata(partition)) + elif node_id == -1: + log.debug("Leader for partition %s unavailable for fetching " + "offset, wait for metadata refresh", partition) + return Future().failure( + Errors.LeaderNotAvailableError(partition)) + else: + timestamps_by_node[node_id][partition] = timestamp + + # Aggregate results until we have all + list_offsets_future = Future() + responses = [] + node_count = len(timestamps_by_node) + + def on_success(value): + responses.append(value) + if len(responses) == node_count: + offsets = {} + for r in responses: + offsets.update(r) + list_offsets_future.success(offsets) + + for node_id, timestamps in six.iteritems(timestamps_by_node): + _f = self._send_offset_request(node_id, timestamps) + _f.add_callback(on_success) + _f.add_errback(lambda e: list_offsets_future.failure(e)) + return list_offsets_future + + def _send_offset_request(self, node_id, timestamps): + by_topic = collections.defaultdict(list) + for tp, timestamp in six.iteritems(timestamps): + if self.config['api_version'] >= (0, 10, 1): + data = (tp.partition, timestamp) + else: + data = (tp.partition, timestamp, 1) + by_topic[tp.topic].append(data) if self.config['api_version'] >= (0, 10, 1): - request = OffsetRequest[1]( - -1, [(partition.topic, [(partition.partition, timestamp)])] - ) + request = OffsetRequest[1](-1, list(six.iteritems(by_topic))) else: - request = OffsetRequest[0]( - -1, [(partition.topic, [(partition.partition, timestamp, 1)])] - ) + request = OffsetRequest[0](-1, list(six.iteritems(by_topic))) # Client returns a future that only fails on network issues # so create a separate future and attach a callback to update it # based on response error codes future = Future() + _f = self._client.send(node_id, request) - _f.add_callback(self._handle_offset_response, partition, future) + _f.add_callback(self._handle_offset_response, future) _f.add_errback(lambda e: future.failure(e)) return future - def _handle_offset_response(self, partition, future, response): + def _handle_offset_response(self, future, response): """Callback for the response of the list offset call above. Arguments: - partition (TopicPartition): The partition that was fetched future (Future): the future to update based on response response (OffsetResponse): response from the server Raises: AssertionError: if response does not match partition """ - topic, partition_info = response.topics[0] - assert len(response.topics) == 1 and len(partition_info) == 1, ( - 'OffsetResponse should only be for a single topic-partition') - - partition_info = partition_info[0] - part, error_code = partition_info[:2] - - assert topic == partition.topic and part == partition.partition, ( - 'OffsetResponse partition does not match OffsetRequest partition') - - error_type = Errors.for_code(error_code) - if error_type is Errors.NoError: - if response.API_VERSION == 0: - offsets = partition_info[2] - assert len(offsets) == 1, 'Expected OffsetResponse with one offset' - offset = offsets[0] - log.debug("Handling v0 ListOffsetResponse response for %s. " - "Fetched offset %s", partition, offset) - future.success((offset, None)) - else: - timestamp, offset = partition_info[2:] - log.debug("Handling ListOffsetResponse response for %s. " - "Fetched offset %s, timestamp %s", - partition, offset, timestamp) - if offset != UNKNOWN_OFFSET: - future.success((offset, timestamp)) + timestamp_offset_map = {} + for topic, part_data in response.topics: + for partition_info in part_data: + partition, error_code = partition_info[:2] + partition = TopicPartition(topic, partition) + error_type = Errors.for_code(error_code) + if error_type is Errors.NoError: + if response.API_VERSION == 0: + offsets = partition_info[2] + assert len(offsets) > 1, 'Expected OffsetResponse with one offset' + if offsets: + offset = offsets[0] + log.debug("Handling v0 ListOffsetResponse response for %s. " + "Fetched offset %s", partition, offset) + timestamp_offset_map[partition] = (offset, None) + else: + timestamp, offset = partition_info[2:] + log.debug("Handling ListOffsetResponse response for %s. " + "Fetched offset %s, timestamp %s", + partition, offset, timestamp) + if offset != UNKNOWN_OFFSET: + timestamp_offset_map[partition] = (offset, timestamp) + elif error_type is Errors.UnsupportedForMessageFormatError: + # The message format on the broker side is before 0.10.0, + # we simply put None in the response. + log.debug("Cannot search by timestamp for partition %s because the" + " message format version is before 0.10.0", partition) + elif error_type is Errors.NotLeaderForPartitionError: + log.debug("Attempt to fetch offsets for partition %s failed due" + " to obsolete leadership information, retrying.", + partition) + future.failure(error_type(partition)) + elif error_type is Errors.UnknownTopicOrPartitionError: + log.warn("Received unknown topic or partition error in ListOffset " + "request for partition %s. The topic/partition " + + "may not exist or the user may not have Describe access " + "to it.", partition) + future.failure(error_type(partition)) else: - future.success((None, None)) - elif error_type is Errors.UnsupportedForMessageFormatError: - # The message format on the broker side is before 0.10.0, we simply - # put None in the response. - log.debug("Cannot search by timestamp for partition %s because the" - " message format version is before 0.10.0", partition) - future.success((None, None)) - elif error_type is Errors.NotLeaderForPartitionError: - log.debug("Attempt to fetch offsets for partition %s failed due" - " to obsolete leadership information, retrying.", - partition) - future.failure(error_type(partition)) - elif error_type is Errors.UnknownTopicOrPartitionError: - log.warn("Received unknown topic or partition error in ListOffset " - "request for partition %s. The topic/partition " + - "may not exist or the user may not have Describe access " - "to it.", partition) - future.failure(error_type(partition)) - else: - log.warning("Attempt to fetch offsets for partition %s failed due to:" - " %s", partition, error_type) - future.failure(error_type(partition)) + log.warning("Attempt to fetch offsets for partition %s failed due to:" + " %s", partition, error_type) + future.failure(error_type(partition)) + if not future.is_done: + future.success(timestamp_offset_map) def _fetchable_partitions(self): fetchable = self._subscriptions.fetchable_partitions() diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index f9b8f1682..48a88b29f 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -881,7 +881,8 @@ def offsets_for_times(self, timestamps): Arguments: timestamps (dict): ``{TopicPartition: int}`` mapping from partition - to the timestamp to look up. + to the timestamp to look up. Unit should be milliseconds since + beginning of the epoch (midnight Jan 1, 1970 (UTC)) Raises: ValueError: if the target timestamp is negative @@ -894,6 +895,7 @@ def offsets_for_times(self, timestamps): "offsets_for_times API not supported for cluster version {}" .format(self.config['api_version'])) for tp, ts in timestamps.items(): + timestamps[tp] = int(ts) if ts < 0: raise ValueError( "The target time for partition {} is {}. The target time " diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 2169145c8..eab93beb4 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -14,7 +14,9 @@ from kafka.errors import ( ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError ) -from kafka.structs import ProduceRequestPayload, TopicPartition +from kafka.structs import ( + ProduceRequestPayload, TopicPartition, OffsetAndTimestamp +) from test.fixtures import ZookeeperFixture, KafkaFixture from test.testutil import ( @@ -637,9 +639,9 @@ def test_kafka_consumer_max_bytes_one_msg(self): @kafka_versions('>=0.10.1') def test_kafka_consumer_offsets_for_time(self): - late_time = int(time.time()) - middle_time = late_time - 1 - early_time = late_time - 2 + late_time = int(time.time()) * 1000 + middle_time = late_time - 1000 + early_time = late_time - 2000 tp = TopicPartition(self.topic, 0) kafka_producer = self.kafka_producer() @@ -652,6 +654,7 @@ def test_kafka_consumer_offsets_for_time(self): consumer = self.kafka_consumer() offsets = consumer.offsets_for_times({tp: early_time}) + self.assertEqual(len(offsets), 1) self.assertEqual(offsets[tp].offset, early_msg.offset) self.assertEqual(offsets[tp].timestamp, early_time) @@ -663,6 +666,40 @@ def test_kafka_consumer_offsets_for_time(self): self.assertEqual(offsets[tp].offset, late_msg.offset) self.assertEqual(offsets[tp].timestamp, late_time) + # Out of bound timestamps check + + offsets = consumer.offsets_for_times({tp: 0}) + self.assertEqual(offsets[tp].offset, early_msg.offset) + self.assertEqual(offsets[tp].timestamp, early_time) + + offsets = consumer.offsets_for_times({tp: 9999999999999}) + self.assertEqual(offsets[tp], None) + + @kafka_versions('>=0.10.1') + def test_kafka_consumer_offsets_search_many_partitions(self): + tp0 = TopicPartition(self.topic, 0) + tp1 = TopicPartition(self.topic, 1) + + kafka_producer = self.kafka_producer() + send_time = int(time.time() * 1000) + p0msg = kafka_producer.send( + self.topic, partition=0, value=b"XXX", + timestamp_ms=send_time).get() + p1msg = kafka_producer.send( + self.topic, partition=1, value=b"XXX", + timestamp_ms=send_time).get() + + consumer = self.kafka_consumer() + offsets = consumer.offsets_for_times({ + tp0: send_time, + tp1: send_time + }) + + self.assertEqual(offsets, { + tp0: OffsetAndTimestamp(p0msg.offset, send_time), + tp1: OffsetAndTimestamp(p1msg.offset, send_time) + }) + @kafka_versions('<0.10.1') def test_kafka_consumer_offsets_for_time_old(self): consumer = self.kafka_consumer() From efc03d083d323e35a2d32bcbdbccc053f737836e Mon Sep 17 00:00:00 2001 From: Taras Voinarovskiy Date: Mon, 31 Jul 2017 14:33:22 +0000 Subject: [PATCH 4/6] Fix test for older brokers --- kafka/consumer/fetcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 19982b195..1a3dfd52c 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -701,7 +701,7 @@ def _handle_offset_response(self, future, response): if error_type is Errors.NoError: if response.API_VERSION == 0: offsets = partition_info[2] - assert len(offsets) > 1, 'Expected OffsetResponse with one offset' + assert len(offsets) <= 1, 'Expected OffsetResponse with one offset' if offsets: offset = offsets[0] log.debug("Handling v0 ListOffsetResponse response for %s. " From 1f69f8f5b875d1b263663bdf6aa2fc17faa4a3e5 Mon Sep 17 00:00:00 2001 From: Taras Voinarovskiy Date: Sat, 5 Aug 2017 17:19:54 +0000 Subject: [PATCH 5/6] Added `beginning_offsets` and `end_offsets` API's and fixed @jeffwidman review issues --- kafka/conn.py | 2 +- kafka/consumer/fetcher.py | 23 ++++++-- kafka/consumer/group.py | 87 +++++++++++++++++++++++++++---- test/test_consumer_integration.py | 47 ++++++++++++++++- 4 files changed, 142 insertions(+), 17 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index d04230022..61d63bfc4 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -887,7 +887,7 @@ def _handle_api_version_response(self, response): def _infer_broker_version_from_api_versions(self, api_versions): # The logic here is to check the list of supported request versions - # in descending order. As soon as we find one that works, return it + # in reverse order. As soon as we find one that works, return it test_cases = [ # format (, ) ((0, 11, 0), MetadataRequest[4]), diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 1a3dfd52c..6a7b79448 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -193,6 +193,21 @@ def get_offsets_by_times(self, timestamps, timeout_ms): offsets[tp] = OffsetAndTimestamp(offset, timestamp) return offsets + def beginning_offsets(self, partitions, timeout_ms): + return self.beginning_or_end_offset( + partitions, OffsetResetStrategy.EARLIEST, timeout_ms) + + def end_offsets(self, partitions, timeout_ms): + return self.beginning_or_end_offset( + partitions, OffsetResetStrategy.LATEST, timeout_ms) + + def beginning_or_end_offset(self, partitions, timestamp, timeout_ms): + timestamps = dict([(tp, timestamp) for tp in partitions]) + offsets = self._retrieve_offsets(timestamps, timeout_ms) + for tp in timestamps: + offsets[tp] = offsets[tp][0] + return offsets + def _reset_offset(self, partition): """Reset offsets for the given partition using the offset reset strategy. @@ -222,10 +237,10 @@ def _reset_offset(self, partition): self._subscriptions.seek(partition, offset) def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")): - """ Fetch offset for each partition passed in ``timestamps`` map. + """Fetch offset for each partition passed in ``timestamps`` map. Blocks until offsets are obtained, a non-retriable exception is raised - or ``timeout_ms`` passed (if it's not ``None``). + or ``timeout_ms`` passed. Arguments: timestamps: {TopicPartition: int} dict with timestamps to fetch @@ -268,7 +283,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")): remaining_ms = timeout_ms - elapsed_ms raise Errors.KafkaTimeoutError( - "Failed to get offsets by times in %s ms" % timeout_ms) + "Failed to get offsets by timestamps in %s ms" % timeout_ms) def _raise_if_offset_out_of_range(self): """Check FetchResponses for offset out of range. @@ -613,7 +628,7 @@ def _deserialize(self, f, topic, bytes_): return f(bytes_) def _send_offset_requests(self, timestamps): - """ Fetch offsets for each partition in timestamps dict. This may send + """Fetch offsets for each partition in timestamps dict. This may send request to multiple nodes, based on who is Leader for partition. Arguments: diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 48a88b29f..54a3711ae 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -862,33 +862,37 @@ def metrics(self, raw=False): return metrics def offsets_for_times(self, timestamps): - """ - Look up the offsets for the given partitions by timestamp. The returned - offset for each partition is the earliest offset whose timestamp is - greater than or equal to the given timestamp in the corresponding - partition. + """Look up the offsets for the given partitions by timestamp. The + returned offset for each partition is the earliest offset whose + timestamp is greater than or equal to the given timestamp in the + corresponding partition. This is a blocking call. The consumer does not have to be assigned the partitions. If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, ``None`` will be returned for that - partition. + partition. ``None`` will also be returned for the partition if there + are no messages in it. Note: - Notice that this method may block indefinitely if the partition - does not exist. + This method may block indefinitely if the partition does not exist. Arguments: timestamps (dict): ``{TopicPartition: int}`` mapping from partition to the timestamp to look up. Unit should be milliseconds since beginning of the epoch (midnight Jan 1, 1970 (UTC)) + Returns: + ``{TopicPartition: OffsetAndTimestamp}``: mapping from partition + to the timestamp and offset of the first message with timestamp + greater than or equal to the target timestamp. + Raises: - ValueError: if the target timestamp is negative - UnsupportedVersionError: if the broker does not support looking + ValueError: If the target timestamp is negative + UnsupportedVersionError: If the broker does not support looking up the offsets by timestamp. - KafkaTimeoutError: if fetch failed in request_timeout_ms + KafkaTimeoutError: If fetch failed in request_timeout_ms """ if self.config['api_version'] <= (0, 10, 0): raise UnsupportedVersionError( @@ -903,6 +907,67 @@ def offsets_for_times(self, timestamps): return self._fetcher.get_offsets_by_times( timestamps, self.config['request_timeout_ms']) + def beginning_offsets(self, partitions): + """Get the first offset for the given partitions. + + This method does not change the current consumer position of the + partitions. + + Note: + This method may block indefinitely if the partition does not exist. + + Arguments: + partitions (list): List of TopicPartition instances to fetch + offsets for. + + Returns: + ``{TopicPartition: int}``: The earliest available offsets for the + given partitions. + + Raises: + UnsupportedVersionError: If the broker does not support looking + up the offsets by timestamp. + KafkaTimeoutError: If fetch failed in request_timeout_ms. + """ + if self.config['api_version'] <= (0, 10, 0): + raise UnsupportedVersionError( + "offsets_for_times API not supported for cluster version {}" + .format(self.config['api_version'])) + offsets = self._fetcher.beginning_offsets( + partitions, self.config['request_timeout_ms']) + return offsets + + def end_offsets(self, partitions): + """Get the last offset for the given partitions. The last offset of a + partition is the offset of the upcoming message, i.e. the offset of the + last available message + 1. + + This method does not change the current consumer position of the + partitions. + + Note: + This method may block indefinitely if the partition does not exist. + + Arguments: + partitions (list): List of TopicPartition instances to fetch + offsets for. + + Returns: + ``{TopicPartition: int}``: The end offsets for the given partitions. + + Raises: + UnsupportedVersionError: If the broker does not support looking + up the offsets by timestamp. + KafkaTimeoutError: If fetch failed in request_timeout_ms + """ + if self.config['api_version'] <= (0, 10, 0): + raise UnsupportedVersionError( + "offsets_for_times API not supported for cluster version {}" + .format(self.config['api_version'])) + offsets = self._fetcher.end_offsets( + partitions, self.config['request_timeout_ms']) + return offsets + def _use_consumer_group(self): """Return True iff this consumer can/should join a broker-coordinated group.""" if self.config['api_version'] < (0, 9): diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index eab93beb4..803b16a49 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -12,7 +12,8 @@ ) from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES from kafka.errors import ( - ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError + ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError, + KafkaTimeoutError ) from kafka.structs import ( ProduceRequestPayload, TopicPartition, OffsetAndTimestamp @@ -666,6 +667,9 @@ def test_kafka_consumer_offsets_for_time(self): self.assertEqual(offsets[tp].offset, late_msg.offset) self.assertEqual(offsets[tp].timestamp, late_time) + offsets = consumer.offsets_for_times({}) + self.assertEqual(offsets, {}) + # Out of bound timestamps check offsets = consumer.offsets_for_times({tp: 0}) @@ -675,6 +679,17 @@ def test_kafka_consumer_offsets_for_time(self): offsets = consumer.offsets_for_times({tp: 9999999999999}) self.assertEqual(offsets[tp], None) + # Beginning/End offsets + + offsets = consumer.beginning_offsets([tp]) + self.assertEqual(offsets, { + tp: early_msg.offset, + }) + offsets = consumer.end_offsets([tp]) + self.assertEqual(offsets, { + tp: late_msg.offset + 1 + }) + @kafka_versions('>=0.10.1') def test_kafka_consumer_offsets_search_many_partitions(self): tp0 = TopicPartition(self.topic, 0) @@ -700,6 +715,18 @@ def test_kafka_consumer_offsets_search_many_partitions(self): tp1: OffsetAndTimestamp(p1msg.offset, send_time) }) + offsets = consumer.beginning_offsets([tp0, tp1]) + self.assertEqual(offsets, { + tp0: p0msg.offset, + tp1: p1msg.offset + }) + + offsets = consumer.end_offsets([tp0, tp1]) + self.assertEqual(offsets, { + tp0: p0msg.offset + 1, + tp1: p1msg.offset + 1 + }) + @kafka_versions('<0.10.1') def test_kafka_consumer_offsets_for_time_old(self): consumer = self.kafka_consumer() @@ -707,3 +734,21 @@ def test_kafka_consumer_offsets_for_time_old(self): with self.assertRaises(UnsupportedVersionError): consumer.offsets_for_times({tp: int(time.time())}) + + with self.assertRaises(UnsupportedVersionError): + consumer.beginning_offsets([tp]) + + with self.assertRaises(UnsupportedVersionError): + consumer.end_offsets([tp]) + + @kafka_versions('<0.10.1') + def test_kafka_consumer_offsets_for_times_errors(self): + consumer = self.kafka_consumer() + tp = TopicPartition(self.topic, 0) + bad_tp = TopicPartition(self.topic, 100) + + with self.assertRaises(ValueError): + consumer.offsets_for_times({tp: -1}) + + with self.assertRaises(KafkaTimeoutError): + consumer.offsets_for_times({bad_tp: 0}) From 55ded554f9f5b470eeb53500e455ecd87f4d8f87 Mon Sep 17 00:00:00 2001 From: Taras Voinarovskiy Date: Sun, 6 Aug 2017 10:50:16 +0000 Subject: [PATCH 6/6] Added unit tests for fetcher's `_reset_offset` and related functions. --- kafka/consumer/fetcher.py | 21 +++- test/test_consumer_integration.py | 2 +- test/test_fetcher.py | 183 +++++++++++++++++++++++++++++- 3 files changed, 199 insertions(+), 7 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 6a7b79448..c0d607550 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -228,7 +228,8 @@ def _reset_offset(self, partition): log.debug("Resetting offset for partition %s to %s offset.", partition, strategy) offsets = self._retrieve_offsets({partition: timestamp}) - assert partition in offsets + if partition not in offsets: + raise NoOffsetForPartitionError(partition) offset = offsets[partition][0] # we might lose the assignment while fetching the offset, @@ -667,10 +668,14 @@ def on_success(value): offsets.update(r) list_offsets_future.success(offsets) + def on_fail(err): + if not list_offsets_future.is_done: + list_offsets_future.failure(err) + for node_id, timestamps in six.iteritems(timestamps_by_node): _f = self._send_offset_request(node_id, timestamps) _f.add_callback(on_success) - _f.add_errback(lambda e: list_offsets_future.failure(e)) + _f.add_errback(on_fail) return list_offsets_future def _send_offset_request(self, node_id, timestamps): @@ -717,10 +722,13 @@ def _handle_offset_response(self, future, response): if response.API_VERSION == 0: offsets = partition_info[2] assert len(offsets) <= 1, 'Expected OffsetResponse with one offset' - if offsets: + if not offsets: + offset = UNKNOWN_OFFSET + else: offset = offsets[0] - log.debug("Handling v0 ListOffsetResponse response for %s. " - "Fetched offset %s", partition, offset) + log.debug("Handling v0 ListOffsetResponse response for %s. " + "Fetched offset %s", partition, offset) + if offset != UNKNOWN_OFFSET: timestamp_offset_map[partition] = (offset, None) else: timestamp, offset = partition_info[2:] @@ -739,16 +747,19 @@ def _handle_offset_response(self, future, response): " to obsolete leadership information, retrying.", partition) future.failure(error_type(partition)) + return elif error_type is Errors.UnknownTopicOrPartitionError: log.warn("Received unknown topic or partition error in ListOffset " "request for partition %s. The topic/partition " + "may not exist or the user may not have Describe access " "to it.", partition) future.failure(error_type(partition)) + return else: log.warning("Attempt to fetch offsets for partition %s failed due to:" " %s", partition, error_type) future.failure(error_type(partition)) + return if not future.is_done: future.success(timestamp_offset_map) diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 803b16a49..4b5e78a35 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -741,7 +741,7 @@ def test_kafka_consumer_offsets_for_time_old(self): with self.assertRaises(UnsupportedVersionError): consumer.end_offsets([tp]) - @kafka_versions('<0.10.1') + @kafka_versions('>=0.10.1') def test_kafka_consumer_offsets_for_times_errors(self): consumer = self.kafka_consumer() tp = TopicPartition(self.topic, 0) diff --git a/test/test_fetcher.py b/test/test_fetcher.py index dcfba78be..0562ec58c 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -3,12 +3,21 @@ import pytest +import itertools +from collections import OrderedDict + from kafka.client_async import KafkaClient -from kafka.consumer.fetcher import Fetcher +from kafka.consumer.fetcher import Fetcher, NoOffsetForPartitionError from kafka.consumer.subscription_state import SubscriptionState from kafka.metrics import Metrics from kafka.protocol.fetch import FetchRequest +from kafka.protocol.offset import OffsetResponse from kafka.structs import TopicPartition +from kafka.future import Future +from kafka.errors import ( + StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError, + UnknownTopicOrPartitionError +) @pytest.fixture @@ -101,3 +110,175 @@ def test_update_fetch_positions(fetcher, mocker): fetcher.update_fetch_positions([partition]) assert fetcher._reset_offset.call_count == 0 fetcher._subscriptions.seek.assert_called_with(partition, 123) + + +def test__reset_offset(fetcher, mocker): + tp = TopicPartition("topic", 0) + fetcher._subscriptions.subscribe(topics="topic") + fetcher._subscriptions.assign_from_subscribed([tp]) + fetcher._subscriptions.need_offset_reset(tp) + mocked = mocker.patch.object(fetcher, '_retrieve_offsets') + + mocked.return_value = {} + with pytest.raises(NoOffsetForPartitionError): + fetcher._reset_offset(tp) + + mocked.return_value = {tp: (1001, None)} + fetcher._reset_offset(tp) + assert not fetcher._subscriptions.assignment[tp].awaiting_reset + assert fetcher._subscriptions.assignment[tp].position == 1001 + + +def test__send_offset_requests(fetcher, mocker): + tp = TopicPartition("topic_send_offset", 1) + mocked_send = mocker.patch.object(fetcher, "_send_offset_request") + send_futures = [] + + def send_side_effect(*args, **kw): + f = Future() + send_futures.append(f) + return f + mocked_send.side_effect = send_side_effect + + mocked_leader = mocker.patch.object( + fetcher._client.cluster, "leader_for_partition") + # First we report unavailable leader 2 times different ways and later + # always as available + mocked_leader.side_effect = itertools.chain( + [None, -1], itertools.cycle([0])) + + # Leader == None + fut = fetcher._send_offset_requests({tp: 0}) + assert fut.failed() + assert isinstance(fut.exception, StaleMetadata) + assert not mocked_send.called + + # Leader == -1 + fut = fetcher._send_offset_requests({tp: 0}) + assert fut.failed() + assert isinstance(fut.exception, LeaderNotAvailableError) + assert not mocked_send.called + + # Leader == 0, send failed + fut = fetcher._send_offset_requests({tp: 0}) + assert not fut.is_done + assert mocked_send.called + # Check that we bound the futures correctly to chain failure + send_futures.pop().failure(NotLeaderForPartitionError(tp)) + assert fut.failed() + assert isinstance(fut.exception, NotLeaderForPartitionError) + + # Leader == 0, send success + fut = fetcher._send_offset_requests({tp: 0}) + assert not fut.is_done + assert mocked_send.called + # Check that we bound the futures correctly to chain success + send_futures.pop().success({tp: (10, 10000)}) + assert fut.succeeded() + assert fut.value == {tp: (10, 10000)} + + +def test__send_offset_requests_multiple_nodes(fetcher, mocker): + tp1 = TopicPartition("topic_send_offset", 1) + tp2 = TopicPartition("topic_send_offset", 2) + tp3 = TopicPartition("topic_send_offset", 3) + tp4 = TopicPartition("topic_send_offset", 4) + mocked_send = mocker.patch.object(fetcher, "_send_offset_request") + send_futures = [] + + def send_side_effect(node_id, timestamps): + f = Future() + send_futures.append((node_id, timestamps, f)) + return f + mocked_send.side_effect = send_side_effect + + mocked_leader = mocker.patch.object( + fetcher._client.cluster, "leader_for_partition") + mocked_leader.side_effect = itertools.cycle([0, 1]) + + # -- All node succeeded case + tss = OrderedDict([(tp1, 0), (tp2, 0), (tp3, 0), (tp4, 0)]) + fut = fetcher._send_offset_requests(tss) + assert not fut.is_done + assert mocked_send.call_count == 2 + + req_by_node = {} + second_future = None + for node, timestamps, f in send_futures: + req_by_node[node] = timestamps + if node == 0: + # Say tp3 does not have any messages so it's missing + f.success({tp1: (11, 1001)}) + else: + second_future = f + assert req_by_node == { + 0: {tp1: 0, tp3: 0}, + 1: {tp2: 0, tp4: 0} + } + + # We only resolved 1 future so far, so result future is not yet ready + assert not fut.is_done + second_future.success({tp2: (12, 1002), tp4: (14, 1004)}) + assert fut.succeeded() + assert fut.value == {tp1: (11, 1001), tp2: (12, 1002), tp4: (14, 1004)} + + # -- First succeeded second not + del send_futures[:] + fut = fetcher._send_offset_requests(tss) + assert len(send_futures) == 2 + send_futures[0][2].success({tp1: (11, 1001)}) + send_futures[1][2].failure(UnknownTopicOrPartitionError(tp1)) + assert fut.failed() + assert isinstance(fut.exception, UnknownTopicOrPartitionError) + + # -- First fails second succeeded + del send_futures[:] + fut = fetcher._send_offset_requests(tss) + assert len(send_futures) == 2 + send_futures[0][2].failure(UnknownTopicOrPartitionError(tp1)) + send_futures[1][2].success({tp1: (11, 1001)}) + assert fut.failed() + assert isinstance(fut.exception, UnknownTopicOrPartitionError) + + +def test__handle_offset_response(fetcher, mocker): + # Broker returns UnsupportedForMessageFormatError, will omit partition + fut = Future() + res = OffsetResponse[1]([ + ("topic", [(0, 43, -1, -1)]), + ("topic", [(1, 0, 1000, 9999)]) + ]) + fetcher._handle_offset_response(fut, res) + assert fut.succeeded() + assert fut.value == {TopicPartition("topic", 1): (9999, 1000)} + + # Broker returns NotLeaderForPartitionError + fut = Future() + res = OffsetResponse[1]([ + ("topic", [(0, 6, -1, -1)]), + ]) + fetcher._handle_offset_response(fut, res) + assert fut.failed() + assert isinstance(fut.exception, NotLeaderForPartitionError) + + # Broker returns UnknownTopicOrPartitionError + fut = Future() + res = OffsetResponse[1]([ + ("topic", [(0, 3, -1, -1)]), + ]) + fetcher._handle_offset_response(fut, res) + assert fut.failed() + assert isinstance(fut.exception, UnknownTopicOrPartitionError) + + # Broker returns many errors and 1 result + # Will fail on 1st error and return + fut = Future() + res = OffsetResponse[1]([ + ("topic", [(0, 43, -1, -1)]), + ("topic", [(1, 6, -1, -1)]), + ("topic", [(2, 3, -1, -1)]), + ("topic", [(3, 0, 1000, 9999)]) + ]) + fetcher._handle_offset_response(fut, res) + assert fut.failed() + assert isinstance(fut.exception, NotLeaderForPartitionError)