Skip to content

Commit 40e2faa

Browse files
authored
Metadata with_partitions() (dpkp#787)
add method ClusterMetadata.with_partitions also fixup ClusterMetadata __str__
1 parent 99891ad commit 40e2faa

File tree

1 file changed

+20
-1
lines changed

1 file changed

+20
-1
lines changed

kafka/cluster.py

+20-1
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,25 @@ def add_group_coordinator(self, group, response):
326326
self._groups[group] = node_id
327327
return True
328328

329+
def with_partitions(self, partitions_to_add):
330+
"""Returns a copy of cluster metadata with partitions added"""
331+
new_metadata = ClusterMetadata(**self.config)
332+
new_metadata._brokers = copy.deepcopy(self._brokers)
333+
new_metadata._partitions = copy.deepcopy(self._partitions)
334+
new_metadata._broker_partitions = copy.deepcopy(self._broker_partitions)
335+
new_metadata._groups = copy.deepcopy(self._groups)
336+
new_metadata.internal_topics = copy.deepcopy(self.internal_topics)
337+
new_metadata.unauthorized_topics = copy.deepcopy(self.unauthorized_topics)
338+
339+
for partition in partitions_to_add:
340+
new_metadata._partitions[partition.topic][partition.partition] = partition
341+
342+
if partition.leader is not None and partition.leader != -1:
343+
new_metadata._broker_partitions[partition.leader].add(
344+
TopicPartition(partition.topic, partition.partition))
345+
346+
return new_metadata
347+
329348
def __str__(self):
330-
return 'Cluster(brokers: %d, topics: %d, groups: %d)' % \
349+
return 'ClusterMetadata(brokers: %d, topics: %d, groups: %d)' % \
331350
(len(self._brokers), len(self._partitions), len(self._groups))

0 commit comments

Comments
 (0)