CommitFailedError when there are very few messages to be read from topic #2610
This CommitFailedError arises when there are very few messages to be read from the topic. I've been experiencing it since my early use of kafka-python (release 2.0.2), and it is still present with the latest 2.2.3.

Since I need to process every message only once, this led me to disable autocommit and instead do a manual commit after each poll that returns some messages, then process the read messages.

In the example below, "no record available" gets printed after each poll with no returned messages, so you can see poll() gets called every 5 seconds (or immediately after processing the last message).

max_poll_interval_ms is left at its default value (300 seconds, I think) and is not even provided as a parameter to my consumer.

Not sure if it has to do with the above behaviour, but at least two Kafka broker administrators told me they do not see my client registered ... even though my client is regularly polling and processing messages.
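For context, here is a minimal sketch of how max_poll_interval_ms relates to the other consumer timing parameters that appear later in this thread; the values are illustrative assumptions, not the reporter's configuration:

```python
import kafka

# Illustrative timing relationships only; broker-side limits such as
# group.max.session.timeout.ms also constrain session_timeout_ms.
consumer = kafka.KafkaConsumer(
    'my-topic',                          # hypothetical topic
    bootstrap_servers='localhost:9092',  # hypothetical broker
    group_id='my-group',
    enable_auto_commit=False,
    heartbeat_interval_ms=3000,    # heartbeat-thread cadence; rule of thumb is
                                   # at most session_timeout_ms / 3
    session_timeout_ms=10000,      # no heartbeats for this long => member dead
    max_poll_interval_ms=300000,   # max allowed gap between poll() calls;
                                   # keep session_timeout_ms well below this so a
                                   # stalled poll loop is what triggers a rebalance
)
```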
Comments
And is this also with 0 timeout_ms? Can you also provide the parameters you use to configure KafkaConsumer?

I think they are only seeing clients that implement KIP-511. That feature requires kafka-python 2.1.3.
Yes, this has always been happening: with 0 timeout_ms in 2.0.2, and now with a positive timeout_ms in 2.2.3. Here is the code for consumer creation, with values for the most relevant parameters:

```python
consumer = kafka.KafkaConsumer(
    *kafka_topics,
    bootstrap_servers=kafka_servers,
    client_id='\\'+hostname+'.'+process_name,
    group_id=kafka_group,
    security_protocol=kafka_security_protocol,
    sasl_mechanism=kafka_sasl_mechanism,
    sasl_plain_username=kafka_sasl_username,
    sasl_plain_password=kafka_sasl_password,
    ssl_check_hostname=False,
    ssl_cafile=kafka_ssl_ca,
    ssl_certfile=kafka_ssl_cert,
    ssl_keyfile=kafka_ssl_key,
    ssl_password=kafka_ssl_key_passphrase,
    value_deserializer=lambda m: m.decode('utf-8'),
    enable_auto_commit=False,
    receive_buffer_bytes=None,
    send_buffer_bytes=None,
    fetch_max_wait_ms=2000,
    fetch_min_bytes=100000,
    max_poll_records=500,
    heartbeat_interval_ms=3000,
    session_timeout_ms=300000,
    api_version=None,       # or (2, 6, 0) as a workaround to temporary AAAA timeouts on DNS
    metrics_enabled=False,  # introduced together with 2.2.3
)
```

Also, poll() gets called every 5 seconds when no message is found, or immediately after completing the processing of previously polled messages (one or a few hundredths of a second per message). Again, this issue arises only when we have very few messages, e.g. one message every so many minutes. It is not a real problem, since we handle it with manual commits, but we find it strange, since our consumer instances do not change over time.
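A minimal, self-contained sketch of the poll-and-commit loop described above (the topic, broker address, and process() are hypothetical placeholders, not the reporter's actual code):

```python
import kafka
from kafka.errors import CommitFailedError

def process(message):
    # Hypothetical per-message work (a few hundredths of a second each).
    pass

consumer = kafka.KafkaConsumer(
    'my-topic',                          # hypothetical topic
    bootstrap_servers='localhost:9092',  # hypothetical broker
    group_id='my-group',
    enable_auto_commit=False,
)

while True:
    records = consumer.poll(timeout_ms=5000)  # returns {} when nothing arrives
    if not records:
        print('no record available')
        continue
    try:
        consumer.commit()  # manually commit the just-polled offsets first ...
    except CommitFailedError as exc:
        print('commit failed, consumer must rejoin the group:', exc)
        continue
    for tp, messages in records.items():
        for message in messages:
            process(message)  # ... then process the read messages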
CommitFailed can happen if the OffsetCommitRequest is received by the broker in the middle of a group rebalance. There can be any number of reasons for group rebalances, and generally speaking a commit failing during a rebalance is fine, so long as the consumer is able to rejoin the group normally and continue processing messages. The next offset commit after the rebalance should be accepted.

CommitFailed can also happen if the group rebalances without this consumer. That usually means something is wrong with the timings. The consumer should notice a group rebalance via the heartbeat thread; if it doesn't, you'd want to look more closely to figure out why.

Unfortunately, the default CommitFailed error message does not say whether it's (1) a rebalance-in-progress error or (2) a stale member generation error. I can make some changes to make that difference more clear. Are you able to identify which type of commit failure you're getting?
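Until the message is improved, one way to tell the two cases apart is to log the full exception text and enable debug logging for the coordinator, which generally logs the underlying broker error before raising. A sketch using the standard logging module; try_commit is a hypothetical helper, not part of the library:

```python
import logging
from kafka.errors import CommitFailedError

# The coordinator loggers live under the 'kafka.coordinator' namespace;
# their DEBUG output can help distinguish a rebalance-in-progress error
# from a stale member generation error.
logging.basicConfig(level=logging.INFO)
logging.getLogger('kafka.coordinator').setLevel(logging.DEBUG)

def try_commit(consumer):
    # Attempt a manual commit; report the failure text instead of raising.
    try:
        consumer.commit()
        return True
    except CommitFailedError as exc:
        # The exception string is currently the main clue to the failure type.
        logging.warning('OffsetCommitRequest rejected: %s', exc)
        return False
```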
Also note that in Kafka, "commits" are not really associated with individual messages the way they might be in a message queue. Instead, the commit is more like an array pointer: if you fail to commit message 1 but later commit message 2 successfully, then once 2 is committed, message 1 will also be considered committed.
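A small illustration of that pointer behaviour (broker, topic, and partition are hypothetical):

```python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',  # hypothetical broker
    group_id='my-group',
    enable_auto_commit=False,
)
tp = TopicPartition('my-topic', 0)  # hypothetical topic/partition
consumer.assign([tp])

batch = consumer.poll(timeout_ms=5000).get(tp, [])
# Suppose the commit attempted after the first message of this batch failed.
# Committing once after the whole batch records the position *after* the last
# consumed message, so every earlier message, including the one whose own
# commit failed, is now considered committed as well.
if batch:
    print('committing position', consumer.position(tp))
    consumer.commit()
```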