CommitFailedError when there are very few messages to be read from topic #2610

Closed
berrfred opened this issue May 2, 2025 · 5 comments · Fixed by #2614


@berrfred

berrfred commented May 2, 2025

This CommitFailedError arises when there are very few messages to be read from the topic ... I've been experiencing it since my early use of kafka-python (release 2.0.2) and it is still present with the latest 2.2.3.
Since I need to process every message exactly once, this led me to disable autocommit and instead do a manual commit after each poll that returns messages, then process the read messages.
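
A minimal sketch of that pattern (the topic name, group id, 5-second timeout, and process() helper below are placeholders, not the exact production code):

import kafka

# Sketch of the manual-commit pattern: commit right after a non-empty
# poll, then process the returned messages.
consumer = kafka.KafkaConsumer('some-topic', group_id='some-group',
                               enable_auto_commit=False)
while True:
    records = consumer.poll(timeout_ms=5000)  # {} when nothing to read
    if not records:
        print('no record available')
        continue
    consumer.commit()  # manual commit covering everything just polled
    for tp, messages in records.items():
        for message in messages:
            process(message)  # process() is a placeholder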

In the example below, "no record available" is printed after each poll that returns no messages, so you can see poll() gets called every 5 seconds (or immediately after processing the last message).

02/05/2025; 18:49:02.94; [I] $GMT22 ; no record available
02/05/2025; 18:49:08.09; [I] $GMT22 ; no record available
02/05/2025; 18:49:13.23; [I] $GMT22 ; no record available
02/05/2025; 18:49:18.33; [I] $GMT22 ; no record available
02/05/2025; 18:49:23.53; [I] $GMT22 ; no record available
02/05/2025; 18:49:28.70; [I] $GMT22 ; no record available
02/05/2025; 18:49:34.29; [E] $GMT22 ; read records - error handling polled messages - CommitFailedError: Commit cannot be completed since the group has already
            rebalanced and assigned the partitions to another member.
            This means that the time between subsequent calls to poll()
            was longer than the configured max_poll_interval_ms, which
            typically implies that the poll loop is spending too much
            time message processing. You can address this either by
            increasing the rebalance timeout with max_poll_interval_ms,
            or by reducing the maximum size of batches returned in poll()
            with max_poll_records.

02/05/2025; 18:49:39.39; [I] $GMT22 ; no record available
02/05/2025; 18:49:44.50; [I] $GMT22 ; no record available
02/05/2025; 18:49:49.51; [W] $GMT22 ; read records - committed (1)
02/05/2025; 18:49:49.51; [W] $GMT22 ; read|oamds-mtonline|1|189230|b'no_check_1000200ffff33gfff'|10000
02/05/2025; 18:49:49.68; [I] $GMT22 ; nowait response for tag 10000
02/05/2025; 18:49:49.79; [I] $GMT22 ; no record available

max_poll_interval_ms is left at its default value (300 seconds, I think) and is not even provided as a parameter to my Consumer.

Not sure if it has to do with the above behaviour, but at least two Kafka broker administrators told me they do not see my client registered ... even though my client is regularly polling and processing messages.

@dpkp
Owner

dpkp commented May 2, 2025

And is this also with 0 timeout_ms? Can you also provide the parameters you use to configure KafkaConsumer?

@dpkp
Owner

dpkp commented May 2, 2025

at least two Kafka brokers administrators told me they do not see my client registered

I think they are only seeing clients that implement KIP-511. That feature requires kafka-python 2.1.3.

@berrfred
Author

berrfred commented May 3, 2025

And is this also with 0 timeout_ms? Can you also provide the parameters you use to configure KafkaConsumer?

Yes, this has always been happening, with 0 timeout_ms in 2.0.2 and now with a positive timeout_ms in 2.2.3.

Below is the code for consumer creation, with the values of the most relevant parameters:

import kafka

consumer = kafka.KafkaConsumer(
    *kafka_topics,
    bootstrap_servers=kafka_servers, client_id='\\'+hostname+'.'+process_name, group_id=kafka_group,
    security_protocol=kafka_security_protocol, sasl_mechanism=kafka_sasl_mechanism, sasl_plain_username=kafka_sasl_username, sasl_plain_password=kafka_sasl_password,
    ssl_check_hostname=False,
    ssl_cafile=kafka_ssl_ca,
    ssl_certfile=kafka_ssl_cert,
    ssl_keyfile=kafka_ssl_key,
    ssl_password=kafka_ssl_key_passphrase,
    value_deserializer=lambda m: m.decode('utf-8'),
    enable_auto_commit=False,
    receive_buffer_bytes=None,
    send_buffer_bytes=None,
    fetch_max_wait_ms=2000,
    fetch_min_bytes=100000,
    max_poll_records=500,
    heartbeat_interval_ms=3000,
    session_timeout_ms=300000,
    api_version=None,  # or (2, 6, 0) as a workaround for temporary AAAA timeouts on DNS
    metrics_enabled=False,  # introduced together with 2.2.3
)

Also, poll() gets called every 5 seconds when no message is found, or immediately after completing the processing of previously polled messages (one to a few hundredths of a second per message).

Again, this issue arises only when we have very few messages, e.g. one message every several minutes. It is not a real problem, since we handle it with manual commits, but we find it strange because our consumer instances do not change over time.

@dpkp
Owner

dpkp commented May 4, 2025

CommitFailed can happen if the OffsetCommitRequest is received by the broker in the middle of a group rebalance. There can be any number of reasons for group rebalances, and generally speaking, having a commit fail during a rebalance is fine so long as the consumer is able to rejoin the group normally and continue processing messages. The next offset commit after the rebalance should be accepted.
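
A hedged sketch of that recovery path, reusing the manual poll-and-commit loop shape from the report above (process() remains a placeholder):

from kafka.errors import CommitFailedError

while True:
    records = consumer.poll(timeout_ms=5000)
    if not records:
        continue
    try:
        consumer.commit()
    except CommitFailedError:
        # The group rebalanced mid-commit; the polled partitions may have
        # been reassigned, so drop this batch and poll again. The next
        # commit after rejoining the group should be accepted.
        continue
    for tp, messages in records.items():
        for message in messages:
            process(message)  # placeholder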

CommitFailed can also happen if the group rebalances without this consumer. That usually means something is wrong with timings. The consumer should notice a group rebalance via the heartbeat thread. If it doesn't, then you'd want to look more closely to figure out why. The default CommitFailed error message suggests max_poll_interval_ms, and that is a good place to start, but it could be something else.
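
One way to look more closely is to raise the log level for the coordinator machinery; kafka-python logs group joins, syncs, and heartbeats under logger names beginning with kafka.coordinator:

import logging

# Surface group join/sync and heartbeat activity to see why (and when)
# the group rebalanced.
logging.basicConfig(level=logging.INFO)
logging.getLogger('kafka.coordinator').setLevel(logging.DEBUG)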

Unfortunately the error message right now does not say whether it's (1) a rebalance-in-progress error or (2) a stale member generation error. I can make some changes to make that difference more clear.

Are you able to identify which type of commit failure you're getting?

@dpkp
Owner

dpkp commented May 4, 2025

Also note that in Kafka, "commits" are not really associated with individual messages the way they might be in a message queue. Instead, the commit is more like an array pointer. So if you fail to commit message 1 but later commit message 2 successfully, then once 2 is committed, message 1 will also be considered committed.
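
A small sketch of that semantics, with hypothetical offsets (the three-field OffsetAndMetadata below assumes a recent kafka-python 2.x, where a leader_epoch field was added; older releases had only offset and metadata):

from kafka import TopicPartition
from kafka.structs import OffsetAndMetadata

tp = TopicPartition('some-topic', 0)

# Suppose the commit after processing message 1 failed. Committing
# offset 2 ("next message to read is 2") later also marks messages 0
# and 1 as consumed; there is no per-message acknowledgement.
consumer.commit({tp: OffsetAndMetadata(2, '', -1)})  # leader_epoch=-1 when unknown

print(consumer.committed(tp))  # -> 2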
