Skip to content

Commit 46f9b1f

Browse files
ms7sdpkp
authored andcommitted
Sort partitions before calling partitioner (dpkp#905)
1 parent e828395 commit 46f9b1f

File tree

2 files changed

+8
-1
lines changed

2 files changed

+8
-1
lines changed

kafka/partitioner/default.py

+7
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@ class DefaultPartitioner(object):
1414
"""
1515
@classmethod
1616
def __call__(cls, key, all_partitions, available):
17+
"""
18+
Get the partition corresponding to key
19+
:param key: partitioning key
20+
:param all_partitions: list of all partitions sorted by partition ID
21+
:param available: list of available partitions in no particular order
22+
:return: one of the values from all_partitions or available
23+
"""
1724
if key is None:
1825
if available:
1926
return random.choice(available)

kafka/producer/kafka.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -625,7 +625,7 @@ def _partition(self, topic, partition, key, value,
625625
assert partition in self._metadata.partitions_for_topic(topic), 'Unrecognized partition'
626626
return partition
627627

628-
all_partitions = list(self._metadata.partitions_for_topic(topic))
628+
all_partitions = sorted(self._metadata.partitions_for_topic(topic))
629629
available = list(self._metadata.available_partitions_for_topic(topic))
630630
return self.config['partitioner'](serialized_key,
631631
all_partitions,

0 commit comments

Comments
 (0)