|
| 1 | +import collections |
| 2 | +import logging |
| 3 | + |
| 4 | +import six |
| 5 | + |
| 6 | +from .abstract import AbstractPartitionAssignor |
| 7 | +from ..protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment |
| 8 | + |
| 9 | +log = logging.getLogger(__name__) |
| 10 | + |
| 11 | + |
| 12 | +class RangePartitionAssignor(AbstractPartitionAssignor): |
| 13 | + """ |
| 14 | + The range assignor works on a per-topic basis. For each topic, we lay out |
| 15 | + the available partitions in numeric order and the consumers in |
| 16 | + lexicographic order. We then divide the number of partitions by the total |
| 17 | + number of consumers to determine the number of partitions to assign to each |
| 18 | + consumer. If it does not evenly divide, then the first few consumers will |
| 19 | + have one extra partition. |
| 20 | +
|
| 21 | + For example, suppose there are two consumers C0 and C1, two topics t0 and |
| 22 | + t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, |
| 23 | + t0p2, t1p0, t1p1, and t1p2. |
| 24 | +
|
| 25 | + The assignment will be: |
| 26 | + C0: [t0p0, t0p1, t1p0, t1p1] |
| 27 | + C1: [t0p2, t1p2] |
| 28 | + """ |
| 29 | + name = 'range' |
| 30 | + version = 0 |
| 31 | + |
| 32 | + @classmethod |
| 33 | + def assign(cls, cluster, member_metadata): |
| 34 | + consumers_per_topic = collections.defaultdict(list) |
| 35 | + for member, metadata in six.iteritems(member_metadata): |
| 36 | + for topic in metadata.subscription: |
| 37 | + consumers_per_topic[topic].append(member) |
| 38 | + |
| 39 | + # construct {member_id: {topic: [partition, ...]}} |
| 40 | + assignment = collections.defaultdict(dict) |
| 41 | + |
| 42 | + for topic, consumers_for_topic in six.iteritems(consumers_per_topic): |
| 43 | + partitions = cluster.partitions_for_topic(topic) |
| 44 | + if partitions is None: |
| 45 | + log.warning('No partition metadata for topic %s', topic) |
| 46 | + continue |
| 47 | + partitions = sorted(list(partitions)) |
| 48 | + partitions_for_topic = len(partitions) |
| 49 | + consumers_for_topic.sort() |
| 50 | + |
| 51 | + partitions_per_consumer = len(partitions) // len(consumers_for_topic) |
| 52 | + consumers_with_extra = len(partitions) % len(consumers_for_topic) |
| 53 | + |
| 54 | + for i in range(len(consumers_for_topic)): |
| 55 | + start = partitions_per_consumer * i |
| 56 | + start += min(i, consumers_with_extra) |
| 57 | + length = partitions_per_consumer |
| 58 | + if not i + 1 > consumers_with_extra: |
| 59 | + length += 1 |
| 60 | + member = consumers_for_topic[i] |
| 61 | + assignment[member][topic] = partitions[start:start+length] |
| 62 | + |
| 63 | + protocol_assignment = {} |
| 64 | + for member_id in member_metadata: |
| 65 | + protocol_assignment[member_id] = ConsumerProtocolMemberAssignment( |
| 66 | + cls.version, |
| 67 | + sorted(assignment[member_id].items()), |
| 68 | + b'') |
| 69 | + return protocol_assignment |
| 70 | + |
| 71 | + @classmethod |
| 72 | + def metadata(cls, topics): |
| 73 | + return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'') |
| 74 | + |
| 75 | + @classmethod |
| 76 | + def on_assignment(cls, assignment): |
| 77 | + pass |
0 commit comments