From 37c766a663a95370a11964a0c81be7249a0ecf53 Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Tue, 6 Dec 2016 12:03:10 +0100 Subject: [PATCH 1/2] Sort partitions before calling partitioner Current partitioners assume that the partitions are sorted according to partition ID in all_partitions. However, this is not guaranteed in the KafkaProducer implementation as the values that are passed come from a set. Sets are not guaranteed to iterate values in any particular order, so we need to sort the values before passing them further along. Before this change, the code depended on internal implementation of Python interpreters. In CPython 3.5 and lower it seems that integers are returned in sorted order from sets so the code appears to work. In PyPy and CPython 3.6, sets and dictionaries preserve the order of insertions [1] which means that the code may not work in these environments (I have not tested this). As far as I could find, the order of partitions used in this case is the order that is returned by the broker, but the documentation does not say anything about partition order. [1] https://docs.python.org/3.6/whatsnew/3.6.html#whatsnew36-compactdict --- kafka/partitioner/default.py | 7 +++++++ kafka/producer/kafka.py | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/kafka/partitioner/default.py b/kafka/partitioner/default.py index 79205b672..087166c0f 100644 --- a/kafka/partitioner/default.py +++ b/kafka/partitioner/default.py @@ -14,6 +14,13 @@ class DefaultPartitioner(object): """ @classmethod def __call__(cls, key, all_partitions, available): + """ + Get the partition corresponding to key + :param key: partitioning key + :param all_partitions: list of all partitions sorted by partition ID + :param available: list of available partitions in no particular order + :return: one of the values from all_partitions or available + """ if key is None: if available: return random.choice(available) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 747f620dd..5bbaff6a8 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -625,7 +625,7 @@ def _partition(self, topic, partition, key, value, assert partition in self._metadata.partitions_for_topic(topic), 'Unrecognized partition' return partition - all_partitions = list(self._metadata.partitions_for_topic(topic)) + all_partitions = sorted(list(self._metadata.partitions_for_topic(topic))) available = list(self._metadata.available_partitions_for_topic(topic)) return self.config['partitioner'](serialized_key, all_partitions, From 4003949ac969136c024338beb780231b392d7403 Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Mon, 19 Dec 2016 09:05:10 +0100 Subject: [PATCH 2/2] Don't convert to list twice --- kafka/producer/kafka.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 5bbaff6a8..4c3152cc0 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -625,7 +625,7 @@ def _partition(self, topic, partition, key, value, assert partition in self._metadata.partitions_for_topic(topic), 'Unrecognized partition' return partition - all_partitions = sorted(list(self._metadata.partitions_for_topic(topic))) + all_partitions = sorted(self._metadata.partitions_for_topic(topic)) available = list(self._metadata.available_partitions_for_topic(topic)) return self.config['partitioner'](serialized_key, all_partitions,