
Can't calculate offset lag when consuming with KafkaConsumer #509

Closed
@vershininm

Description


Hi, I ran into a strange issue while trying to calculate consumer offset lag. This is on Kafka 0.9.0.0.

How to reproduce:
First, create the topic:

/opt/kafka/bin/kafka-topics.sh --create --topic topic_a  --partitions 5 --replication-factor 3 --zookeeper localhost

Then produce some messages:

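from kafka import SimpleClient, SimpleProducer

TOPIC = 'topic_a'  # the topic created above
GROUP = 'group_a'  # consumer group used below

client = SimpleClient('localhost:9092,localhost:9093,localhost:9094')
producer = SimpleProducer(client=client, async=False)  # synchronous sends (Python 2 / kafka-python 1.x API)
for i in range(10):
    producer.send_messages(TOPIC, str(i))  # 10 small messages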
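Why every partition ends up with logSize 2 in the checker output below: with default settings, SimpleProducer is assumed to send unkeyed messages round-robin across the topic's partitions (in kafka-python 1.x it cycles through them starting from a random partition). The sketch only models that assumption in plain Python with a hypothetical helper, round_robin_counts; it does not talk to Kafka.

```python
import random
from collections import Counter
from itertools import cycle, islice


def round_robin_counts(num_messages, num_partitions, random_start=True, seed=None):
    """Count how many messages land on each partition under round-robin assignment.

    Hypothetical helper that models the assumed SimpleProducer behaviour;
    it does not connect to a broker.
    """
    partitions = list(range(num_partitions))
    rng = random.Random(seed)
    start = rng.randrange(num_partitions) if random_start else 0
    # Rotate the partition list so that assignment begins at the chosen start.
    order = partitions[start:] + partitions[:start]
    counts = Counter(islice(cycle(order), num_messages))
    return {p: counts.get(p, 0) for p in partitions}


# 10 messages over 5 partitions: each partition gets 2, whatever the start.
print(round_robin_counts(10, 5))  # → {0: 2, 1: 2, 2: 2, 3: 2, 4: 2}
```

Because the count is an exact multiple of the partition count, the random starting point does not change the result; with 7 messages, two partitions would get 2 and the rest 1.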

On the other side, half of the messages were consumed with:

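from kafka import KafkaConsumer

consumer = KafkaConsumer(TOPIC, bootstrap_servers='localhost:9092,localhost:9093,localhost:9094', group_id=GROUP)
print([next(consumer).value for _ in range(5)])  # consume 5 of the 10 messages
consumer.commit()  # commit the consumed offsets for GROUP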

The offset checker now shows the correct total lag of 5:

/opt/kafka/bin/kafka-consumer-offset-checker.sh --group group_a --topic  topic_a --zookeeper localhost

Group     Topic     Pid  Offset  logSize  Lag  Owner
group_a   topic_a   0    2       2        0    none
group_a   topic_a   1    0       2        2    none
group_a   topic_a   2    2       2        0    none
group_a   topic_a   3    1       2        1    none
group_a   topic_a   4    0       2        2    none
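Per partition, Lag is logSize (the log end offset) minus the committed Offset, and the total is the sum over partitions: (2-2) + (2-0) + (2-2) + (2-1) + (2-0) = 5. The same arithmetic as a small pure function, consumer_lag (a hypothetical helper, not part of kafka-python), fed with the values from the table above. It also treats a committed offset of -1, which the broker uses to signal that the group has no committed offset for a partition, as "nothing committed" instead of subtracting it; counting such partitions from offset 0 is an assumption (it ignores messages already removed by retention).

```python
def consumer_lag(latest_by_partition, committed_by_partition, log_start=0):
    """Total lag = sum over partitions of (log end offset - committed offset).

    Hypothetical helper. A committed offset of -1 (or a missing entry) means
    nothing has been committed yet, so that partition is counted from
    log_start, assumed to be 0 unless retention has deleted old messages.
    """
    total = 0
    for partition, latest in latest_by_partition.items():
        committed = committed_by_partition.get(partition, -1)
        if committed < 0:
            committed = log_start
        total += latest - committed
    return total


# logSize and Offset columns from the offset-checker table above.
latest = {0: 2, 1: 2, 2: 2, 3: 2, 4: 2}
committed = {0: 2, 1: 0, 2: 2, 3: 1, 4: 0}
print(consumer_lag(latest, committed))  # → 5
```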

I'm trying to get the same value with the following code:

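from kafka import SimpleClient
from kafka.structs import OffsetRequestPayload, OffsetFetchRequestPayload

client = SimpleClient('localhost:9092,localhost:9093,localhost:9094')
client.load_metadata_for_topics()

partitions = client.topic_partitions[TOPIC]

# Log end offset per partition: time=-1 means "latest", max_offsets=1.
offset_requests = [OffsetRequestPayload(TOPIC, p, -1, 1) for p in partitions.keys()]
latest_offset_by_partition = {r.partition: r.offsets[0]
                              for r in client.send_offset_request(offset_requests)}

# Committed offset per partition for GROUP.
fetch_requests = [OffsetFetchRequestPayload(TOPIC, p) for p in partitions.keys()]
current_offset_by_partition = {r.partition: r.offset
                               for r in client.send_offset_fetch_request(GROUP, fetch_requests)}

lag = 0
for part in partitions.keys():
    # -1 means no committed offset; count that partition from offset 0.
    current = max(current_offset_by_partition.get(part, -1), 0)
    latest = latest_offset_by_partition[part]
    lag += latest - current

print('lag: {}'.format(lag))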

but it fails with UnknownTopicOrPartitionError: UnknownTopicOrPartitionError - 3 - This request is for a topic or partition that does not exist on this broker.
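To see which partitions the broker rejects, rather than getting a single exception, the fetch can be issued with fail_on_error=False (assuming the kafka-python 1.x SimpleClient signature send_offset_fetch_request(group, payloads, fail_on_error=True, callback=None)) and the error code checked on each response; 3 is UNKNOWN_TOPIC_OR_PARTITION in the Kafka protocol, matching the message above. The sketch keeps the broker out of the picture: OffsetFetchResponse is a local stand-in assumed to mirror the fields of kafka-python's OffsetFetchResponsePayload, split_fetch_responses is a hypothetical helper, and the sample values are illustrative, not captured output.

```python
from collections import namedtuple

# Local stand-in, assumed to have the same fields as kafka-python's
# OffsetFetchResponsePayload, so the example runs without a broker.
OffsetFetchResponse = namedtuple(
    "OffsetFetchResponse", ["topic", "partition", "offset", "metadata", "error"])

UNKNOWN_TOPIC_OR_PARTITION = 3  # Kafka protocol error code


def split_fetch_responses(responses):
    """Separate per-partition results into committed offsets and error codes."""
    offsets, errors = {}, {}
    for resp in responses:
        if resp.error == 0:
            offsets[resp.partition] = resp.offset
        else:
            errors[resp.partition] = resp.error
    return offsets, errors


# With a real client the list would come from something like
#   client.send_offset_fetch_request(GROUP, fetch_requests, fail_on_error=False)
# These two entries are made-up illustrative values.
sample = [
    OffsetFetchResponse("topic_a", 0, 2, "", 0),
    OffsetFetchResponse("topic_a", 1, -1, "", UNKNOWN_TOPIC_OR_PARTITION),
]
print(split_fetch_responses(sample))  # → ({0: 2}, {1: 3})
```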

If I repeat the same steps (create the topic and produce the messages) but consume with:

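from kafka import SimpleClient, SimpleConsumer

client = SimpleClient('localhost:9092,localhost:9093,localhost:9094')
consumer = SimpleConsumer(client, group=GROUP, topic=TOPIC)
consumer.get_messages(5)  # consume 5 of the 10 messages
consumer.commit()  # commit the consumed offsets for GROUP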

my code reports the correct lag, the same value that kafka-consumer-offset-checker shows.

This is a bit confusing. I'm probably doing something wrong, so please let me know if that's the case.
