Commit 8cf4484

Merge pull request dpkp#1161 from dpkp/issue1036_offset_by_time
Added support for offsets_for_times, beginning_offsets and end_offsets APIs.
2 parents da25df6 + 55ded55 commit 8cf4484
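Note: the new APIs look up offsets by timestamp and expose the log start/end offsets per partition. A rough usage sketch, not part of this diff, assuming the consumer-level wrappers added elsewhere in this PR expose these methods on KafkaConsumer:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
tp = TopicPartition('my-topic', 0)

# Offset of the first message at or after the given epoch-millisecond timestamp
print(consumer.offsets_for_times({tp: 1500000000000}))
# -> {tp: OffsetAndTimestamp(offset=..., timestamp=...)}, or {tp: None} if no such message

# Earliest and latest offsets currently available for the partition
print(consumer.beginning_offsets([tp]))
print(consumer.end_offsets([tp]))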

File tree: 7 files changed (+609, -68 lines)

7 files changed

+609
-68
lines changed

kafka/conn.py (+2, -1)
@@ -19,6 +19,7 @@
 from kafka.protocol.admin import SaslHandShakeRequest
 from kafka.protocol.commit import GroupCoordinatorResponse, OffsetFetchRequest
 from kafka.protocol.metadata import MetadataRequest
+from kafka.protocol.fetch import FetchRequest
 from kafka.protocol.types import Int32
 from kafka.version import __version__

@@ -886,7 +887,7 @@ def _handle_api_version_response(self, response):

     def _infer_broker_version_from_api_versions(self, api_versions):
         # The logic here is to check the list of supported request versions
-        # in descending order. As soon as we find one that works, return it
+        # in reverse order. As soon as we find one that works, return it
         test_cases = [
             # format (<broker verion>, <needed struct>)
             ((0, 11, 0), MetadataRequest[4]),
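Note: a minimal sketch of the version-probing idea behind _infer_broker_version_from_api_versions (illustrative helper, not the full implementation): walk the candidate (broker_version, request_struct) pairs from newest to oldest and return the first broker version whose request the broker advertises support for.

def infer_broker_version(api_versions, test_cases):
    # api_versions: {api_key: (min_version, max_version)} as reported by the broker
    for broker_version, struct in test_cases:  # ordered newest -> oldest
        if struct.API_KEY in api_versions:
            min_version, max_version = api_versions[struct.API_KEY]
            if min_version <= struct.API_VERSION <= max_version:
                return broker_version
    return (0, 10, 0)  # illustrative fallback only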

kafka/consumer/fetcher.py (+180, -61)
@@ -14,9 +14,11 @@
 from kafka.metrics.stats import Avg, Count, Max, Rate
 from kafka.protocol.fetch import FetchRequest
 from kafka.protocol.message import PartialMessage
-from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
+from kafka.protocol.offset import (
+    OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET
+)
 from kafka.serializer import Deserializer
-from kafka.structs import TopicPartition
+from kafka.structs import TopicPartition, OffsetAndTimestamp

 log = logging.getLogger(__name__)

@@ -48,6 +50,7 @@ class Fetcher(six.Iterator):
         'iterator_refetch_records': 1, # undocumented -- interface may change
         'metric_group_prefix': 'consumer',
         'api_version': (0, 8, 0),
+        'retry_backoff_ms': 100
     }

     def __init__(self, client, subscriptions, metrics, **configs):
@@ -180,6 +183,31 @@ def update_fetch_positions(self, partitions):
                           " offset %s", tp, committed)
                 self._subscriptions.seek(tp, committed)

+    def get_offsets_by_times(self, timestamps, timeout_ms):
+        offsets = self._retrieve_offsets(timestamps, timeout_ms)
+        for tp in timestamps:
+            if tp not in offsets:
+                offsets[tp] = None
+            else:
+                offset, timestamp = offsets[tp]
+                offsets[tp] = OffsetAndTimestamp(offset, timestamp)
+        return offsets
+
+    def beginning_offsets(self, partitions, timeout_ms):
+        return self.beginning_or_end_offset(
+            partitions, OffsetResetStrategy.EARLIEST, timeout_ms)
+
+    def end_offsets(self, partitions, timeout_ms):
+        return self.beginning_or_end_offset(
+            partitions, OffsetResetStrategy.LATEST, timeout_ms)
+
+    def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
+        timestamps = dict([(tp, timestamp) for tp in partitions])
+        offsets = self._retrieve_offsets(timestamps, timeout_ms)
+        for tp in timestamps:
+            offsets[tp] = offsets[tp][0]
+        return offsets
+
     def _reset_offset(self, partition):
         """Reset offsets for the given partition using the offset reset strategy.

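Note: illustrative call shapes for the new Fetcher helpers above, assuming a configured fetcher instance. beginning_offsets/end_offsets reuse the OffsetResetStrategy sentinels (EARLIEST == -2, LATEST == -1) as the "timestamp" passed to _retrieve_offsets:

offsets = fetcher.get_offsets_by_times(
    {TopicPartition('my-topic', 0): 1500000000000}, timeout_ms=5000)
# -> {tp: OffsetAndTimestamp(offset, timestamp)}, or {tp: None} when no match exists

earliest = fetcher.beginning_offsets([TopicPartition('my-topic', 0)], timeout_ms=5000)
latest = fetcher.end_offsets([TopicPartition('my-topic', 0)], timeout_ms=5000)
# -> plain int offsets, e.g. {tp: 0} and {tp: <high watermark>}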
@@ -199,40 +227,64 @@ def _reset_offset(self, partition):

         log.debug("Resetting offset for partition %s to %s offset.",
                   partition, strategy)
-        offset = self._offset(partition, timestamp)
+        offsets = self._retrieve_offsets({partition: timestamp})
+        if partition not in offsets:
+            raise NoOffsetForPartitionError(partition)
+        offset = offsets[partition][0]

         # we might lose the assignment while fetching the offset,
         # so check it is still active
         if self._subscriptions.is_assigned(partition):
             self._subscriptions.seek(partition, offset)

-    def _offset(self, partition, timestamp):
-        """Fetch a single offset before the given timestamp for the partition.
+    def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
+        """Fetch offset for each partition passed in ``timestamps`` map.

-        Blocks until offset is obtained, or a non-retriable exception is raised
+        Blocks until offsets are obtained, a non-retriable exception is raised
+        or ``timeout_ms`` passed.

         Arguments:
-            partition The partition that needs fetching offset.
-            timestamp (int): timestamp for fetching offset. -1 for the latest
-                available, -2 for the earliest available. Otherwise timestamp
-                is treated as epoch seconds.
+            timestamps: {TopicPartition: int} dict with timestamps to fetch
+                offsets by. -1 for the latest available, -2 for the earliest
+                available. Otherwise timestamp is treated as epoch miliseconds.

         Returns:
-            int: message offset
+            {TopicPartition: (int, int)}: Mapping of partition to
+                retrieved offset and timestamp. If offset does not exist for
+                the provided timestamp, that partition will be missing from
+                this mapping.
         """
-        while True:
-            future = self._send_offset_request(partition, timestamp)
-            self._client.poll(future=future)
+        if not timestamps:
+            return {}
+
+        start_time = time.time()
+        remaining_ms = timeout_ms
+        while remaining_ms > 0:
+            future = self._send_offset_requests(timestamps)
+            self._client.poll(future=future, timeout_ms=remaining_ms)

             if future.succeeded():
                 return future.value
-
             if not future.retriable():
                 raise future.exception # pylint: disable-msg=raising-bad-type

+            elapsed_ms = (time.time() - start_time) * 1000
+            remaining_ms = timeout_ms - elapsed_ms
+            if remaining_ms < 0:
+                break
+
             if future.exception.invalid_metadata:
                 refresh_future = self._client.cluster.request_update()
-                self._client.poll(future=refresh_future, sleep=True)
+                self._client.poll(
+                    future=refresh_future, sleep=True, timeout_ms=remaining_ms)
+            else:
+                time.sleep(self.config['retry_backoff_ms'] / 1000.0)
+
+            elapsed_ms = (time.time() - start_time) * 1000
+            remaining_ms = timeout_ms - elapsed_ms
+
+        raise Errors.KafkaTimeoutError(
+            "Failed to get offsets by timestamps in %s ms" % timeout_ms)

     def _raise_if_offset_out_of_range(self):
         """Check FetchResponses for offset out of range.
@@ -576,73 +628,140 @@ def _deserialize(self, f, topic, bytes_):
             return f.deserialize(topic, bytes_)
         return f(bytes_)

-    def _send_offset_request(self, partition, timestamp):
-        """Fetch a single offset before the given timestamp for the partition.
+    def _send_offset_requests(self, timestamps):
+        """Fetch offsets for each partition in timestamps dict. This may send
+        request to multiple nodes, based on who is Leader for partition.

         Arguments:
-            partition (TopicPartition): partition that needs fetching offset
-            timestamp (int): timestamp for fetching offset
+            timestamps (dict): {TopicPartition: int} mapping of fetching
+                timestamps.

         Returns:
-            Future: resolves to the corresponding offset
+            Future: resolves to a mapping of retrieved offsets
         """
-        node_id = self._client.cluster.leader_for_partition(partition)
-        if node_id is None:
-            log.debug("Partition %s is unknown for fetching offset,"
-                      " wait for metadata refresh", partition)
-            return Future().failure(Errors.StaleMetadata(partition))
-        elif node_id == -1:
-            log.debug("Leader for partition %s unavailable for fetching offset,"
-                      " wait for metadata refresh", partition)
-            return Future().failure(Errors.LeaderNotAvailableError(partition))
-
-        request = OffsetRequest[0](
-            -1, [(partition.topic, [(partition.partition, timestamp, 1)])]
-        )
+        timestamps_by_node = collections.defaultdict(dict)
+        for partition, timestamp in six.iteritems(timestamps):
+            node_id = self._client.cluster.leader_for_partition(partition)
+            if node_id is None:
+                self._client.add_topic(partition.topic)
+                log.debug("Partition %s is unknown for fetching offset,"
+                          " wait for metadata refresh", partition)
+                return Future().failure(Errors.StaleMetadata(partition))
+            elif node_id == -1:
+                log.debug("Leader for partition %s unavailable for fetching "
+                          "offset, wait for metadata refresh", partition)
+                return Future().failure(
+                    Errors.LeaderNotAvailableError(partition))
+            else:
+                timestamps_by_node[node_id][partition] = timestamp
+
+        # Aggregate results until we have all
+        list_offsets_future = Future()
+        responses = []
+        node_count = len(timestamps_by_node)
+
+        def on_success(value):
+            responses.append(value)
+            if len(responses) == node_count:
+                offsets = {}
+                for r in responses:
+                    offsets.update(r)
+                list_offsets_future.success(offsets)
+
+        def on_fail(err):
+            if not list_offsets_future.is_done:
+                list_offsets_future.failure(err)
+
+        for node_id, timestamps in six.iteritems(timestamps_by_node):
+            _f = self._send_offset_request(node_id, timestamps)
+            _f.add_callback(on_success)
+            _f.add_errback(on_fail)
+        return list_offsets_future
+
+    def _send_offset_request(self, node_id, timestamps):
+        by_topic = collections.defaultdict(list)
+        for tp, timestamp in six.iteritems(timestamps):
+            if self.config['api_version'] >= (0, 10, 1):
+                data = (tp.partition, timestamp)
+            else:
+                data = (tp.partition, timestamp, 1)
+            by_topic[tp.topic].append(data)
+
+        if self.config['api_version'] >= (0, 10, 1):
+            request = OffsetRequest[1](-1, list(six.iteritems(by_topic)))
+        else:
+            request = OffsetRequest[0](-1, list(six.iteritems(by_topic)))
+
         # Client returns a future that only fails on network issues
         # so create a separate future and attach a callback to update it
         # based on response error codes
         future = Future()
+
         _f = self._client.send(node_id, request)
-        _f.add_callback(self._handle_offset_response, partition, future)
+        _f.add_callback(self._handle_offset_response, future)
         _f.add_errback(lambda e: future.failure(e))
         return future

-    def _handle_offset_response(self, partition, future, response):
+    def _handle_offset_response(self, future, response):
         """Callback for the response of the list offset call above.

         Arguments:
-            partition (TopicPartition): The partition that was fetched
             future (Future): the future to update based on response
             response (OffsetResponse): response from the server

         Raises:
             AssertionError: if response does not match partition
         """
-        topic, partition_info = response.topics[0]
-        assert len(response.topics) == 1 and len(partition_info) == 1, (
-            'OffsetResponse should only be for a single topic-partition')
-
-        part, error_code, offsets = partition_info[0]
-        assert topic == partition.topic and part == partition.partition, (
-            'OffsetResponse partition does not match OffsetRequest partition')
-
-        error_type = Errors.for_code(error_code)
-        if error_type is Errors.NoError:
-            assert len(offsets) == 1, 'Expected OffsetResponse with one offset'
-            offset = offsets[0]
-            log.debug("Fetched offset %d for partition %s", offset, partition)
-            future.success(offset)
-        elif error_type in (Errors.NotLeaderForPartitionError,
-                            Errors.UnknownTopicOrPartitionError):
-            log.debug("Attempt to fetch offsets for partition %s failed due"
-                      " to obsolete leadership information, retrying.",
-                      partition)
-            future.failure(error_type(partition))
-        else:
-            log.warning("Attempt to fetch offsets for partition %s failed due to:"
-                        " %s", partition, error_type)
-            future.failure(error_type(partition))
+        timestamp_offset_map = {}
+        for topic, part_data in response.topics:
+            for partition_info in part_data:
+                partition, error_code = partition_info[:2]
+                partition = TopicPartition(topic, partition)
+                error_type = Errors.for_code(error_code)
+                if error_type is Errors.NoError:
+                    if response.API_VERSION == 0:
+                        offsets = partition_info[2]
+                        assert len(offsets) <= 1, 'Expected OffsetResponse with one offset'
+                        if not offsets:
+                            offset = UNKNOWN_OFFSET
+                        else:
+                            offset = offsets[0]
+                        log.debug("Handling v0 ListOffsetResponse response for %s. "
+                                  "Fetched offset %s", partition, offset)
+                        if offset != UNKNOWN_OFFSET:
+                            timestamp_offset_map[partition] = (offset, None)
+                    else:
+                        timestamp, offset = partition_info[2:]
+                        log.debug("Handling ListOffsetResponse response for %s. "
+                                  "Fetched offset %s, timestamp %s",
+                                  partition, offset, timestamp)
+                        if offset != UNKNOWN_OFFSET:
+                            timestamp_offset_map[partition] = (offset, timestamp)
+                elif error_type is Errors.UnsupportedForMessageFormatError:
+                    # The message format on the broker side is before 0.10.0,
+                    # we simply put None in the response.
+                    log.debug("Cannot search by timestamp for partition %s because the"
+                              " message format version is before 0.10.0", partition)
+                elif error_type is Errors.NotLeaderForPartitionError:
+                    log.debug("Attempt to fetch offsets for partition %s failed due"
+                              " to obsolete leadership information, retrying.",
+                              partition)
+                    future.failure(error_type(partition))
+                    return
+                elif error_type is Errors.UnknownTopicOrPartitionError:
+                    log.warn("Received unknown topic or partition error in ListOffset "
+                             "request for partition %s. The topic/partition " +
+                             "may not exist or the user may not have Describe access "
+                             "to it.", partition)
+                    future.failure(error_type(partition))
+                    return
+                else:
+                    log.warning("Attempt to fetch offsets for partition %s failed due to:"
+                                " %s", partition, error_type)
+                    future.failure(error_type(partition))
+                    return
+        if not future.is_done:
+            future.success(timestamp_offset_map)

     def _fetchable_partitions(self):
         fetchable = self._subscriptions.fetchable_partitions()
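Note: the two request shapes built in _send_offset_request above differ only in the per-partition tuple. v0 (brokers before 0.10.1) carries a max_num_offsets field and the broker returns a list of offsets, while v1 (KIP-79 ListOffsets, used when api_version >= (0, 10, 1)) takes only a timestamp and returns a (timestamp, offset) pair, which is why _handle_offset_response branches on response.API_VERSION. Illustrative payloads:

from kafka.protocol.offset import OffsetRequest

# v0: (replica_id, [(topic, [(partition, timestamp, max_num_offsets)])])
v0 = OffsetRequest[0](-1, [('my-topic', [(0, -1, 1)])])

# v1: (replica_id, [(topic, [(partition, timestamp)])])
v1 = OffsetRequest[1](-1, [('my-topic', [(0, -1)])])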
