Skip to content

Commit 98e4ab3

Browse files
author
Dana Powers
committed
Improve KafkaConsumer iterator loop timeouts
- Consider all delayed tasks, not just heartbeat - Include metadata update timeout - Fix second / millisecond bug calling client.poll()
1 parent cc4cf23 commit 98e4ab3

File tree

1 file changed

+8
-8
lines changed

1 file changed

+8
-8
lines changed

kafka/consumer/group.py

+8-8
Original file line numberDiff line numberDiff line change
@@ -623,19 +623,19 @@ def _message_generator(self):
623623
# fetch positions if we have partitions we're subscribed to that we
624624
# don't know the offset for
625625
if not self._subscription.has_all_fetch_positions():
626-
self._update_fetch_positions(self._subscription.missing_fetch_positions())
626+
partitions = self._subscription.missing_fetch_positions()
627+
self._update_fetch_positions(partitions)
627628

628629
# init any new fetches (won't resend pending fetches)
629630
self._fetcher.init_fetches()
630-
self._client.poll(self.config['request_timeout_ms'] / 1000.0)
631-
timeout = self._consumer_timeout
632-
if self.config['api_version'] >= (0, 9):
633-
heartbeat_timeout = time.time() + (
634-
self.config['heartbeat_interval_ms'] / 1000.0)
635-
timeout = min(heartbeat_timeout, timeout)
631+
self._client.poll()
632+
633+
timeout_at = min(self._consumer_timeout,
634+
self._client._delayed_tasks.next_at(),
635+
self._client.cluster.ttl() / 1000.0 + time.time())
636636
for msg in self._fetcher:
637637
yield msg
638-
if time.time() > timeout:
638+
if time.time() > timeout_at:
639639
break
640640

641641
def __iter__(self): # pylint: disable=non-iterator-returned

0 commit comments

Comments
 (0)