Bug: KafkaConsumer process CPU occupies 100% when kafka cluster restart #2272


Closed
shijie3 opened this issue Oct 21, 2021 · 1 comment
shijie3 commented Oct 21, 2021

Hello,

bug recurrence:
1. At first, the kafka cluster was running normally, and the consumer was created at that time.

kafka cluster:
node id | node port
1       | 9092
2       | 9093
3       | 9094

consumer code:

from kafka import KafkaConsumer

class KafkaMonitor(object):
    def __init__(self):
        self.consumer = KafkaConsumer("btest", bootstrap_servers="172.20.68.142:9093")

    def run(self):
        for message in self.consumer:
            print(str(message))

if __name__ == "__main__":
    KafkaMonitor().run()

2. Then the kafka cluster was killed and all node services were terminated.
After I restarted the services, the mapping between node ids and ports changed:

kafka cluster:
node id | node port
1       | 9094
2       | 9092
3       | 9093

3. After this, the consumer process CPU usage stays at 100%.

bug found:
I found that when the kafka cluster restarts, the client's cluster metadata is not refreshed, yet TCP connections to the broker nodes can still be established. self._conns keeps caching connections whose host/port no longer match their node ids. The consumer picks a node id and sends a fetch over the stale connection (self.epoll.poll(), Kafka Fetch v4 Request), and the broker answers immediately with NOT_LEADER_FOR_PARTITION. Because the error response comes back instantly instead of the fetch blocking, the poll loop spins and the CPU goes to 100%.
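To make the busy loop concrete, here is a hypothetical simulation (not kafka-python code): each fetch against the stale broker fails instantly, so a retry loop with no backoff spins as fast as the CPU allows, while even a small backoff keeps the iteration count tiny.

```python
import time

def fetch_from_stale_broker():
    # Stand-in for a Fetch request to the wrong broker: it returns an
    # error immediately instead of long-polling for data.
    return "NOT_LEADER_FOR_PARTITION"

def spin(duration_s, backoff_s=0.0):
    """Retry fetches for duration_s seconds; return the iteration count."""
    deadline = time.monotonic() + duration_s
    iterations = 0
    while time.monotonic() < deadline:
        fetch_from_stale_broker()
        iterations += 1
        if backoff_s:
            time.sleep(backoff_s)  # analogous to a retry backoff
    return iterations

hot = spin(0.05)                    # no backoff: the loop spins freely
cool = spin(0.05, backoff_s=0.01)   # with backoff: only a handful of retries
```

The `fetch_from_stale_broker` and `spin` names are illustrative only; the real consumer's hot loop lives in the client's poll machinery.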

bug fix:
file: kafka/client_async.py

def poll(self, timeout_ms=None, future=None):
    ...
    # check that the ip/port cached on each connection are consistent
    # with the ip/port in the current cluster metadata
    for node_id, conn in self._conns.items():
        broker = self.cluster.broker_metadata(node_id)
        if broker is None:
            self.close(node_id)
            continue

        host, _, afi = get_ip_port_afi(broker.host)
        if conn.host != host or conn.port != broker.port:
            self.close(node_id)
            continue

def _register_send_sockets(self):
    while self._sending:
        conn = self._sending.pop()
        if conn.disconnected():
            continue
        try:
            key = self._selector.get_key(conn._sock)
            events = key.events | selectors.EVENT_WRITE
            self._selector.modify(key.fileobj, events, key.data)
        except KeyError:
            self._selector.register(conn._sock, selectors.EVENT_WRITE, conn)
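The core of the check can be shown as a standalone sketch. This uses hypothetical `Broker`/`Conn` stand-ins and a `stale_node_ids` helper (not kafka-python internals): any cached connection whose host/port no longer match the metadata for its node id is flagged so it can be closed and re-established.

```python
from collections import namedtuple

# Simplified stand-ins for the client's broker metadata and cached connection.
Broker = namedtuple("Broker", ["nodeId", "host", "port"])
Conn = namedtuple("Conn", ["host", "port"])

def stale_node_ids(conns, metadata):
    """Return node ids whose cached connection disagrees with metadata."""
    stale = []
    for node_id, conn in conns.items():
        broker = metadata.get(node_id)
        if broker is None or (conn.host, conn.port) != (broker.host, broker.port):
            stale.append(node_id)
    return stale

# Before the restart, node 1 listened on 9092; the refreshed metadata now
# says 9094, so the cached connection is detected as stale.
conns = {1: Conn("172.20.68.142", 9092)}
metadata = {1: Broker(1, "172.20.68.142", 9094)}
```

Closing the flagged connections forces the client to reconnect using the correct address for each node id instead of fetching from the wrong broker forever.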

After testing, this change resolves the bug. Could you please review the patch and consider merging it into the next release?

Thanks

dpkp (Owner) commented Feb 15, 2025

Fixed in #2480

@dpkp dpkp closed this as completed Feb 15, 2025