Skip to content

Commit 9f0db5d

Browse files
committed
Merge pull request dpkp#550 from dpkp/range_assignor
Add RangePartitionAssignor (and use as default)
2 parents d5c05c8 + c8be93b commit 9f0db5d

File tree

7 files changed

+171
-13
lines changed

7 files changed

+171
-13
lines changed

kafka/consumer/group.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from kafka.consumer.fetcher import Fetcher
1111
from kafka.consumer.subscription_state import SubscriptionState
1212
from kafka.coordinator.consumer import ConsumerCoordinator
13+
from kafka.coordinator.assignors.range import RangePartitionAssignor
1314
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
1415
from kafka.protocol.offset import OffsetResetStrategy
1516
from kafka.version import __version__
@@ -98,7 +99,8 @@ class KafkaConsumer(six.Iterator):
9899
brokers or partitions. Default: 300000
99100
partition_assignment_strategy (list): List of objects to use to
100101
distribute partition ownership amongst consumer instances when
101-
group management is used. Default: [RoundRobinPartitionAssignor]
102+
group management is used.
103+
Default: [RangePartitionAssignor, RoundRobinPartitionAssignor]
102104
heartbeat_interval_ms (int): The expected time in milliseconds
103105
between heartbeats to the consumer coordinator when using
104106
Kafka's group management feature. Heartbeats are used to ensure
@@ -146,7 +148,7 @@ class KafkaConsumer(six.Iterator):
146148
'auto_commit_interval_ms': 5000,
147149
'check_crcs': True,
148150
'metadata_max_age_ms': 5 * 60 * 1000,
149-
'partition_assignment_strategy': (RoundRobinPartitionAssignor,),
151+
'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
150152
'heartbeat_interval_ms': 3000,
151153
'session_timeout_ms': 30000,
152154
'send_buffer_bytes': 128 * 1024,

kafka/coordinator/assignors/range.py

+77
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import collections
2+
import logging
3+
4+
import six
5+
6+
from .abstract import AbstractPartitionAssignor
7+
from ..protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
8+
9+
log = logging.getLogger(__name__)
10+
11+
12+
class RangePartitionAssignor(AbstractPartitionAssignor):
13+
"""
14+
The range assignor works on a per-topic basis. For each topic, we lay out
15+
the available partitions in numeric order and the consumers in
16+
lexicographic order. We then divide the number of partitions by the total
17+
number of consumers to determine the number of partitions to assign to each
18+
consumer. If it does not evenly divide, then the first few consumers will
19+
have one extra partition.
20+
21+
For example, suppose there are two consumers C0 and C1, two topics t0 and
22+
t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1,
23+
t0p2, t1p0, t1p1, and t1p2.
24+
25+
The assignment will be:
26+
C0: [t0p0, t0p1, t1p0, t1p1]
27+
C1: [t0p2, t1p2]
28+
"""
29+
name = 'range'
30+
version = 0
31+
32+
@classmethod
33+
def assign(cls, cluster, member_metadata):
34+
consumers_per_topic = collections.defaultdict(list)
35+
for member, metadata in six.iteritems(member_metadata):
36+
for topic in metadata.subscription:
37+
consumers_per_topic[topic].append(member)
38+
39+
# construct {member_id: {topic: [partition, ...]}}
40+
assignment = collections.defaultdict(dict)
41+
42+
for topic, consumers_for_topic in six.iteritems(consumers_per_topic):
43+
partitions = cluster.partitions_for_topic(topic)
44+
if partitions is None:
45+
log.warning('No partition metadata for topic %s', topic)
46+
continue
47+
partitions = sorted(list(partitions))
48+
partitions_for_topic = len(partitions)
49+
consumers_for_topic.sort()
50+
51+
partitions_per_consumer = len(partitions) // len(consumers_for_topic)
52+
consumers_with_extra = len(partitions) % len(consumers_for_topic)
53+
54+
for i in range(len(consumers_for_topic)):
55+
start = partitions_per_consumer * i
56+
start += min(i, consumers_with_extra)
57+
length = partitions_per_consumer
58+
if not i + 1 > consumers_with_extra:
59+
length += 1
60+
member = consumers_for_topic[i]
61+
assignment[member][topic] = partitions[start:start+length]
62+
63+
protocol_assignment = {}
64+
for member_id in member_metadata:
65+
protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
66+
cls.version,
67+
sorted(assignment[member_id].items()),
68+
b'')
69+
return protocol_assignment
70+
71+
@classmethod
72+
def metadata(cls, topics):
73+
return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'')
74+
75+
@classmethod
76+
def on_assignment(cls, assignment):
77+
pass

kafka/coordinator/assignors/roundrobin.py

+17-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,22 @@
1212

1313

1414
class RoundRobinPartitionAssignor(AbstractPartitionAssignor):
15+
"""
16+
The roundrobin assignor lays out all the available partitions and all the
17+
available consumers. It then proceeds to do a roundrobin assignment from
18+
partition to consumer. If the subscriptions of all consumer instances are
19+
identical, then the partitions will be uniformly distributed. (i.e., the
20+
partition ownership counts will be within a delta of exactly one across all
21+
consumers.)
22+
23+
For example, suppose there are two consumers C0 and C1, two topics t0 and
24+
t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1,
25+
t0p2, t1p0, t1p1, and t1p2.
26+
27+
The assignment will be:
28+
C0: [t0p0, t0p2, t1p1]
29+
C1: [t0p1, t1p0, t1p2]
30+
"""
1531
name = 'roundrobin'
1632
version = 0
1733

@@ -50,7 +66,7 @@ def assign(cls, cluster, member_metadata):
5066
for member_id in member_metadata:
5167
protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
5268
cls.version,
53-
assignment[member_id].items(),
69+
sorted(assignment[member_id].items()),
5470
b'')
5571
return protocol_assignment
5672

kafka/coordinator/consumer.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import six
99

1010
from .base import BaseCoordinator
11+
from .assignors.range import RangePartitionAssignor
1112
from .assignors.roundrobin import RoundRobinPartitionAssignor
1213
from .protocol import (
1314
ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment,
@@ -30,7 +31,7 @@ class ConsumerCoordinator(BaseCoordinator):
3031
'enable_auto_commit': True,
3132
'auto_commit_interval_ms': 5000,
3233
'default_offset_commit_callback': lambda offsets, response: True,
33-
'assignors': (RoundRobinPartitionAssignor,),
34+
'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor),
3435
'session_timeout_ms': 30000,
3536
'heartbeat_interval_ms': 3000,
3637
'retry_backoff_ms': 100,
@@ -54,7 +55,7 @@ def __init__(self, client, subscription, **configs):
5455
trigger custom actions when a commit request completes.
5556
assignors (list): List of objects to use to distribute partition
5657
ownership amongst consumer instances when group management is
57-
used. Default: [RoundRobinPartitionAssignor]
58+
used. Default: [RangePartitionAssignor, RoundRobinPartitionAssignor]
5859
heartbeat_interval_ms (int): The expected time in milliseconds
5960
between heartbeats to the consumer coordinator when using
6061
Kafka's group management feature. Heartbeats are used to ensure

kafka/coordinator/protocol.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,6 @@ def partitions(self):
2828

2929
class ConsumerProtocol(object):
3030
PROTOCOL_TYPE = 'consumer'
31-
ASSIGNMENT_STRATEGIES = ('roundrobin',)
31+
ASSIGNMENT_STRATEGIES = ('range', 'roundrobin')
3232
METADATA = ConsumerProtocolMemberMetadata
3333
ASSIGNMENT = ConsumerProtocolMemberAssignment

test/test_assignors.py

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# pylint: skip-file
2+
from __future__ import absolute_import
3+
4+
import pytest
5+
6+
from kafka.coordinator.assignors.range import RangePartitionAssignor
7+
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
8+
from kafka.coordinator.protocol import (
9+
ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)
10+
11+
12+
@pytest.fixture
13+
def cluster(mocker):
14+
cluster = mocker.MagicMock()
15+
cluster.partitions_for_topic.return_value = set([0, 1, 2])
16+
return cluster
17+
18+
19+
def test_assignor_roundrobin(cluster):
20+
assignor = RoundRobinPartitionAssignor
21+
22+
member_metadata = {
23+
'C0': assignor.metadata(set(['t0', 't1'])),
24+
'C1': assignor.metadata(set(['t0', 't1'])),
25+
}
26+
27+
ret = assignor.assign(cluster, member_metadata)
28+
expected = {
29+
'C0': ConsumerProtocolMemberAssignment(
30+
assignor.version, [('t0', [0, 2]), ('t1', [1])], b''),
31+
'C1': ConsumerProtocolMemberAssignment(
32+
assignor.version, [('t0', [1]), ('t1', [0, 2])], b'')
33+
}
34+
assert ret == expected
35+
assert set(ret) == set(expected)
36+
for member in ret:
37+
assert ret[member].encode() == expected[member].encode()
38+
39+
40+
def test_assignor_range(cluster):
41+
assignor = RangePartitionAssignor
42+
43+
member_metadata = {
44+
'C0': assignor.metadata(set(['t0', 't1'])),
45+
'C1': assignor.metadata(set(['t0', 't1'])),
46+
}
47+
48+
ret = assignor.assign(cluster, member_metadata)
49+
expected = {
50+
'C0': ConsumerProtocolMemberAssignment(
51+
assignor.version, [('t0', [0, 1]), ('t1', [0, 1])], b''),
52+
'C1': ConsumerProtocolMemberAssignment(
53+
assignor.version, [('t0', [2]), ('t1', [2])], b'')
54+
}
55+
assert ret == expected
56+
assert set(ret) == set(expected)
57+
for member in ret:
58+
assert ret[member].encode() == expected[member].encode()

test/test_coordinator.py

+11-7
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from kafka.common import TopicPartition, OffsetAndMetadata
88
from kafka.consumer.subscription_state import (
99
SubscriptionState, ConsumerRebalanceListener)
10+
from kafka.coordinator.assignors.range import RangePartitionAssignor
1011
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
1112
from kafka.coordinator.consumer import ConsumerCoordinator
1213
from kafka.coordinator.protocol import (
@@ -72,13 +73,16 @@ def test_group_protocols(coordinator):
7273
assert False, 'Exception not raised when expected'
7374

7475
coordinator._subscription.subscribe(topics=['foobar'])
75-
assert coordinator.group_protocols() == [(
76-
'roundrobin',
77-
ConsumerProtocolMemberMetadata(
76+
assert coordinator.group_protocols() == [
77+
('range', ConsumerProtocolMemberMetadata(
78+
RangePartitionAssignor.version,
79+
['foobar'],
80+
b'')),
81+
('roundrobin', ConsumerProtocolMemberMetadata(
7882
RoundRobinPartitionAssignor.version,
7983
['foobar'],
80-
b'')
81-
)]
84+
b'')),
85+
]
8286

8387

8488
@pytest.mark.parametrize('api_version', [(0, 8), (0, 8, 1), (0, 8, 2), (0, 9)])
@@ -113,8 +117,8 @@ def test_pattern_subscription(coordinator, api_version):
113117

114118

115119
def test_lookup_assignor(coordinator):
116-
assignor = coordinator._lookup_assignor('roundrobin')
117-
assert assignor is RoundRobinPartitionAssignor
120+
assert coordinator._lookup_assignor('roundrobin') is RoundRobinPartitionAssignor
121+
assert coordinator._lookup_assignor('range') is RangePartitionAssignor
118122
assert coordinator._lookup_assignor('foobar') is None
119123

120124

0 commit comments

Comments
 (0)