
After pausing and resuming partition, the offset is reset far ahead. #2011

Closed
smalyshev opened this issue Mar 4, 2020 · 8 comments · Fixed by #2555

Comments

@smalyshev

If I pause and then resume reading from partitions, the partition's offset is not set to just after the last message read, but to a message far ahead. Example code:

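```python
from kafka import KafkaConsumer

SERVER = 'localhost:9092'  # placeholder broker address
TOPIC = 'my-topic'         # placeholder topic name

consumer = KafkaConsumer(bootstrap_servers=[SERVER], auto_offset_reset='earliest', max_poll_records=10)
consumer.subscribe([TOPIC])
partition_count = len(consumer.partitions_for_topic(TOPIC))
paused_tp = set()
while True:
    batch = consumer.poll(timeout_ms=1000)
    topause = None
    for tp, items in batch.items():
        for item in items:
            print(item.partition, item.offset)
        if len(paused_tp) == partition_count-1:
            # Every other partition is already paused: remember this one
            topause = tp
            break
        consumer.pause(tp)
        paused_tp.add(tp)
    if topause:
        # Swap: resume the paused partitions and pause the last one
        consumer.resume(*paused_tp)
        consumer.pause(topause)
        break

# Expected: resumed partitions continue right after the last printed offset
batch = consumer.poll(timeout_ms=1000)
for tp, items in batch.items():
    for item in items:
        print(item.partition, item.offset)
```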

This code reads 10 messages from each partition of the topic and then pauses that partition. Once all partitions but one have been handled this way, it pauses the last partition and resumes all the others. One would expect the next poll to resume from offset 10 in one of the resumed partitions. That is not what happens; the actual output looks something like:

```
6 2308
6 2309
6 2310
6 2311
6 2312
6 2313
6 2314
6 2315
6 2316
6 2317
```

And if we look at the debug log, there's this:

```
DEBUG: kafka.consumer.fetcher Advance position for partition TopicPartition(topic='TOPIC', partition=6) from 0 to 2308 (last message batch location plus one) to correct for deleted compacted messages
DEBUG: kafka.consumer.fetcher Adding fetch request for partition TopicPartition(topic='TOPIC', partition=6) at offset 2308
```

The same happens for the rest of the topic's partitions. Offset 2308 is far ahead of the last consumed message at offset 9, and there are certainly messages at offsets between 9 and 2308. Yet for some reason the consumer jumps ahead, which is certainly not what I expect when I just pause and resume a partition. Is there any way to make pause/resume work as expected, i.e. resume right after the last consumed offset?
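To spot these jumps in a long debug log, one could scan for the "Advance position" message quoted above. This is a minimal sketch, not something from the thread; `parse_advance_events` is a hypothetical helper, and it assumes the log lines have exactly the format shown in this issue (the wording or the `TopicPartition` repr may differ in other kafka-python versions or logging setups).

```python
import re
from typing import Iterable, List, Tuple

# Matches the "Advance position" debug message quoted in this issue.
# Assumption: the exact wording and TopicPartition repr may differ in other
# kafka-python versions or logging configurations.
ADVANCE_RE = re.compile(
    r"Advance position for partition "
    r"TopicPartition\(topic='(?P<topic>[^']*)', partition=(?P<partition>\d+)\) "
    r"from (?P<old>\d+) to (?P<new>\d+)"
)


def parse_advance_events(lines: Iterable[str]) -> List[Tuple[str, int, int, int]]:
    """Return (topic, partition, old_offset, new_offset) for each matching log line."""
    events = []
    for line in lines:
        m = ADVANCE_RE.search(line)
        if m:
            events.append((
                m.group("topic"),
                int(m.group("partition")),
                int(m.group("old")),
                int(m.group("new")),
            ))
    return events
```

Passing an open log file works too, since iterating a file yields lines; events where `new_offset - old_offset` is large are the suspicious ones.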

@smalyshev
Author

smalyshev commented Mar 4, 2020

If I add something like this after resume():

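```python
# Right after consumer.resume(*paused_tp): re-seek each resumed
# partition to the position the consumer reports for it
offsets = {p: consumer.position(p) for p in paused_tp}
for p, off in offsets.items():
    consumer.seek(p, off)
```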

then everything works as expected. But as I understand it, seek() to the result of position() should semantically be a no-op?
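The workaround above can be wrapped in a small helper so that every resume() is followed by the extra seek(). This is a minimal sketch; `resume_with_reseek` is a hypothetical name, and it only uses the `KafkaConsumer` methods already used in this thread (`resume()`, `position()`, `seek()`). It does not explain why the seek helps; the thread only reports that it does, and whether it is still needed depends on the kafka-python version (the issue is marked as fixed by #2555).

```python
def resume_with_reseek(consumer, *partitions):
    """Resume partitions, then seek each one to its reported position.

    Hypothetical helper wrapping the seek() workaround from this thread.
    `consumer` is expected to behave like kafka-python's KafkaConsumer.
    Returns the {partition: offset} mapping that was seeked to.
    """
    consumer.resume(*partitions)
    # As in the workaround, position() is read after resume()
    offsets = {tp: consumer.position(tp) for tp in partitions}
    for tp, offset in offsets.items():
        # Seeking to the current position should be a no-op semantically,
        # but per this thread it avoids the jump ahead
        consumer.seek(tp, offset)
    return offsets
```

In the example code, `consumer.resume(*paused_tp)` would become `resume_with_reseek(consumer, *paused_tp)`.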

@phoenix-mstu

Same problem for me. I'm going to use the seek() hack.

@a-shkarupin

Same issue here, seek works for me as well.

@yarik3571

Same for me. The workaround above works as well, but IMHO this looks like something that should be fixed in the library.

@peter-marlowe

Same for me, and the workaround works. Thanks @smalyshev

This bug is nasty because it silently corrupts the returned message stream, so some users may not even notice it.
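Because the skip is silent, a consumer could at least check for it. One minimal sketch (not from the thread): track the last offset seen per partition and flag any record whose offset is not exactly one past it. `find_offset_gaps` is a hypothetical helper that takes plain `(partition, offset)` pairs, such as `(record.partition, record.offset)` from the records returned by `poll()`. Caveat: compacted or transactional topics can have legitimate offset gaps, so a flagged gap is a hint, not proof of this bug.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def find_offset_gaps(
    records: Iterable[Tuple[int, int]],
) -> Dict[int, List[Tuple[int, int]]]:
    """Return, per partition, (expected_offset, actual_offset) pairs
    wherever a record's offset is not exactly one past the previous one."""
    last_seen: Dict[int, int] = {}
    gaps: Dict[int, List[Tuple[int, int]]] = defaultdict(list)
    for partition, offset in records:
        if partition in last_seen and offset != last_seen[partition] + 1:
            gaps[partition].append((last_seen[partition] + 1, offset))
        last_seen[partition] = offset
    return dict(gaps)
```

For the output in this issue (offsets 0 to 9 on partition 6, then 2308), this would report `{6: [(10, 2308)]}`.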

@yeungalan0

Seeing this issue as well, and I'm using the same seek() method to work around it.

@chubutin

Same workaround worked for me. Thanks @smalyshev

@paolomilani

Still seeing this bug, and the same workaround works.

8 participants