Skip to content

Commit b8717b4

Browse files
barrotsteindevdpkp
authored andcommitted
Update Partitioners for use with KafkaProducer (dpkp#827)
1 parent 5c78489 commit b8717b4

File tree

5 files changed

+112
-24
lines changed

5 files changed

+112
-24
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,5 @@ servers/*/resources/ssl*
1212
docs/_build
1313
.cache*
1414
.idea/
15+
integration-test/
16+
tests-env/

kafka/partitioner/base.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,23 @@ class Partitioner(object):
55
"""
66
Base class for a partitioner
77
"""
8-
def __init__(self, partitions):
8+
def __init__(self, partitions=None):
99
"""
1010
Initialize the partitioner
1111
1212
Arguments:
13-
partitions: A list of available partitions (during startup)
13+
partitions: A list of available partitions (during startup) OPTIONAL.
1414
"""
1515
self.partitions = partitions
1616

17-
def partition(self, key, partitions=None):
17+
def __call__(self, key, all_partitions=None, available_partitions=None):
1818
"""
19-
Takes a string key and num_partitions as argument and returns
19+
Takes a string key, num_partitions and available_partitions as argument and returns
2020
a partition to be used for the message
2121
2222
Arguments:
23-
key: the key to use for partitioning
24-
partitions: (optional) a list of partitions.
23+
key: the key to use for partitioning.
24+
all_partitions: a list of the topic's partitions.
25+
available_partitions: a list of the broker's currently avaliable partitions(optional).
2526
"""
2627
raise NotImplementedError('partition function has to be implemented')

kafka/partitioner/hashed.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ class Murmur2Partitioner(Partitioner):
1111
the hash of the key. Attempts to apply the same hashing
1212
function as mainline java client.
1313
"""
14+
def __call__(self, key, partitions=None, available=None):
15+
if available:
16+
return self.partition(key, available)
17+
return self.partition(key, partitions)
18+
1419
def partition(self, key, partitions=None):
1520
if not partitions:
1621
partitions = self.partitions
@@ -21,12 +26,15 @@ def partition(self, key, partitions=None):
2126
return partitions[idx]
2227

2328

24-
class LegacyPartitioner(Partitioner):
29+
class LegacyPartitioner(object):
2530
"""DEPRECATED -- See Issue 374
2631
2732
Implements a partitioner which selects the target partition based on
2833
the hash of the key
2934
"""
35+
def __init__(self, partitions):
36+
self.partitions = partitions
37+
3038
def partition(self, key, partitions=None):
3139
if not partitions:
3240
partitions = self.partitions

kafka/partitioner/roundrobin.py

Lines changed: 61 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,70 @@
11
from __future__ import absolute_import
22

3-
from itertools import cycle
4-
53
from .base import Partitioner
64

75

86
class RoundRobinPartitioner(Partitioner):
9-
"""
10-
Implements a round robin partitioner which sends data to partitions
11-
in a round robin fashion
12-
"""
13-
def __init__(self, partitions):
14-
super(RoundRobinPartitioner, self).__init__(partitions)
15-
self.iterpart = cycle(partitions)
16-
17-
def _set_partitions(self, partitions):
7+
def __init__(self, partitions=None):
8+
self.partitions_iterable = CachedPartitionCycler(partitions)
9+
if partitions:
10+
self._set_partitions(partitions)
11+
else:
12+
self.partitions = None
13+
14+
def __call__(self, key, all_partitions=None, available_partitions=None):
15+
if available_partitions:
16+
cur_partitions = available_partitions
17+
else:
18+
cur_partitions = all_partitions
19+
if not self.partitions:
20+
self._set_partitions(cur_partitions)
21+
elif cur_partitions != self.partitions_iterable.partitions and cur_partitions is not None:
22+
self._set_partitions(cur_partitions)
23+
return next(self.partitions_iterable)
24+
25+
def _set_partitions(self, available_partitions):
26+
self.partitions = available_partitions
27+
self.partitions_iterable.set_partitions(available_partitions)
28+
29+
def partition(self, key, all_partitions=None, available_partitions=None):
30+
return self.__call__(key, all_partitions, available_partitions)
31+
32+
33+
class CachedPartitionCycler(object):
34+
def __init__(self, partitions=None):
1835
self.partitions = partitions
19-
self.iterpart = cycle(partitions)
36+
if partitions:
37+
assert type(partitions) is list
38+
self.cur_pos = None
2039

21-
def partition(self, key, partitions=None):
22-
# Refresh the partition list if necessary
23-
if partitions and self.partitions != partitions:
24-
self._set_partitions(partitions)
40+
def __next__(self):
41+
return self.next()
42+
43+
@staticmethod
44+
def _index_available(cur_pos, partitions):
45+
return cur_pos < len(partitions)
46+
47+
def set_partitions(self, partitions):
48+
if self.cur_pos:
49+
if not self._index_available(self.cur_pos, partitions):
50+
self.cur_pos = 0
51+
self.partitions = partitions
52+
return None
53+
54+
self.partitions = partitions
55+
next_item = self.partitions[self.cur_pos]
56+
if next_item in partitions:
57+
self.cur_pos = partitions.index(next_item)
58+
else:
59+
self.cur_pos = 0
60+
return None
61+
self.partitions = partitions
2562

26-
return next(self.iterpart)
63+
def next(self):
64+
assert self.partitions is not None
65+
if self.cur_pos is None or not self._index_available(self.cur_pos, self.partitions):
66+
self.cur_pos = 1
67+
return self.partitions[0]
68+
cur_item = self.partitions[self.cur_pos]
69+
self.cur_pos += 1
70+
return cur_item

test/test_partitioner.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from kafka.partitioner import Murmur2Partitioner
55
from kafka.partitioner.default import DefaultPartitioner
6+
from kafka.partitioner import RoundRobinPartitioner
67

78

89
def test_default_partitioner():
@@ -22,6 +23,38 @@ def test_default_partitioner():
2223
assert partitioner(None, all_partitions, []) in all_partitions
2324

2425

26+
def test_roundrobin_partitioner():
27+
partitioner = RoundRobinPartitioner()
28+
all_partitions = list(range(100))
29+
available = all_partitions
30+
# partitioner should cycle between partitions
31+
i = 0
32+
max_partition = all_partitions[len(all_partitions) - 1]
33+
while i <= max_partition:
34+
assert i == partitioner(None, all_partitions, available)
35+
i += 1
36+
37+
i = 0
38+
while i <= int(max_partition / 2):
39+
assert i == partitioner(None, all_partitions, available)
40+
i += 1
41+
42+
# test dynamic partition re-assignment
43+
available = available[:-25]
44+
45+
while i <= max(available):
46+
assert i == partitioner(None, all_partitions, available)
47+
i += 1
48+
49+
all_partitions = list(range(200))
50+
available = all_partitions
51+
52+
max_partition = all_partitions[len(all_partitions) - 1]
53+
while i <= max_partition:
54+
assert i == partitioner(None, all_partitions, available)
55+
i += 1
56+
57+
2558
def test_hash_bytes():
2659
p = Murmur2Partitioner(range(1000))
2760
assert p.partition(bytearray(b'test')) == p.partition(b'test')

0 commit comments

Comments
 (0)