1
- from __future__ import absolute_import
1
+ from __future__ import absolute_import , division
2
2
3
3
import atexit
4
4
import copy
@@ -538,7 +538,7 @@ def close(self, timeout=None):
538
538
539
539
def partitions_for (self , topic ):
540
540
"""Returns set of all known partitions for the topic."""
541
- max_wait = self .config ['max_block_ms' ] / 1000.0
541
+ max_wait = self .config ['max_block_ms' ] / 1000
542
542
return self ._wait_on_metadata (topic , max_wait )
543
543
544
544
def _max_usable_produce_magic (self ):
@@ -596,19 +596,29 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
596
596
assert not (value is None and key is None ), 'Need at least one: key or value'
597
597
key_bytes = value_bytes = None
598
598
try :
599
- self ._wait_on_metadata (topic , self .config ['max_block_ms' ] / 1000.0 )
600
-
601
- key_bytes = self ._serialize (
602
- self .config ['key_serializer' ],
603
- topic , key )
604
- value_bytes = self ._serialize (
605
- self .config ['value_serializer' ],
606
- topic , value )
607
- assert type (key_bytes ) in (bytes , bytearray , memoryview , type (None ))
608
- assert type (value_bytes ) in (bytes , bytearray , memoryview , type (None ))
609
-
610
- partition = self ._partition (topic , partition , key , value ,
611
- key_bytes , value_bytes )
599
+ assigned_partition = None
600
+ elapsed = 0.0
601
+ begin = time .time ()
602
+ timeout = self .config ['max_block_ms' ] / 1000
603
+ while assigned_partition is None and elapsed < timeout :
604
+ elapsed = time .time () - begin
605
+ self ._wait_on_metadata (topic , timeout - elapsed )
606
+
607
+ key_bytes = self ._serialize (
608
+ self .config ['key_serializer' ],
609
+ topic , key )
610
+ value_bytes = self ._serialize (
611
+ self .config ['value_serializer' ],
612
+ topic , value )
613
+ assert type (key_bytes ) in (bytes , bytearray , memoryview , type (None ))
614
+ assert type (value_bytes ) in (bytes , bytearray , memoryview , type (None ))
615
+
616
+ assigned_partition = self ._partition (topic , partition , key , value ,
617
+ key_bytes , value_bytes )
618
+ if assigned_partition is None :
619
+ raise Errors .KafkaTimeoutError ("Failed to assign partition for message after %s secs." % timeout )
620
+ else :
621
+ partition = assigned_partition
612
622
613
623
if headers is None :
614
624
headers = []
@@ -710,6 +720,10 @@ def _wait_on_metadata(self, topic, max_wait):
710
720
if partitions is not None :
711
721
return partitions
712
722
723
+ if elapsed >= max_wait :
724
+ raise Errors .KafkaTimeoutError (
725
+ "Failed to update metadata after %.1f secs." % (max_wait ,))
726
+
713
727
if not metadata_event :
714
728
metadata_event = threading .Event ()
715
729
@@ -720,13 +734,13 @@ def _wait_on_metadata(self, topic, max_wait):
720
734
future .add_both (lambda e , * args : e .set (), metadata_event )
721
735
self ._sender .wakeup ()
722
736
metadata_event .wait (max_wait - elapsed )
723
- elapsed = time .time () - begin
724
737
if not metadata_event .is_set ():
725
738
raise Errors .KafkaTimeoutError (
726
739
"Failed to update metadata after %.1f secs." % (max_wait ,))
727
740
elif topic in self ._metadata .unauthorized_topics :
728
741
raise Errors .TopicAuthorizationFailedError (topic )
729
742
else :
743
+ elapsed = time .time () - begin
730
744
log .debug ("_wait_on_metadata woke after %s secs." , elapsed )
731
745
732
746
def _serialize (self , f , topic , data ):
@@ -738,16 +752,18 @@ def _serialize(self, f, topic, data):
738
752
739
753
def _partition (self , topic , partition , key , value ,
740
754
serialized_key , serialized_value ):
755
+ all_partitions = self ._metadata .partitions_for_topic (topic )
756
+ available = self ._metadata .available_partitions_for_topic (topic )
757
+ if all_partitions is None or available is None :
758
+ return None
741
759
if partition is not None :
742
760
assert partition >= 0
743
- assert partition in self . _metadata . partitions_for_topic ( topic ) , 'Unrecognized partition'
761
+ assert partition in all_partitions , 'Unrecognized partition'
744
762
return partition
745
763
746
- all_partitions = sorted (self ._metadata .partitions_for_topic (topic ))
747
- available = list (self ._metadata .available_partitions_for_topic (topic ))
748
764
return self .config ['partitioner' ](serialized_key ,
749
- all_partitions ,
750
- available )
765
+ sorted ( all_partitions ) ,
766
+ list ( available ) )
751
767
752
768
def metrics (self , raw = False ):
753
769
"""Get metrics on producer performance.
0 commit comments