Hi, I ran into a strange issue while trying to calculate consumer offset lag. This is on Kafka 0.9.0.0.
How to reproduce:
Create a topic like this:
/opt/kafka/bin/kafka-topics.sh --create --topic topic_a --partitions 5 --replication-factor 3 --zookeeper localhost
Then produce some messages:
from kafka import SimpleClient, SimpleProducer
TOPIC = 'topic_a'  # same topic as created above
client = SimpleClient('localhost:9092,localhost:9093,localhost:9094')
producer = SimpleProducer(client=client, async=False)
for i in xrange(10):
    producer.send_messages(TOPIC, str(i))
On the other side, half of the messages were consumed with:
from kafka import KafkaConsumer
GROUP = 'group_a'  # same group as passed to the offset checker
consumer = KafkaConsumer(TOPIC, bootstrap_servers='localhost:9092,localhost:9093,localhost:9094', group_id=GROUP)
print([next(consumer).value for _ in range(5)])
consumer.commit()
Now the offset checker shows the correct total lag of 5:
/opt/kafka/bin/kafka-consumer-offset-checker.sh --group group_a --topic topic_a --zookeeper localhost
Group    Topic    Pid  Offset  logSize  Lag  Owner
group_a  topic_a  0    2       2        0    none
group_a  topic_a  1    0       2        2    none
group_a  topic_a  2    2       2        0    none
group_a  topic_a  3    1       2        1    none
group_a  topic_a  4    0       2        2    none
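For reference, the total of 5 is just the sum of logSize minus Offset over all partitions. A minimal sketch of that arithmetic, using the values from the table above and no broker connection; the names `total_lag`, `log_sizes` and `committed` are illustrative only and not part of kafka-python, and treating a missing or negative committed offset as 0 is an assumption:

```python
def total_lag(log_sizes, committed):
    """Sum per-partition lag, i.e. log size minus committed offset."""
    lag = 0
    for partition, log_size in log_sizes.items():
        # Assumption: a missing or negative committed offset means nothing
        # was committed yet, so the whole partition counts as lag.
        offset = max(committed.get(partition, 0), 0)
        lag += log_size - offset
    return lag


# Values copied from the offset checker output above.
log_sizes = {0: 2, 1: 2, 2: 2, 3: 2, 4: 2}
committed = {0: 2, 1: 0, 2: 2, 3: 1, 4: 0}
print('lag: {}'.format(total_lag(log_sizes, committed)))  # prints "lag: 5"
```

Defaulting a missing commit to -1 instead would add one extra message of lag for each partition that has no committed offset.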
I'm trying to get the same value with the following code:
client = SimpleClient('localhost:9092,localhost:9093,localhost:9094')
client.load_metadata_for_topics()
partitions = client.topic_partitions[TOPIC]
offset_requests = [OffsetRequestPayload(TOPIC, p, -1, 1) for p in partitions.keys()]
latest_offset_by_partition = {r.partition: r.offsets[0]
                              for r in client.send_offset_request(offset_requests)}
current_offset_by_partition = {r.partition: r.offset
                               for r in client.send_offset_fetch_request(GROUP, offset_requests)}
lag = 0
for part in partitions.keys():
    current = current_offset_by_partition.get(part, -1)
    latest = latest_offset_by_partition.get(part)
    lag += latest - current
print('lag: {}'.format(lag))
but I get UnknownTopicOrPartitionError: UnknownTopicOrPartitionError - 3 - This request is for a topic or partition that does not exist on this broker.
If I repeat the same steps (create, produce) but consume with:
client = SimpleClient('localhost:9092,localhost:9093,localhost:9094')
consumer = SimpleConsumer(client, group=GROUP, topic=TOPIC)
consumer.get_messages(5)
consumer.commit()
then my code returns the correct lag, and it matches what kafka-consumer-offset-checker reports.
This is a bit confusing. I'm probably doing something wrong, so please let me know if that is the case.