Skip to content

Commit 9837927

Browse files
author
Dana Powers
committed
KAFKA-2978: consumer stops fetching when consumed and fetch positions get out of sync
1 parent 814b599 commit 9837927

File tree

4 files changed

+53
-74
lines changed

4 files changed

+53
-74
lines changed

kafka/consumer/fetcher.py

Lines changed: 38 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -209,11 +209,11 @@ def _raise_if_offset_out_of_range(self):
209209
log.debug("Ignoring fetched records for %s since it is no"
210210
" longer fetchable", partition)
211211
continue
212-
consumed = self._subscriptions.assignment[partition].consumed
213-
# ignore partition if its consumed offset != offset in FetchResponse
212+
position = self._subscriptions.assignment[partition].position
213+
# ignore partition if the current position != offset in FetchResponse
214214
# e.g. after seek()
215-
if consumed is not None and offset == consumed:
216-
current_out_of_range_partitions[partition] = offset
215+
if position is not None and offset == position:
216+
current_out_of_range_partitions[partition] = position
217217

218218
self._offset_out_of_range_partitions.clear()
219219
if current_out_of_range_partitions:
@@ -290,31 +290,30 @@ def fetched_records(self):
290290
" since it is no longer assigned", tp)
291291
continue
292292

293-
# note that the consumed position should always be available
293+
# note that the position should always be available
294294
# as long as the partition is still assigned
295-
consumed = self._subscriptions.assignment[tp].consumed
295+
position = self._subscriptions.assignment[tp].position
296296
if not self._subscriptions.is_fetchable(tp):
297-
# this can happen when a partition consumption paused before
297+
# this can happen when a partition is paused before
298298
# fetched records are returned to the consumer's poll call
299299
log.debug("Not returning fetched records for assigned partition"
300300
" %s since it is no longer fetchable", tp)
301301

302-
# we also need to reset the fetch positions to pretend we did
303-
# not fetch this partition in the previous request at all
304-
self._subscriptions.assignment[tp].fetched = consumed
305-
elif fetch_offset == consumed:
302+
elif fetch_offset == position:
306303
next_offset = messages[-1][0] + 1
307-
log.debug("Returning fetched records for assigned partition %s"
308-
" and update consumed position to %s", tp, next_offset)
309-
self._subscriptions.assignment[tp].consumed = next_offset
304+
log.debug("Returning fetched records at offset %d for assigned"
305+
" partition %s and update position to %s", position,
306+
tp, next_offset)
307+
self._subscriptions.assignment[tp].position = next_offset
310308

311309
for record in self._unpack_message_set(tp, messages):
312310
drained[tp].append(record)
313311
else:
314312
# these records aren't next in line based on the last consumed
315313
# position; ignore them — they must be from an obsolete request
316-
log.debug("Ignoring fetched records for %s at offset %s",
317-
tp, fetch_offset)
314+
log.debug("Ignoring fetched records for %s at offset %s since"
315+
" the current position is %d", tp, fetch_offset,
316+
position)
318317
return dict(drained)
319318

320319
def _unpack_message_set(self, tp, messages):
@@ -351,20 +350,16 @@ def _message_generator(self):
351350

352351
# note that the consumed position should always be available
353352
# as long as the partition is still assigned
354-
consumed = self._subscriptions.assignment[tp].consumed
353+
position = self._subscriptions.assignment[tp].position
355354
if not self._subscriptions.is_fetchable(tp):
356355
# this can happen when partition consumption is paused before
357356
# fetched records are returned
358357
log.warning("Not returning fetched records for assigned partition"
359358
" %s since it is no longer fetchable", tp)
360359

361-
# we also need to reset the fetch positions to pretend we did
362-
# not fetch this partition in the previous request at all
363-
self._subscriptions.assignment[tp].fetched = consumed
364-
365-
elif fetch_offset == consumed:
360+
elif fetch_offset == position:
366361
for msg in self._unpack_message_set(tp, messages):
367-
self._subscriptions.assignment[tp].consumed = msg.offset + 1
362+
self._subscriptions.assignment[tp].position = msg.offset + 1
368363
yield msg
369364
else:
370365
# these records aren't next in line based on the last consumed
@@ -494,19 +489,15 @@ def _create_fetch_requests(self):
494489
# if there is a leader and no in-flight requests,
495490
# issue a new fetch but only fetch data for partitions whose
496491
# previously fetched data has been consumed
497-
fetched = self._subscriptions.assignment[partition].fetched
498-
consumed = self._subscriptions.assignment[partition].consumed
499-
if consumed == fetched:
500-
partition_info = (
501-
partition.partition,
502-
fetched,
503-
self.config['max_partition_fetch_bytes']
504-
)
505-
fetchable[node_id][partition.topic].append(partition_info)
506-
else:
507-
log.debug("Skipping FetchRequest to %s because previously"
508-
" fetched offsets (%s) have not been fully"
509-
" consumed yet (%s)", node_id, fetched, consumed)
492+
position = self._subscriptions.assignment[partition].position
493+
partition_info = (
494+
partition.partition,
495+
position,
496+
self.config['max_partition_fetch_bytes']
497+
)
498+
fetchable[node_id][partition.topic].append(partition_info)
499+
log.debug("Adding fetch request for partition %d at offset %d",
500+
partition, position)
510501

511502
requests = {}
512503
for node_id, partition_data in six.iteritems(fetchable):
@@ -541,25 +532,24 @@ def _handle_fetch_response(self, request, response):
541532

542533
# we are interested in this fetch only if the beginning
543534
# offset matches the current consumed position
544-
consumed = self._subscriptions.assignment[tp].consumed
545-
if consumed is None:
546-
continue
547-
elif consumed != fetch_offset:
548-
# the fetched position has gotten out of sync with the
549-
# consumed position (which might happen when a
550-
# rebalance occurs with a fetch in-flight), so we need
551-
# to reset the fetch position so the next fetch is right
552-
self._subscriptions.assignment[tp].fetched = consumed
535+
position = self._subscriptions.assignment[tp].position
536+
if position is None or position != fetch_offset:
537+
log.debug("Discarding fetch response for partition %s"
538+
" since its offset %d does not match the"
539+
" expected offset %d", tp, fetch_offset,
540+
position)
553541
continue
554542

555543
partial = None
556544
if messages and isinstance(messages[-1][-1], PartialMessage):
557545
partial = messages.pop()
558546

559547
if messages:
560-
last_offset, _, _ = messages[-1]
561-
self._subscriptions.assignment[tp].fetched = last_offset + 1
548+
log.debug("Adding fetched record for partition %s with"
549+
" offset %d to buffered record list", tp,
550+
position)
562551
self._records.append((fetch_offset, tp, messages))
552+
#last_offset, _, _ = messages[-1]
563553
#self.sensors.records_fetch_lag.record(highwater - last_offset)
564554
elif partial:
565555
# we did not read a single message from a non-empty
@@ -581,7 +571,7 @@ def _handle_fetch_response(self, request, response):
581571
else:
582572
self._offset_out_of_range_partitions[tp] = fetch_offset
583573
log.info("Fetch offset %s is out of range, resetting offset",
584-
self._subscriptions.assignment[tp].fetched)
574+
fetch_offset)
585575
elif error_type is Errors.TopicAuthorizationFailedError:
586576
log.warn("Not authorized to read from topic %s.", tp.topic)
587577
self._unauthorized_topics.add(tp.topic)

kafka/consumer/group.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -452,10 +452,10 @@ def position(self, partition):
452452
"""
453453
assert self._subscription.is_assigned(partition)
454454

455-
offset = self._subscription.assignment[partition].consumed
455+
offset = self._subscription.assignment[partition].position
456456
if offset is None:
457457
self._update_fetch_positions(partition)
458-
offset = self._subscription.assignment[partition].consumed
458+
offset = self._subscription.assignment[partition].position
459459
return offset
460460

461461
def pause(self, *partitions):

kafka/consumer/subscription_state.py

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ def all_consumed_offsets(self):
279279
all_consumed = {}
280280
for partition, state in six.iteritems(self.assignment):
281281
if state.has_valid_position:
282-
all_consumed[partition] = OffsetAndMetadata(state.consumed, '')
282+
all_consumed[partition] = OffsetAndMetadata(state.position, '')
283283
return all_consumed
284284

285285
def need_offset_reset(self, partition, offset_reset_strategy=None):
@@ -335,41 +335,29 @@ def _add_assigned_partition(self, partition):
335335
class TopicPartitionState(object):
336336
def __init__(self):
337337
self.committed = None # last committed position
338-
self.has_valid_position = False # whether we have valid consumed and fetched positions
338+
self.has_valid_position = False # whether we have a valid position
339339
self.paused = False # whether this partition has been paused by the user
340340
self.awaiting_reset = False # whether we are awaiting reset
341341
self.reset_strategy = None # the reset strategy if awaitingReset is set
342-
self._consumed = None # offset exposed to the user
343-
self._fetched = None # current fetch position
342+
self._position = None # offset exposed to the user
344343

345-
def _set_fetched(self, offset):
346-
assert self.has_valid_position, 'Valid consumed/fetch position required'
347-
self._fetched = offset
344+
def _set_position(self, offset):
345+
assert self.has_valid_position, 'Valid position required'
346+
self._position = offset
348347

349-
def _get_fetched(self):
350-
return self._fetched
348+
def _get_position(self):
349+
return self._position
351350

352-
fetched = property(_get_fetched, _set_fetched, None, "current fetch position")
353-
354-
def _set_consumed(self, offset):
355-
assert self.has_valid_position, 'Valid consumed/fetch position required'
356-
self._consumed = offset
357-
358-
def _get_consumed(self):
359-
return self._consumed
360-
361-
consumed = property(_get_consumed, _set_consumed, None, "last consumed position")
351+
position = property(_get_position, _set_position, None, "last position")
362352

363353
def await_reset(self, strategy):
364354
self.awaiting_reset = True
365355
self.reset_strategy = strategy
366-
self._consumed = None
367-
self._fetched = None
356+
self._position = None
368357
self.has_valid_position = False
369358

370359
def seek(self, offset):
371-
self._consumed = offset
372-
self._fetched = offset
360+
self._position = offset
373361
self.awaiting_reset = False
374362
self.reset_strategy = None
375363
self.has_valid_position = True

kafka/coordinator/consumer.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,8 @@ def _subscription_metadata_changed(self):
146146
old_partitions_per_topic = self._partitions_per_topic
147147
self._partitions_per_topic = {}
148148
for topic in self._subscription.group_subscription():
149-
self._partitions_per_topic[topic] = set(self._cluster.partitions_for_topic(topic))
149+
partitions = self._cluster.partitions_for_topic(topic) or []
150+
self._partitions_per_topic[topic] = set(partitions)
150151

151152
if self._partitions_per_topic != old_partitions_per_topic:
152153
return True

0 commit comments

Comments (0)