Dear kafka-python team,
I'm trying to get the consumer lag for topics. I have read and tried the suggestions in the following issues:
Based on this comment:
for msg in consumer:
    tp = TopicPartition(msg.topic, msg.partition)
    highwater = consumer.highwater(tp)
    lag = (highwater - 1) - msg.offset
I tried the following code (based on some other posts from Stack Overflow and issues):
self.consumer = KafkaConsumer(
    bootstrap_servers=self.bootstrap_servers,
    group_id='test',
    enable_auto_commit=False,
    consumer_timeout_ms=5000
)
topics = self.consumer.topics()
for topic in topics:
    self.logger.info("Getting Metrics for topic {topic}".format(topic=topic))
    lag_in_seconds = 0.0
    offset = 0
    partitions = self.consumer.partitions_for_topic(topic)
    if partitions is not None:
        self.logger.info("Partitions {}".format(str(len(partitions))))
        for p in partitions:
            tp = TopicPartition(topic, p)
            self.consumer.assign([tp])
            committed = self.consumer.committed(tp)
            self.consumer.seek_to_end(tp)
            last_offset = self.consumer.position(tp)
            offset = offset + last_offset
            highwater = self.consumer.highwater(tp)
            if highwater is None:
                lag = None
            else:
                lag = (highwater - 1) - last_offset
highwater is always None.
What would be the correct way to get lag?
Thanks,
Pablo.
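
For reference, one possible approach that avoids highwater() entirely. As far as the kafka-python documentation describes it, highwater offsets come from fetch responses, so highwater() returns None until the consumer has actually fetched from that partition, which the code above never does. The sketch below instead compares end_offsets() with committed() for each partition. The helper name partition_lags is hypothetical, the TopicPartition fallback exists only so the snippet runs without kafka-python installed, and treating a missing commit as "lag unknown" (None) is an assumption rather than library behavior.

```python
from collections import namedtuple

try:
    from kafka import TopicPartition  # provided by kafka-python
except ImportError:
    # Fallback with the same two fields, only so this sketch runs standalone.
    TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def partition_lags(consumer, topic):
    """Return {partition: lag} for one topic (hypothetical helper).

    Lag is computed as end offset minus the group's committed offset.
    Both values refer to the *next* offset, so no "- 1" is needed.
    A partition with no committed offset gets None (assumption: unknown lag).
    """
    partitions = consumer.partitions_for_topic(topic)
    if not partitions:
        return {}

    tps = [TopicPartition(topic, p) for p in sorted(partitions)]
    # One call fetches the end offset of every partition.
    end_offsets = consumer.end_offsets(tps)

    lags = {}
    for tp in tps:
        committed = consumer.committed(tp)  # None if the group never committed
        if committed is None:
            lags[tp.partition] = None
        else:
            lags[tp.partition] = end_offsets[tp] - committed
    return lags
```

With a consumer created as in the question (a group_id is required for committed() to work), this could be called as partition_lags(self.consumer, topic) inside the loop over topics. No assign() or seek_to_end() is needed, because neither end_offsets() nor committed() depends on the current assignment.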