14
14
from kafka .metrics .stats import Avg , Count , Max , Rate
15
15
from kafka .protocol .fetch import FetchRequest
16
16
from kafka .protocol .message import PartialMessage
17
- from kafka .protocol .offset import OffsetRequest , OffsetResetStrategy
17
+ from kafka .protocol .offset import (
18
+ OffsetRequest , OffsetResetStrategy , UNKNOWN_OFFSET
19
+ )
18
20
from kafka .serializer import Deserializer
19
- from kafka .structs import TopicPartition
21
+ from kafka .structs import TopicPartition , OffsetAndTimestamp
20
22
21
23
log = logging .getLogger (__name__ )
22
24
@@ -48,6 +50,7 @@ class Fetcher(six.Iterator):
48
50
'iterator_refetch_records' : 1 , # undocumented -- interface may change
49
51
'metric_group_prefix' : 'consumer' ,
50
52
'api_version' : (0 , 8 , 0 ),
53
+ 'retry_backoff_ms' : 100
51
54
}
52
55
53
56
def __init__ (self , client , subscriptions , metrics , ** configs ):
@@ -180,6 +183,31 @@ def update_fetch_positions(self, partitions):
180
183
" offset %s" , tp , committed )
181
184
self ._subscriptions .seek (tp , committed )
182
185
186
+ def get_offsets_by_times (self , timestamps , timeout_ms ):
187
+ offsets = self ._retrieve_offsets (timestamps , timeout_ms )
188
+ for tp in timestamps :
189
+ if tp not in offsets :
190
+ offsets [tp ] = None
191
+ else :
192
+ offset , timestamp = offsets [tp ]
193
+ offsets [tp ] = OffsetAndTimestamp (offset , timestamp )
194
+ return offsets
195
+
196
+ def beginning_offsets (self , partitions , timeout_ms ):
197
+ return self .beginning_or_end_offset (
198
+ partitions , OffsetResetStrategy .EARLIEST , timeout_ms )
199
+
200
+ def end_offsets (self , partitions , timeout_ms ):
201
+ return self .beginning_or_end_offset (
202
+ partitions , OffsetResetStrategy .LATEST , timeout_ms )
203
+
204
+ def beginning_or_end_offset (self , partitions , timestamp , timeout_ms ):
205
+ timestamps = dict ([(tp , timestamp ) for tp in partitions ])
206
+ offsets = self ._retrieve_offsets (timestamps , timeout_ms )
207
+ for tp in timestamps :
208
+ offsets [tp ] = offsets [tp ][0 ]
209
+ return offsets
210
+
183
211
def _reset_offset (self , partition ):
184
212
"""Reset offsets for the given partition using the offset reset strategy.
185
213
@@ -199,40 +227,64 @@ def _reset_offset(self, partition):
199
227
200
228
log .debug ("Resetting offset for partition %s to %s offset." ,
201
229
partition , strategy )
202
- offset = self ._offset (partition , timestamp )
230
+ offsets = self ._retrieve_offsets ({partition : timestamp })
231
+ if partition not in offsets :
232
+ raise NoOffsetForPartitionError (partition )
233
+ offset = offsets [partition ][0 ]
203
234
204
235
# we might lose the assignment while fetching the offset,
205
236
# so check it is still active
206
237
if self ._subscriptions .is_assigned (partition ):
207
238
self ._subscriptions .seek (partition , offset )
208
239
209
- def _offset (self , partition , timestamp ):
210
- """Fetch a single offset before the given timestamp for the partition .
240
+ def _retrieve_offsets (self , timestamps , timeout_ms = float ( "inf" ) ):
241
+ """Fetch offset for each partition passed in ``timestamps`` map .
211
242
212
- Blocks until offset is obtained, or a non-retriable exception is raised
243
+ Blocks until offsets are obtained, a non-retriable exception is raised
244
+ or ``timeout_ms`` passed.
213
245
214
246
Arguments:
215
- partition The partition that needs fetching offset.
216
- timestamp (int): timestamp for fetching offset. -1 for the latest
217
- available, -2 for the earliest available. Otherwise timestamp
218
- is treated as epoch seconds.
247
+ timestamps: {TopicPartition: int} dict with timestamps to fetch
248
+ offsets by. -1 for the latest available, -2 for the earliest
249
+ available. Otherwise timestamp is treated as epoch miliseconds.
219
250
220
251
Returns:
221
- int: message offset
252
+ {TopicPartition: (int, int)}: Mapping of partition to
253
+ retrieved offset and timestamp. If offset does not exist for
254
+ the provided timestamp, that partition will be missing from
255
+ this mapping.
222
256
"""
223
- while True :
224
- future = self ._send_offset_request (partition , timestamp )
225
- self ._client .poll (future = future )
257
+ if not timestamps :
258
+ return {}
259
+
260
+ start_time = time .time ()
261
+ remaining_ms = timeout_ms
262
+ while remaining_ms > 0 :
263
+ future = self ._send_offset_requests (timestamps )
264
+ self ._client .poll (future = future , timeout_ms = remaining_ms )
226
265
227
266
if future .succeeded ():
228
267
return future .value
229
-
230
268
if not future .retriable ():
231
269
raise future .exception # pylint: disable-msg=raising-bad-type
232
270
271
+ elapsed_ms = (time .time () - start_time ) * 1000
272
+ remaining_ms = timeout_ms - elapsed_ms
273
+ if remaining_ms < 0 :
274
+ break
275
+
233
276
if future .exception .invalid_metadata :
234
277
refresh_future = self ._client .cluster .request_update ()
235
- self ._client .poll (future = refresh_future , sleep = True )
278
+ self ._client .poll (
279
+ future = refresh_future , sleep = True , timeout_ms = remaining_ms )
280
+ else :
281
+ time .sleep (self .config ['retry_backoff_ms' ] / 1000.0 )
282
+
283
+ elapsed_ms = (time .time () - start_time ) * 1000
284
+ remaining_ms = timeout_ms - elapsed_ms
285
+
286
+ raise Errors .KafkaTimeoutError (
287
+ "Failed to get offsets by timestamps in %s ms" % timeout_ms )
236
288
237
289
def _raise_if_offset_out_of_range (self ):
238
290
"""Check FetchResponses for offset out of range.
@@ -576,73 +628,140 @@ def _deserialize(self, f, topic, bytes_):
576
628
return f .deserialize (topic , bytes_ )
577
629
return f (bytes_ )
578
630
579
- def _send_offset_request (self , partition , timestamp ):
580
- """Fetch a single offset before the given timestamp for the partition.
631
+ def _send_offset_requests (self , timestamps ):
632
+ """Fetch offsets for each partition in timestamps dict. This may send
633
+ request to multiple nodes, based on who is Leader for partition.
581
634
582
635
Arguments:
583
- partition (TopicPartition ): partition that needs fetching offset
584
- timestamp (int): timestamp for fetching offset
636
+ timestamps (dict ): {TopicPartition: int} mapping of fetching
637
+ timestamps.
585
638
586
639
Returns:
587
- Future: resolves to the corresponding offset
640
+ Future: resolves to a mapping of retrieved offsets
588
641
"""
589
- node_id = self ._client .cluster .leader_for_partition (partition )
590
- if node_id is None :
591
- log .debug ("Partition %s is unknown for fetching offset,"
592
- " wait for metadata refresh" , partition )
593
- return Future ().failure (Errors .StaleMetadata (partition ))
594
- elif node_id == - 1 :
595
- log .debug ("Leader for partition %s unavailable for fetching offset,"
596
- " wait for metadata refresh" , partition )
597
- return Future ().failure (Errors .LeaderNotAvailableError (partition ))
598
-
599
- request = OffsetRequest [0 ](
600
- - 1 , [(partition .topic , [(partition .partition , timestamp , 1 )])]
601
- )
642
+ timestamps_by_node = collections .defaultdict (dict )
643
+ for partition , timestamp in six .iteritems (timestamps ):
644
+ node_id = self ._client .cluster .leader_for_partition (partition )
645
+ if node_id is None :
646
+ self ._client .add_topic (partition .topic )
647
+ log .debug ("Partition %s is unknown for fetching offset,"
648
+ " wait for metadata refresh" , partition )
649
+ return Future ().failure (Errors .StaleMetadata (partition ))
650
+ elif node_id == - 1 :
651
+ log .debug ("Leader for partition %s unavailable for fetching "
652
+ "offset, wait for metadata refresh" , partition )
653
+ return Future ().failure (
654
+ Errors .LeaderNotAvailableError (partition ))
655
+ else :
656
+ timestamps_by_node [node_id ][partition ] = timestamp
657
+
658
+ # Aggregate results until we have all
659
+ list_offsets_future = Future ()
660
+ responses = []
661
+ node_count = len (timestamps_by_node )
662
+
663
+ def on_success (value ):
664
+ responses .append (value )
665
+ if len (responses ) == node_count :
666
+ offsets = {}
667
+ for r in responses :
668
+ offsets .update (r )
669
+ list_offsets_future .success (offsets )
670
+
671
+ def on_fail (err ):
672
+ if not list_offsets_future .is_done :
673
+ list_offsets_future .failure (err )
674
+
675
+ for node_id , timestamps in six .iteritems (timestamps_by_node ):
676
+ _f = self ._send_offset_request (node_id , timestamps )
677
+ _f .add_callback (on_success )
678
+ _f .add_errback (on_fail )
679
+ return list_offsets_future
680
+
681
+ def _send_offset_request (self , node_id , timestamps ):
682
+ by_topic = collections .defaultdict (list )
683
+ for tp , timestamp in six .iteritems (timestamps ):
684
+ if self .config ['api_version' ] >= (0 , 10 , 1 ):
685
+ data = (tp .partition , timestamp )
686
+ else :
687
+ data = (tp .partition , timestamp , 1 )
688
+ by_topic [tp .topic ].append (data )
689
+
690
+ if self .config ['api_version' ] >= (0 , 10 , 1 ):
691
+ request = OffsetRequest [1 ](- 1 , list (six .iteritems (by_topic )))
692
+ else :
693
+ request = OffsetRequest [0 ](- 1 , list (six .iteritems (by_topic )))
694
+
602
695
# Client returns a future that only fails on network issues
603
696
# so create a separate future and attach a callback to update it
604
697
# based on response error codes
605
698
future = Future ()
699
+
606
700
_f = self ._client .send (node_id , request )
607
- _f .add_callback (self ._handle_offset_response , partition , future )
701
+ _f .add_callback (self ._handle_offset_response , future )
608
702
_f .add_errback (lambda e : future .failure (e ))
609
703
return future
610
704
611
- def _handle_offset_response (self , partition , future , response ):
705
+ def _handle_offset_response (self , future , response ):
612
706
"""Callback for the response of the list offset call above.
613
707
614
708
Arguments:
615
- partition (TopicPartition): The partition that was fetched
616
709
future (Future): the future to update based on response
617
710
response (OffsetResponse): response from the server
618
711
619
712
Raises:
620
713
AssertionError: if response does not match partition
621
714
"""
622
- topic , partition_info = response .topics [0 ]
623
- assert len (response .topics ) == 1 and len (partition_info ) == 1 , (
624
- 'OffsetResponse should only be for a single topic-partition' )
625
-
626
- part , error_code , offsets = partition_info [0 ]
627
- assert topic == partition .topic and part == partition .partition , (
628
- 'OffsetResponse partition does not match OffsetRequest partition' )
629
-
630
- error_type = Errors .for_code (error_code )
631
- if error_type is Errors .NoError :
632
- assert len (offsets ) == 1 , 'Expected OffsetResponse with one offset'
633
- offset = offsets [0 ]
634
- log .debug ("Fetched offset %d for partition %s" , offset , partition )
635
- future .success (offset )
636
- elif error_type in (Errors .NotLeaderForPartitionError ,
637
- Errors .UnknownTopicOrPartitionError ):
638
- log .debug ("Attempt to fetch offsets for partition %s failed due"
639
- " to obsolete leadership information, retrying." ,
640
- partition )
641
- future .failure (error_type (partition ))
642
- else :
643
- log .warning ("Attempt to fetch offsets for partition %s failed due to:"
644
- " %s" , partition , error_type )
645
- future .failure (error_type (partition ))
715
+ timestamp_offset_map = {}
716
+ for topic , part_data in response .topics :
717
+ for partition_info in part_data :
718
+ partition , error_code = partition_info [:2 ]
719
+ partition = TopicPartition (topic , partition )
720
+ error_type = Errors .for_code (error_code )
721
+ if error_type is Errors .NoError :
722
+ if response .API_VERSION == 0 :
723
+ offsets = partition_info [2 ]
724
+ assert len (offsets ) <= 1 , 'Expected OffsetResponse with one offset'
725
+ if not offsets :
726
+ offset = UNKNOWN_OFFSET
727
+ else :
728
+ offset = offsets [0 ]
729
+ log .debug ("Handling v0 ListOffsetResponse response for %s. "
730
+ "Fetched offset %s" , partition , offset )
731
+ if offset != UNKNOWN_OFFSET :
732
+ timestamp_offset_map [partition ] = (offset , None )
733
+ else :
734
+ timestamp , offset = partition_info [2 :]
735
+ log .debug ("Handling ListOffsetResponse response for %s. "
736
+ "Fetched offset %s, timestamp %s" ,
737
+ partition , offset , timestamp )
738
+ if offset != UNKNOWN_OFFSET :
739
+ timestamp_offset_map [partition ] = (offset , timestamp )
740
+ elif error_type is Errors .UnsupportedForMessageFormatError :
741
+ # The message format on the broker side is before 0.10.0,
742
+ # we simply put None in the response.
743
+ log .debug ("Cannot search by timestamp for partition %s because the"
744
+ " message format version is before 0.10.0" , partition )
745
+ elif error_type is Errors .NotLeaderForPartitionError :
746
+ log .debug ("Attempt to fetch offsets for partition %s failed due"
747
+ " to obsolete leadership information, retrying." ,
748
+ partition )
749
+ future .failure (error_type (partition ))
750
+ return
751
+ elif error_type is Errors .UnknownTopicOrPartitionError :
752
+ log .warn ("Received unknown topic or partition error in ListOffset "
753
+ "request for partition %s. The topic/partition " +
754
+ "may not exist or the user may not have Describe access "
755
+ "to it." , partition )
756
+ future .failure (error_type (partition ))
757
+ return
758
+ else :
759
+ log .warning ("Attempt to fetch offsets for partition %s failed due to:"
760
+ " %s" , partition , error_type )
761
+ future .failure (error_type (partition ))
762
+ return
763
+ if not future .is_done :
764
+ future .success (timestamp_offset_map )
646
765
647
766
def _fetchable_partitions (self ):
648
767
fetchable = self ._subscriptions .fetchable_partitions ()
0 commit comments