File tree 1 file changed +8
-8
lines changed
1 file changed +8
-8
lines changed Original file line number Diff line number Diff line change @@ -623,19 +623,19 @@ def _message_generator(self):
623
623
# fetch positions if we have partitions we're subscribed to that we
624
624
# don't know the offset for
625
625
if not self ._subscription .has_all_fetch_positions ():
626
- self ._update_fetch_positions (self ._subscription .missing_fetch_positions ())
626
+ partitions = self ._subscription .missing_fetch_positions ()
627
+ self ._update_fetch_positions (partitions )
627
628
628
629
# init any new fetches (won't resend pending fetches)
629
630
self ._fetcher .init_fetches ()
630
- self ._client .poll (self .config ['request_timeout_ms' ] / 1000.0 )
631
- timeout = self ._consumer_timeout
632
- if self .config ['api_version' ] >= (0 , 9 ):
633
- heartbeat_timeout = time .time () + (
634
- self .config ['heartbeat_interval_ms' ] / 1000.0 )
635
- timeout = min (heartbeat_timeout , timeout )
631
+ self ._client .poll ()
632
+
633
+ timeout_at = min (self ._consumer_timeout ,
634
+ self ._client ._delayed_tasks .next_at (),
635
+ self ._client .cluster .ttl () / 1000.0 + time .time ())
636
636
for msg in self ._fetcher :
637
637
yield msg
638
- if time .time () > timeout :
638
+ if time .time () > timeout_at :
639
639
break
640
640
641
641
def __iter__ (self ): # pylint: disable=non-iterator-returned
You can’t perform that action at this time.
0 commit comments