CommitFailedError when there are very few messages to be read from topic #2610
And is this also with 0 timeout_ms? Can you also provide the parameters you use to configure KafkaConsumer?
I think they are only seeing clients that implement KIP-511. That feature requires kafka-python 2.1.3.
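For reference, a quick way to confirm which client version is actually installed (kafka-python exposes its release number as `kafka.__version__`):

```python
import kafka

# Print the installed kafka-python release, e.g. '2.2.3'
print(kafka.__version__)
```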
Yes, this has always been happening, with 0 timeout_ms on 2.0.2 and now with a positive timeout_ms on 2.2.3. Hereafter the code for consumer creation and the values of the most relevant parameters:

```python
consumer = kafka.KafkaConsumer(
    *kafka_topics,
    bootstrap_servers=kafka_servers,
    client_id='\\' + hostname + '.' + process_name,
    group_id=kafka_group,
    security_protocol=kafka_security_protocol,
    sasl_mechanism=kafka_sasl_mechanism,
    sasl_plain_username=kafka_sasl_username,
    sasl_plain_password=kafka_sasl_password,
    ssl_check_hostname=False,
    ssl_cafile=kafka_ssl_ca,
    ssl_certfile=kafka_ssl_cert,
    ssl_keyfile=kafka_ssl_key,
    ssl_password=kafka_ssl_key_passphrase,
    value_deserializer=lambda m: m.decode('utf-8'),
    enable_auto_commit=False,
    receive_buffer_bytes=None,
    send_buffer_bytes=None,
    fetch_max_wait_ms=2000,
    fetch_min_bytes=100000,
    max_poll_records=500,
    heartbeat_interval_ms=3000,
    session_timeout_ms=300000,
    api_version=None,     # or (2, 6, 0) as a workaround to temporary AAAA timeouts on DNS
    metrics_enabled=False,  # introduced together with 2.2.3
)
```

Also, poll() gets called every 5 seconds when no message is found, or immediately after completing the processing of previously polled messages (one or a few hundredths of a second per message). Again, this issue arises only when we have very few messages, e.g. one message every so many minutes. It is not a real problem, since we handle it with manual commits, but we find it strange since our consumer instances do not change over time.
CommitFailed can happen if the OffsetCommitRequest is received by the broker in the middle of a group rebalance. There can be any number of reasons for group rebalances, and generally speaking a commit failing during a rebalance is fine so long as the consumer is able to rejoin the group normally and continue processing messages. The next offset commit after the rebalance should be accepted.

CommitFailed can also happen if the group rebalances without this consumer. That usually means something is wrong with timings: the consumer should notice a group rebalance via the heartbeat thread, and if it doesn't, you'd want to look more closely to figure out why.

Unfortunately the error message right now does not say whether it's (1) a rebalance-in-progress error or (2) a stale member generation error. I can make some changes to make that difference more clear. Are you able to identify which type of commit failure you're getting?
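A minimal sketch of the "next commit should be accepted" pattern described above (assumes a KafkaConsumer named `consumer` with enable_auto_commit=False; `process()` is a hypothetical per-message handler):

```python
from kafka.errors import CommitFailedError

while True:
    records = consumer.poll(timeout_ms=5000)
    if not records:
        continue
    for tp, messages in records.items():
        for message in messages:
            process(message)  # hypothetical application handler
    try:
        consumer.commit()  # synchronous commit of the current positions
    except CommitFailedError:
        # Likely a rebalance in progress: the consumer rejoins on the next
        # poll(), and the next successful commit advances the offset pointer
        # past this batch (messages may be redelivered in between).
        pass
```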
Also note that in Kafka, "commits" are not really associated with individual messages like they might be in a message queue. Instead, the commit is more like an array pointer: if you fail to commit message 1 but later commit message 2 successfully, then after 2 is committed, message 1 will also be considered committed.
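A small illustration of that pointer behaviour (a sketch; assumes an assigned KafkaConsumer named `consumer`, a hypothetical topic/partition, and the three-field OffsetAndMetadata of recent kafka-python 2.x; older releases take only offset and metadata):

```python
from kafka import TopicPartition
from kafka.structs import OffsetAndMetadata

tp = TopicPartition('my-topic', 0)  # hypothetical topic/partition

# The committed value is the *next* offset to read. Committing 3 marks
# messages 0, 1 and 2 as consumed, even if a commit for offset 2 failed.
consumer.commit({tp: OffsetAndMetadata(3, '', -1)})  # -1 = unknown leader epoch

print(consumer.committed(tp))  # one pointer per partition, not per-message acks
```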
I reverted from release 2.2.4 back to 2.0.6 because of a serious issue with CPU consumption on producers (100% usage almost immediately, with little or no message to be sent), but that is another issue I still have to understand better. Regarding the CommitFailedError, I did some more testing in our lab with 2.0.6 and understood that it happens whenever no message is available within the session timeout. I tried varying session_timeout_ms from 10,000 to 300,000 and always got the same result: a commit error when there is no message to be consumed within the session timeout. Otherwise, if at least one message is available inside the session interval, it runs without any error. Hereafter a piece of Kafka log:
Thanks for the additional notes. I think I've found a race condition between the heartbeat thread enable/disable and the join-group callbacks that can be triggered with zero or very small timeout_ms.
Release 2.2.6: the rebalancing issue when no message arrives within session_timeout_ms is still present. My consumer program log:

Kafka logging at INFO level:
The more interesting logs would be the ones before this. What is happening (or not happening) to cause "Heartbeat session expired, marking coordinator dead"? Are you able to provide a simple test case to replicate?
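A minimal test-case sketch along the lines of what is described above (assumptions: a reachable broker at localhost:9092 and an idle topic named 'idle-topic' that receives no traffic while it runs; both names are placeholders, and session_timeout_ms is deliberately short so any failure shows up quickly):

```python
from kafka import KafkaConsumer
from kafka.errors import CommitFailedError

consumer = KafkaConsumer(
    'idle-topic',                       # hypothetical topic with no producers
    bootstrap_servers='localhost:9092',  # placeholder broker address
    group_id='repro-group',
    enable_auto_commit=False,
    session_timeout_ms=10000,    # short on purpose
    heartbeat_interval_ms=3000,
)

# Poll for well past the session timeout with nothing to consume; the
# reported behaviour is a rebalance followed by CommitFailedError.
for _ in range(20):
    records = consumer.poll(timeout_ms=5000)
    if not records:
        print('no record available')
    try:
        consumer.commit()
    except CommitFailedError as err:
        print('CommitFailedError:', err)
```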
This CommitFailedError arises when there are very few messages to be read from the topic. I've been experiencing it since my early use of kafka-python (release 2.0.2), and it is still present with the latest 2.2.3.
Since I need to process every message only once, this led me to disable autocommit and instead do a manual commit after each poll that returns messages, then process the read messages.
In the example below, "no record available" gets printed after each poll that returns no messages, so you can see poll() gets called every 5 seconds (or immediately after processing the last message).
max_poll_interval_ms is left at its default value (300 seconds, I think) and is not even passed as a parameter to my consumer.
Not sure if it has to do with the behaviour above, but at least two Kafka broker administrators told me they do not see my client registered, even though my client is regularly polling and processing messages.
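The read loop referred to above is roughly the following (a sketch, with `consumer` configured as shown earlier in the thread and `handle()` as a hypothetical stand-in for the per-message processing):

```python
while True:
    records = consumer.poll(timeout_ms=5000)
    if not records:
        print('no record available')
        continue
    consumer.commit()  # manual commit right after a poll that returned messages
    for tp, messages in records.items():
        for message in messages:
            handle(message)  # hypothetical per-message processing
```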