Bug reproduction:
1. At first, the Kafka cluster was running normally, and the consumer was created at this point.
Kafka cluster:
node ID | port
1 | 9092
2 | 9093
3 | 9094
Consumer code:

from kafka import KafkaConsumer


class KafkaMonitor(object):
    def __init__(self):
        self.consumer = KafkaConsumer("btest", bootstrap_servers="172.20.68.142:9093")

    def run(self):
        for message in self.consumer:
            print(str(message))


if __name__ == "__main__":
    KafkaMonitor().run()
2. Next, the Kafka cluster was killed, terminating all broker services.
Then I restarted the services, but the node IDs and their corresponding ports had changed:
Kafka cluster:
node ID | port
1 | 9094
2 | 9092
3 | 9093
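3. The consumer process's CPU usage rose to 100%.
Bug found:
I found that when the Kafka cluster restarts, the client's cluster metadata has not yet been refreshed, but TCP connections to the brokers can still be established normally. self._conns caches these connections by node ID, so a cached connection can end up pointing at a broker whose node ID no longer matches. The consumer looks up the node ID, sends a request on that connection (self.epoll.poll(), Kafka Fetch v4 request), and immediately gets back a NotLeaderForPartition error. Because the error comes back immediately, the consumer keeps retrying in a tight loop, which appears to be what drives the CPU to 100%.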
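The mismatch can be modeled without a running cluster. This sketch is a simplified assumption, not kafka-python's code: it treats the connection cache and the refreshed broker metadata as plain dicts mapping node ID to (host, port), uses the hypothetical names cached_conns, refreshed_metadata, stale_nodes and broker_actually_reached, and takes the port layouts from the two tables above.

```python
# Hypothetical model of the problem: node ID -> (host, port).
HOST = "172.20.68.142"

# Connections cached before the restart (node ID -> address the socket points at).
cached_conns = {1: (HOST, 9092), 2: (HOST, 9093), 3: (HOST, 9094)}

# Broker metadata once it is refreshed after the restart: node IDs moved ports.
refreshed_metadata = {1: (HOST, 9094), 2: (HOST, 9092), 3: (HOST, 9093)}


def stale_nodes(conns, metadata):
    """Return node IDs whose cached address no longer matches the metadata."""
    return sorted(
        node_id for node_id, addr in conns.items() if metadata.get(node_id) != addr
    )


def broker_actually_reached(node_id, conns, metadata):
    """Return the node ID that really listens at the cached address, or None."""
    addr = conns[node_id]
    for real_id, real_addr in metadata.items():
        if real_addr == addr:
            return real_id
    return None


# Show which broker each cached connection really talks to.
for node_id in sorted(cached_conns):
    print(node_id, "->", broker_actually_reached(node_id, cached_conns, refreshed_metadata))
```

In this model every cached connection lands on a different broker than its node ID claims (node 1's socket reaches node 2, and so on), which is consistent with the NotLeaderForPartition responses described above.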
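Bug fix:
File: kafka/client_async.py

def poll(self, timeout_ms=None, future=None):
    ...
    # Close any cached connection whose host/port no longer matches the
    # broker metadata for its node ID, so the stale connection is not reused.
    # get_ip_port_afi is defined in kafka/conn.py.
    # Iterate over a snapshot in case self.close(node_id) removes the entry
    # from self._conns (mutating a dict while iterating it raises RuntimeError).
    for node_id, conn in list(self._conns.items()):
        broker = self.cluster.broker_metadata(node_id)
        if broker is None:
            self.close(node_id)
            continue
        host, _, _ = get_ip_port_afi(broker.host)
        if conn.host != host or conn.port != broker.port:
            self.close(node_id)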
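The same check can be exercised outside kafka-python with stand-in objects. In this sketch, FakeConn, FakeClient, Broker and prune_stale_connections are hypothetical: a plain dict replaces ClusterMetadata, and close() is assumed to drop the entry from _conns. The loop therefore walks a list() snapshot, because deleting from a dict while iterating over it raises RuntimeError in Python 3.

```python
from collections import namedtuple

# Hypothetical stand-in for broker metadata (only the fields used here).
Broker = namedtuple("Broker", ["node_id", "host", "port"])


class FakeConn:
    """Hypothetical stand-in for a cached broker connection."""

    def __init__(self, node_id, host, port):
        self.node_id = node_id
        self.host = host
        self.port = port
        self.closed = False


class FakeClient:
    """Hypothetical client holding a node-ID-keyed connection cache."""

    def __init__(self, conns, brokers):
        self._conns = conns  # node_id -> FakeConn
        self._brokers = {b.node_id: b for b in brokers}  # refreshed metadata

    def broker_metadata(self, node_id):
        return self._brokers.get(node_id)

    def close(self, node_id):
        # Assumed behavior: closing also removes the entry from the cache.
        conn = self._conns.pop(node_id)
        conn.closed = True

    def prune_stale_connections(self):
        # Iterate over a snapshot, because close() mutates self._conns.
        for node_id, conn in list(self._conns.items()):
            broker = self.broker_metadata(node_id)
            if broker is None or (conn.host, conn.port) != (broker.host, broker.port):
                self.close(node_id)


host = "172.20.68.142"
client = FakeClient(
    conns={1: FakeConn(1, host, 9092), 2: FakeConn(2, host, 9093)},
    brokers=[Broker(1, host, 9094), Broker(2, host, 9093)],
)
client.prune_stale_connections()
print(sorted(client._conns))  # [2]
```

Node 1's cached connection (port 9092) disagrees with its metadata (port 9094) and is closed, while node 2 still matches and is kept.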
After testing, this change resolves the bug. Could you please review the fix and consider merging it into the next version?
Thanks!