Closed
Hi,
I have a use case where I need to poll the consumer lag for a topic every five minutes.
I wrote the following code:
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=self.bootstrap_servers,
    group_id=kafka_group_name
)
# consumer.topics() is called only to populate the metadata that
# partitions_for_topic() requires; the returned list is not used.
topics = consumer.topics()
# Get the partitions of the target topic.
partitions = consumer.partitions_for_topic(self.target_kafka_consumer_name)
topic_lag = 0
# Sum the lag over all partitions of the topic.
for partition in partitions:
    topic_partition = TopicPartition(self.target_kafka_consumer_name, partition)
    consumer.assign([topic_partition])  # Select this partition.
    committed = consumer.committed(topic_partition)  # Last committed offset, or None.
    consumer.seek_to_end(topic_partition)  # Move to the end of the partition.
    last_offset = consumer.position(topic_partition)  # Latest offset.
    # If no offset has been committed, treat it as 0.
    if committed is None:
        committed = 0
    partition_lag = last_offset - committed
    topic_lag = topic_lag + partition_lag
# Close the consumer.
consumer.close(autocommit=True)
The issue is that the topic lag comes out as the sum of the partition end offsets when the consumer has been idle for a long time (say, more than four hours).
When the consumer is not idle, the code works fine and I get the expected topic lag.
Am I calculating the topic lag correctly? Any help would be appreciated.
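To make the symptom concrete, here is a minimal sketch of the same lag arithmetic in plain Python, with no broker involved. The function name `topic_lag` and all offset values are hypothetical and only for illustration; in the real code the two dictionaries would be filled from the consumer (for example via `consumer.end_offsets()` and `consumer.committed()`). It shows that falling back to 0 for a missing committed offset makes the lag equal to the sum of the end offsets, which would be consistent with what is reported above if `committed()` returns `None` after a long idle period. Whether that is what actually happens here is an assumption, not something confirmed.

```python
def topic_lag(end_offsets, committed_offsets):
    """Return (total_lag, partitions_without_commit).

    end_offsets: dict mapping partition id -> latest offset.
    committed_offsets: dict mapping partition id -> committed offset or None.
    """
    total = 0
    missing = []
    for partition, end in sorted(end_offsets.items()):
        committed = committed_offsets.get(partition)
        if committed is None:
            # Same fallback as in the code above: no commit is treated as 0.
            missing.append(partition)
            committed = 0
        total += end - committed
    return total, missing


# Hypothetical offsets for a three-partition topic.
end_offsets = {0: 120, 1: 95, 2: 140}
active = {0: 118, 1: 95, 2: 133}      # group has committed offsets
idle = {0: None, 1: None, 2: None}    # no committed offsets found

print(topic_lag(end_offsets, active))  # (9, [])
print(topic_lag(end_offsets, idle))    # (355, [0, 1, 2])
```

Returning the list of partitions that had no committed offset makes it possible to tell "no commit found" apart from a genuinely large lag, instead of silently reporting the sum of the end offsets.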