
Commit 458bdb5

Author: Dana Powers (committed)
Reorganize init_fetches calls during iteration
Generally we should not call init_fetches() while the generator has pending messages; this revision adds an explicit check / noop to the public interface, and uses a private method internally to attempt to pipeline fetch requests.
1 parent 76e7d13 commit 458bdb5
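
A minimal, self-contained sketch of the guard described in the commit message: the public init_fetches() becomes a noop while unconsumed records are buffered, and a private _init_fetches() does the actual sending. The toy class and fake batch below are invented for illustration; the real change is in the diff that follows.

    import collections
    import logging

    logging.basicConfig(level=logging.DEBUG)
    log = logging.getLogger('toy-fetcher')

    class ToyFetcher(object):
        """Toy stand-in for the fetcher (illustration only, not library code)."""
        def __init__(self):
            self._records = collections.deque()  # buffered record batches
            self._iterator = None                # in-progress message generator

        def init_fetches(self):
            # Public interface: explicit noop while unconsumed data is buffered,
            # so external callers cannot disturb the generator's pending state.
            if self._records or self._iterator:
                log.debug('Skipping init_fetches: unconsumed records remain')
                return []
            return self._init_fetches()

        def _init_fetches(self):
            # Private worker used internally to pipeline fetch requests;
            # here it just enqueues a fake batch instead of sending requests.
            self._records.append(['msg-a', 'msg-b'])
            return ['pending-future']

    f = ToyFetcher()
    print(f.init_fetches())   # ['pending-future'] -- buffer was empty, fetch sent
    print(f.init_fetches())   # []                 -- noop, batch not yet consumed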

File tree

2 files changed (+20, -7 lines)


kafka/consumer/fetcher.py

Lines changed: 17 additions & 5 deletions
@@ -86,9 +86,21 @@ def __init__(self, client, subscriptions, **configs):
     def init_fetches(self):
         """Send FetchRequests asynchronously for all assigned partitions.
 
+        Note: noop if there are unconsumed records internal to the fetcher
+
         Returns:
             List of Futures: each future resolves to a FetchResponse
         """
+        # We need to be careful when creating fetch records during iteration
+        # so we verify that there are no records in the deque, or in an
+        # iterator
+        if self._records or self._iterator:
+            log.debug('Skipping init_fetches because there are unconsumed'
+                      ' records internally')
+            return []
+        return self._init_fetches()
+
+    def _init_fetches(self):
         futures = []
         for node_id, request in six.iteritems(self._create_fetch_requests()):
             if self._client.ready(node_id):
@@ -339,6 +351,11 @@ def _message_generator(self):
             self._raise_if_unauthorized_topics()
             self._raise_if_record_too_large()
 
+            # Send additional FetchRequests when the internal queue is low
+            # this should enable moderate pipelining
+            if len(self._records) == 1:
+                self._init_fetches()
+
             (fetch_offset, tp, messages) = self._records.popleft()
 
             if not self._subscriptions.is_assigned(tp):
@@ -378,11 +395,6 @@ def _message_generator(self):
                 log.debug("Ignoring fetched records for %s at offset %s",
                           tp, fetch_offset)
 
-        # Send any additional FetchRequests that we can now
-        # this will likely fetch each partition individually, rather than
-        # fetch multiple partitions in bulk when they are on the same broker
-        self.init_fetches()
-
     def __iter__(self):  # pylint: disable=non-iterator-returned
         return self
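
The len(self._records) == 1 check in _message_generator above is where the "moderate pipelining" from the commit message happens: a new fetch is requested just before the last buffered batch is handed out, so the network round trip can overlap with message processing. A rough, self-contained sketch of that shape (names and the fake three-batch "broker" are invented; the real logic is in the hunk above):

    import collections

    records = collections.deque([['a1', 'a2']])   # pretend one batch is buffered
    batches_remaining = 3                         # fake broker holds 3 more batches

    def fake_init_fetches():
        # Stand-in for the private _init_fetches(): enqueue another batch, if any remain.
        global batches_remaining
        if batches_remaining:
            batches_remaining -= 1
            records.append(['b%d' % batches_remaining, 'c%d' % batches_remaining])

    def message_generator():
        while records:
            # Pipeline: request more data just before draining the last batch.
            if len(records) == 1:
                fake_init_fetches()
            for msg in records.popleft():
                yield msg

    print(list(message_generator()))   # buffered and "fetched" messages, in order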

kafka/consumer/group.py

Lines changed: 3 additions & 2 deletions
@@ -626,8 +626,6 @@ def _message_generator(self):
                 partitions = self._subscription.missing_fetch_positions()
                 self._update_fetch_positions(partitions)
 
-            # init any new fetches (won't resend pending fetches)
-            self._fetcher.init_fetches()
             self._client.poll(
                 max(0, self._consumer_timeout - time.time()) * 1000)

@@ -641,13 +639,16 @@ def _message_generator(self):
                 if time.time() > timeout_at:
                     log.debug("internal iterator timeout - breaking for poll")
                     break
+                else:
+                    self._fetcher.init_fetches()
 
     def __iter__(self):  # pylint: disable=non-iterator-returned
         return self
 
     def __next__(self):
         if not self._iterator:
             self._iterator = self._message_generator()
+            self._fetcher.init_fetches()
 
         # consumer_timeout_ms can be used to stop iteration early
         if self.config['consumer_timeout_ms'] >= 0:
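
From the user's side, the visible effect is that the initial fetch is now issued when iteration starts (the first __next__ call), and later fetches are pipelined by the fetcher itself rather than re-sent on every loop pass. A hedged usage sketch; the topic name and broker address are placeholders:

    from kafka import KafkaConsumer

    # consumer_timeout_ms stops iteration if no message arrives within 1 second
    consumer = KafkaConsumer('my-topic',
                             bootstrap_servers='localhost:9092',
                             consumer_timeout_ms=1000)

    for message in consumer:
        # the first next() builds _message_generator() and primes init_fetches()
        print(message.topic, message.partition, message.offset, message.value)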
