Topic Lag being set to "Sum of Partition Offsets" when Consumer is idle #1831

Closed
@bb-greatdevaks

Hi,

I have a use case where I need to compute the consumer lag for a topic every five minutes.
I wrote the following code:

    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(
        bootstrap_servers=self.bootstrap_servers,
        group_id=kafka_group_name
    )

    # The result is unused, but calling consumer.topics() populates the
    # metadata that partitions_for_topic() requires.
    topics = consumer.topics()

    # Partitions of the specified topic.
    partitions = consumer.partitions_for_topic(self.target_kafka_consumer_name)

    topic_lag = 0

    # Sum the lag over all partitions of the topic.
    for partition in partitions:
        topic_partition = TopicPartition(self.target_kafka_consumer_name, partition)
        consumer.assign([topic_partition])  # Select this partition.
        committed = consumer.committed(topic_partition)  # Last committed offset, or None.
        consumer.seek_to_end(topic_partition)  # Move to the end of the partition.
        last_offset = consumer.position(topic_partition)  # Latest offset.
        # If there is no committed offset, treat it as 0.
        if committed is None:
            committed = 0
        partition_lag = last_offset - committed
        topic_lag = topic_lag + partition_lag

    # Close the consumer.
    consumer.close(autocommit=True)

The issue is that when the consumer has been idle for a long time (say, more than four hours), the computed topic lag comes out as the "sum of partition offsets", i.e. the sum of the latest offsets of all partitions.
When the consumer is not idle, the code works fine and I get the expected topic lag.

Am I calculating the Topic Lag correctly? Any help would be appreciated.
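
For reference, the arithmetic in the code above means that whenever `consumer.committed()` returns `None` for every partition, the `committed = 0` fallback makes the result exactly the sum of the latest offsets. Why it would return `None` after an idle period is not established here; one possibility is that the broker no longer holds committed offsets for the group, but that is an assumption. The following is a minimal sketch, not a confirmed fix: `topic_lag` and `fetch_offsets` are hypothetical helper names, and the sketch reports partitions without a committed offset separately instead of counting them from 0. It uses kafka-python's `end_offsets()` rather than `assign()` plus `seek_to_end()`, and disables auto-commit so that the lag check itself does not commit anything.

```python
from typing import Dict, Hashable, List, Optional, Tuple


def topic_lag(
    end_offsets: Dict[Hashable, int],
    committed: Dict[Hashable, Optional[int]],
) -> Tuple[int, List[Hashable]]:
    """Return (total lag, partitions that have no committed offset).

    Partitions whose committed offset is None are excluded from the
    total and listed separately, so the caller can decide how to
    treat them instead of silently counting from offset 0.
    """
    lag = 0
    uncommitted = []
    for tp, end in end_offsets.items():
        offset = committed.get(tp)
        if offset is None:
            uncommitted.append(tp)
            continue
        lag += max(end - offset, 0)
    return lag, uncommitted


def fetch_offsets(bootstrap_servers, group_id, topic):
    """Hypothetical helper: read end and committed offsets via kafka-python."""
    # Imported here so topic_lag() can be used without kafka-python installed.
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        enable_auto_commit=False,  # a lag check should not commit offsets
    )
    try:
        consumer.topics()  # populate metadata, as in the original code
        partitions = consumer.partitions_for_topic(topic) or set()
        tps = [TopicPartition(topic, p) for p in partitions]
        if not tps:
            return {}, {}
        end = consumer.end_offsets(tps)
        committed = {tp: consumer.committed(tp) for tp in tps}
        return end, committed
    finally:
        consumer.close(autocommit=False)
```

With made-up offsets, `topic_lag({("t", 0): 100, ("t", 1): 50}, {("t", 0): 90, ("t", 1): 50})` gives a lag of 10 and no uncommitted partitions. If both committed offsets are `None`, it gives a lag of 0 and lists both partitions, whereas the fallback to 0 would report 150, the sum of the end offsets.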
