Hi team,
The latest release, 2.2.2, has a problem with the bootstrap socket being double unregistered.
This only happens occasionally. See the example logs below.
I think the most obvious issue here is that the socket for node_id=bootstrap-1 is unregistered twice in a row, and the second call raises ValueError: Invalid file descriptor: -1.
This is also related to #177. We suspect the problem lies in #156: its PR description does not include any test showing how it solves the 100% CPU usage. Should we revert that PR?
A simple fix could be to wrap the call in a try/except block, as below:
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -637,7 +637,10 @@ class KafkaClient:
         for key, events in ready:
             if key.fileobj.fileno() < 0:
-                self._selector.unregister(key.fileobj)
+                try:
+                    self._selector.unregister(key.fileobj)
+                except (KeyError, ValueError):
+                    pass
             if key.fileobj is self._wake_r:
                 self._clear_wake_fd()
I am happy to raise a PR if this makes sense to you.
Example error logs
"Running Kafka consumer for topics evergreen-production-1.admincoin.mutations with group id infra-consumer-group."
"Booting consumer lib.kafka.consumer.binlog_consumer.BinlogConsumer with pid: 1"
"Updating subscribed topics to: ('evergreen-production-1.admincoin.mutations',)"
"Consumer lib.kafka.consumer.binlog_consumer.BinlogConsumer starts running with pid: 1"
"<BrokerConnection node_id=bootstrap-0 host=b-1.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.65.61', 9096)]>: connecting to b-1.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 [('172.31.65.61', 9096) IPv4]"
"<BrokerConnection node_id=bootstrap-0 host=b-1.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.65.61', 9096)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile=None, capath='/usr/lib/ssl/certs', openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/usr/lib/ssl/cert.pem', openssl_capath_env='SSL_CERT_DIR', openssl_capath='/usr/lib/ssl/certs')"
"<BrokerConnection node_id=bootstrap-2 host=b-3.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.56.191', 9096)]>: connecting to b-3.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 [('172.31.56.191', 9096) IPv4]"
"<BrokerConnection node_id=bootstrap-2 host=b-3.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.56.191', 9096)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile=None, capath='/usr/lib/ssl/certs', openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/usr/lib/ssl/cert.pem', openssl_capath_env='SSL_CERT_DIR', openssl_capath='/usr/lib/ssl/certs')"
"<BrokerConnection node_id=bootstrap-0 host=b-1.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <authenticating> [IPv4 ('172.31.65.61', 9096)]>: Authenticated as msk-user via SCRAM-SHA-512"
"<BrokerConnection node_id=bootstrap-0 host=b-1.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <authenticating> [IPv4 ('172.31.65.61', 9096)]>: Connection complete."
"<BrokerConnection node_id=bootstrap-1 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.94.126', 9096)]>: connecting to b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 [('172.31.94.126', 9096) IPv4]"
"<BrokerConnection node_id=bootstrap-2 host=b-3.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <authenticating> [IPv4 ('172.31.56.191', 9096)]>: Connection complete."
"<BrokerConnection node_id=bootstrap-2 host=b-3.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <authenticating> [IPv4 ('172.31.56.191', 9096)]>: Authenticated as msk-user via SCRAM-SHA-512"
"<BrokerConnection node_id=1 host=b-1.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.65.61', 9096)]>: connecting to b-1.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 [('172.31.65.61', 9096) IPv4]"
"<BrokerConnection node_id=bootstrap-1 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.94.126', 9096)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile=None, capath='/usr/lib/ssl/certs', openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/usr/lib/ssl/cert.pem', openssl_capath_env='SSL_CERT_DIR', openssl_capath='/usr/lib/ssl/certs')"
"<BrokerConnection node_id=1 host=b-1.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.65.61', 9096)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile=None, capath='/usr/lib/ssl/certs', openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/usr/lib/ssl/cert.pem', openssl_capath_env='SSL_CERT_DIR', openssl_capath='/usr/lib/ssl/certs')"
"<BrokerConnection node_id=3 host=b-3.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.56.191', 9096)]>: connecting to b-3.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 [('172.31.56.191', 9096) IPv4]"
"<BrokerConnection node_id=3 host=b-3.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.56.191', 9096)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile=None, capath='/usr/lib/ssl/certs', openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/usr/lib/ssl/cert.pem', openssl_capath_env='SSL_CERT_DIR', openssl_capath='/usr/lib/ssl/certs')"
"<BrokerConnection node_id=1 host=b-1.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <authenticating> [IPv4 ('172.31.65.61', 9096)]>: Authenticated as msk-user via SCRAM-SHA-512"
"<BrokerConnection node_id=1 host=b-1.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <authenticating> [IPv4 ('172.31.65.61', 9096)]>: Connection complete."
"<BrokerConnection node_id=bootstrap-1 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <authenticating> [IPv4 ('172.31.94.126', 9096)]>: Closing connection. "
"<BrokerConnection node_id=bootstrap-0 host=b-1.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connected> [IPv4 ('172.31.65.61', 9096)]>: Closing connection. "
"<BrokerConnection node_id=bootstrap-2 host=b-3.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connected> [IPv4 ('172.31.56.191', 9096)]>: Closing connection. "
"<BrokerConnection node_id=2 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.94.126', 9096)]>: connecting to b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 [('172.31.94.126', 9096) IPv4]"
"<BrokerConnection node_id=2 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.94.126', 9096)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile=None, capath='/usr/lib/ssl/certs', openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/usr/lib/ssl/cert.pem', openssl_capath_env='SSL_CERT_DIR', openssl_capath='/usr/lib/ssl/certs')"
"<BrokerConnection node_id=bootstrap-1 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.94.126', 9096)]>: connecting to b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 [('172.31.94.126', 9096) IPv4]"
"<BrokerConnection node_id=3 host=b-3.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <authenticating> [IPv4 ('172.31.56.191', 9096)]>: Connection complete."
"<BrokerConnection node_id=3 host=b-3.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <authenticating> [IPv4 ('172.31.56.191', 9096)]>: Authenticated as msk-user via SCRAM-SHA-512"
"<BrokerConnection node_id=bootstrap-1 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.94.126', 9096)]>: Closing connection. "
"<BrokerConnection node_id=bootstrap-1 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.94.126', 9096)]>: connecting to b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 [('172.31.94.126', 9096) IPv4]"
"<BrokerConnection node_id=bootstrap-1 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.94.126', 9096)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile=None, capath='/usr/lib/ssl/certs', openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/usr/lib/ssl/cert.pem', openssl_capath_env='SSL_CERT_DIR', openssl_capath='/usr/lib/ssl/certs')"
"<BrokerConnection node_id=2 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <authenticating> [IPv4 ('172.31.94.126', 9096)]>: Authenticated as msk-user via SCRAM-SHA-512"
"<BrokerConnection node_id=2 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <authenticating> [IPv4 ('172.31.94.126', 9096)]>: Connection complete."
"<BrokerConnection node_id=bootstrap-1 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <handshake> [IPv4 ('172.31.94.126', 9096)]>: Closing connection. "
"Consumer error for group infra-consumer-group-INFRA-: Invalid file descriptor: -1"
Traceback (most recent call last):
  File "/app/lib/kafka/consumer/base_consumer.py", line 197, in run
    topic_to_msgs = self.kafka_consumer.poll( # pyright: ignore
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kafka/consumer/group.py", line 663, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kafka/consumer/group.py", line 685, in _poll_once
    self._coordinator.poll()
  File "/usr/local/lib/python3.11/site-packages/kafka/coordinator/consumer.py", line 274, in poll
    self.ensure_coordinator_ready()
  File "/usr/local/lib/python3.11/site-packages/kafka/coordinator/base.py", line 267, in ensure_coordinator_ready
    self._client.poll(future=future)
  File "/usr/local/lib/python3.11/site-packages/kafka/client_async.py", line 601, in poll
    self._poll(timeout / 1000)
  File "/usr/local/lib/python3.11/site-packages/kafka/client_async.py", line 640, in _poll
    self._selector.unregister(key.fileobj)
  File "/usr/local/lib/python3.11/selectors.py", line 366, in unregister
    key = super().unregister(fileobj)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/selectors.py", line 249, in unregister
    key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/selectors.py", line 225, in _fileobj_lookup
    return _fileobj_to_fd(fileobj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/selectors.py", line 42, in _fileobj_to_fd
    raise ValueError("Invalid file descriptor: {}".format(fd))
ValueError: Invalid file descriptor: -1
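
For reference, the failure at the end of the traceback above can be reproduced with the standard library alone. This is only a minimal sketch and not kafka-python code: the helper name unregister_outcome and the socketpair setup are made up for illustration. It registers a socket, closes it so that fileno() returns -1, and then unregisters it twice. It also shows that unregistering an open socket that was never registered fails with a different exception type.

```python
import selectors
import socket


def unregister_outcome(sel, sock):
    """Hypothetical helper: unregister sock and report which exception, if any, was raised."""
    try:
        sel.unregister(sock)
    except (KeyError, ValueError) as exc:
        return type(exc).__name__
    return "ok"


sel = selectors.DefaultSelector()
a, b = socket.socketpair()
sel.register(a, selectors.EVENT_READ)

# Closing the socket makes fileno() return -1, which is the condition
# checked in _poll before unregistering.
a.close()

# Still registered: the selector falls back to a lookup by object identity.
first = unregister_outcome(sel, a)
# Already unregistered: fd -1 cannot be resolved any more.
second = unregister_outcome(sel, a)
# Open socket that was never registered.
never = unregister_outcome(sel, b)

print(first, second, never)  # prints: ok ValueError KeyError

b.close()
sel.close()
```

The second unregister of the closed socket raises ValueError rather than KeyError, so a guard around the unregister call in _poll has to cover ValueError to handle the case reported here.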