From 5969c14a06dcc2b6468e414cfb6ed504ac25222c Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 18 Jun 2025 09:32:03 -0700 Subject: [PATCH 1/2] Avoid RuntimeError on mutated _completed_fetches deque in consumer fetcher --- kafka/consumer/fetcher.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index b083deb1a..b8153ce0e 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -612,7 +612,8 @@ def _handle_list_offsets_response(self, future, response): def _fetchable_partitions(self): fetchable = self._subscriptions.fetchable_partitions() # do not fetch a partition if we have a pending fetch response to process - discard = {fetch.topic_partition for fetch in self._completed_fetches} + # use copy.copy to avoid runtimeerror on mutation from different thread + discard = {fetch.topic_partition for fetch in copy.copy(self._completed_fetches)} current = self._next_partition_records if current: discard.add(current.topic_partition) From 1847314764053c6cbf67ede34f83bff7a0c7e8f2 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 18 Jun 2025 10:20:44 -0700 Subject: [PATCH 2/2] use copy() --- kafka/consumer/fetcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index b8153ce0e..1888d38bf 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -613,7 +613,7 @@ def _fetchable_partitions(self): fetchable = self._subscriptions.fetchable_partitions() # do not fetch a partition if we have a pending fetch response to process # use copy.copy to avoid runtimeerror on mutation from different thread - discard = {fetch.topic_partition for fetch in copy.copy(self._completed_fetches)} + discard = {fetch.topic_partition for fetch in self._completed_fetches.copy()} current = self._next_partition_records if current: discard.add(current.topic_partition)