diff --git a/kafka/partitioner/fnv1a_32.py b/kafka/partitioner/fnv1a_32.py new file mode 100644 index 000000000..ae1b04496 --- /dev/null +++ b/kafka/partitioner/fnv1a_32.py @@ -0,0 +1,92 @@ +from __future__ import absolute_import + +import random + +from kafka.vendor import six + + +class FNV1a32Partitioner(object): + """Partitioner with FNV1a 32-bit hash algorithm. + + Hashes key to partition using FNV1a 32-bit hashing. + If key is None, selects partition randomly from available, + or from all partitions if none are currently available. + """ + @classmethod + def __call__(cls, key, all_partitions, available): + """ + Partitioner implementation using FNV1a 32-bit hashing function and + decimal conversion with two's complement. If key is passed with None + value, the selection of the partition is random. + + The implementation details are selected to make sure the same key + is mapped to the same partition in Goka/Sarama. It is confirmed + that this implementation works the same as the partitioner of + github.com/lovoo/goka v1.0.5 with Go version 1.16. + + Algorithm details: + http://www.isthe.com/chongo/tech/comp/fnv/#FNV-param + + :param key: partitioning key + :param all_partitions: list of all partitions sorted by partition ID + :param available: list of available partitions in no particular order + :return: one of the values from all_partitions or available + """ + if key is None: + if available: + return random.choice(available) + return random.choice(all_partitions) + + key_hash = _get_fnv1a_32(key) + key_hash = _get_twos_complement_32bit(key_hash) + key_hash = abs(key_hash) + idx = key_hash % len(all_partitions) + return all_partitions[idx] + + +def _get_twos_complement_32bit(value: int) -> int: + """ + Returns the signed two's complement decimal conversion. + + Algorithm details: + http://sandbox.mc.edu/~bennet/cs110/tc/tctod.html + + Taken from: + https://stackoverflow.com/questions/1604464/twos-complement-in-python + """ + bit_base = 32 + if (value & (1 << (bit_base - 1))) != 0: + value = value - (1 << bit_base) + return value + + +def _get_fnv1a_32(key: bytes) -> int: + """ + Returns the FNV1a 32bit hash of the given key. + + Algorithm details: + http://www.isthe.com/chongo/tech/comp/fnv/#FNV-param + + Taken from: + https://github.com/znerol/py-fnvhash/blob/master/fnvhash/__init__.py + """ + # We set the same init_offset and prime for the FNV hasher as + # defined in Golang FNV package. The Go FNV is the package Sarama + # uses for its hashing calculations under the hood. + # References: + # https://cs.opensource.google/go/go/+/refs/tags/go1.17.3:src/hash/fnv/fnv.go;l=31 + # https://cs.opensource.google/go/go/+/refs/tags/go1.17.3:src/hash/fnv/fnv.go;l=35 + init_offset = 0x811c9dc5 + prime = 0x01000193 + hash_size = 2 ** 32 + + # Python2 bytes is really a str, causing the bitwise operations below to fail + # so convert to bytearray. + if six.PY2: + key = bytearray(bytes(key)) + + key_hash = init_offset + for byte in key: + key_hash = key_hash ^ byte + key_hash = (key_hash * prime) % hash_size + return key_hash diff --git a/test/test_partitioner.py b/test/test_partitioner.py index 09fa0412a..c15bd9beb 100644 --- a/test/test_partitioner.py +++ b/test/test_partitioner.py @@ -4,6 +4,7 @@ from kafka.partitioner import DefaultPartitioner, murmur2 +from kafka.partitioner.fnv1a_32 import FNV1a32Partitioner, _get_twos_complement_32bit, _get_fnv1a_32 def test_default_partitioner(): @@ -37,3 +38,17 @@ def test_murmur2_not_ascii(): # Verify no regression of murmur2() bug encoding py2 bytes that don't ascii encode murmur2(b'\xa4') murmur2(b'\x81' * 1000) + + +@pytest.mark.parametrize("key,partitions,available,expected", [ + (b"123", [0, 1, 2], [0, 1, 2], 2), + (b"123", [0, 1], [0, 1], 1), + (b"123", [0], [0], 0), + (b"f232oo3232", [0, 1, 2, 3], [0, 1, 2, 3], 2), + (b"f232oo3232", [0, 1], [0, 1], 0), + (b"f232oo3232", [0], [0], 0), +]) +def test_fnv1a_32_partitioner(key, partitions, available, expected): + partitioner = FNV1a32Partitioner() + out = partitioner(key, partitions, available) + assert out == expected