Skip to content

Commit e6b7d31

Browse files
authored
Attempt to fix metadata race condition when partitioning in producer.send (#2523)
1 parent 8770049 commit e6b7d31

File tree

1 file changed

+37
-21
lines changed

1 file changed

+37
-21
lines changed

kafka/producer/kafka.py

Lines changed: 37 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
from __future__ import absolute_import
1+
from __future__ import absolute_import, division
22

33
import atexit
44
import copy
@@ -538,7 +538,7 @@ def close(self, timeout=None):
538538

539539
def partitions_for(self, topic):
540540
"""Returns set of all known partitions for the topic."""
541-
max_wait = self.config['max_block_ms'] / 1000.0
541+
max_wait = self.config['max_block_ms'] / 1000
542542
return self._wait_on_metadata(topic, max_wait)
543543

544544
def _max_usable_produce_magic(self):
@@ -596,19 +596,29 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
596596
assert not (value is None and key is None), 'Need at least one: key or value'
597597
key_bytes = value_bytes = None
598598
try:
599-
self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
600-
601-
key_bytes = self._serialize(
602-
self.config['key_serializer'],
603-
topic, key)
604-
value_bytes = self._serialize(
605-
self.config['value_serializer'],
606-
topic, value)
607-
assert type(key_bytes) in (bytes, bytearray, memoryview, type(None))
608-
assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))
609-
610-
partition = self._partition(topic, partition, key, value,
611-
key_bytes, value_bytes)
599+
assigned_partition = None
600+
elapsed = 0.0
601+
begin = time.time()
602+
timeout = self.config['max_block_ms'] / 1000
603+
while assigned_partition is None and elapsed < timeout:
604+
elapsed = time.time() - begin
605+
self._wait_on_metadata(topic, timeout - elapsed)
606+
607+
key_bytes = self._serialize(
608+
self.config['key_serializer'],
609+
topic, key)
610+
value_bytes = self._serialize(
611+
self.config['value_serializer'],
612+
topic, value)
613+
assert type(key_bytes) in (bytes, bytearray, memoryview, type(None))
614+
assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))
615+
616+
assigned_partition = self._partition(topic, partition, key, value,
617+
key_bytes, value_bytes)
618+
if assigned_partition is None:
619+
raise Errors.KafkaTimeoutError("Failed to assign partition for message after %s secs." % timeout)
620+
else:
621+
partition = assigned_partition
612622

613623
if headers is None:
614624
headers = []
@@ -710,6 +720,10 @@ def _wait_on_metadata(self, topic, max_wait):
710720
if partitions is not None:
711721
return partitions
712722

723+
if elapsed >= max_wait:
724+
raise Errors.KafkaTimeoutError(
725+
"Failed to update metadata after %.1f secs." % (max_wait,))
726+
713727
if not metadata_event:
714728
metadata_event = threading.Event()
715729

@@ -720,13 +734,13 @@ def _wait_on_metadata(self, topic, max_wait):
720734
future.add_both(lambda e, *args: e.set(), metadata_event)
721735
self._sender.wakeup()
722736
metadata_event.wait(max_wait - elapsed)
723-
elapsed = time.time() - begin
724737
if not metadata_event.is_set():
725738
raise Errors.KafkaTimeoutError(
726739
"Failed to update metadata after %.1f secs." % (max_wait,))
727740
elif topic in self._metadata.unauthorized_topics:
728741
raise Errors.TopicAuthorizationFailedError(topic)
729742
else:
743+
elapsed = time.time() - begin
730744
log.debug("_wait_on_metadata woke after %s secs.", elapsed)
731745

732746
def _serialize(self, f, topic, data):
@@ -738,16 +752,18 @@ def _serialize(self, f, topic, data):
738752

739753
def _partition(self, topic, partition, key, value,
740754
serialized_key, serialized_value):
755+
all_partitions = self._metadata.partitions_for_topic(topic)
756+
available = self._metadata.available_partitions_for_topic(topic)
757+
if all_partitions is None or available is None:
758+
return None
741759
if partition is not None:
742760
assert partition >= 0
743-
assert partition in self._metadata.partitions_for_topic(topic), 'Unrecognized partition'
761+
assert partition in all_partitions, 'Unrecognized partition'
744762
return partition
745763

746-
all_partitions = sorted(self._metadata.partitions_for_topic(topic))
747-
available = list(self._metadata.available_partitions_for_topic(topic))
748764
return self.config['partitioner'](serialized_key,
749-
all_partitions,
750-
available)
765+
sorted(all_partitions),
766+
list(available))
751767

752768
def metrics(self, raw=False):
753769
"""Get metrics on producer performance.

0 commit comments

Comments
 (0)