Skip to content

Commit 2f262b9

Browse files
committed
KAFKA-4547: Avoid resetting paused partitions to committed offsets
1 parent e92defe commit 2f262b9

File tree

4 files changed

+64
-14
lines changed

4 files changed

+64
-14
lines changed

kafka/consumer/fetcher.py

+1-7
Original file line number | Diff line number | Diff line change
@@ -194,13 +194,7 @@ def update_fetch_positions(self, partitions, timeout_ms=None):
194194
inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout updating fetch positions')
195195
# reset the fetch position to the committed position
196196
for tp in partitions:
197-
if not self._subscriptions.is_assigned(tp):
198-
log.warning("partition %s is not assigned - skipping offset"
199-
" update", tp)
200-
continue
201-
elif self._subscriptions.is_fetchable(tp):
202-
log.warning("partition %s is still fetchable -- skipping offset"
203-
" update", tp)
197+
if not self._subscriptions.is_assigned(tp) or self._subscriptions.has_valid_position(tp):
204198
continue
205199

206200
if self._subscriptions.is_offset_reset_needed(tp):

kafka/consumer/group.py

+3-3
Original file line number | Diff line number | Diff line change
@@ -1144,9 +1144,9 @@ def _update_fetch_positions(self, partitions, timeout_ms=None):
11441144
# their own offsets).
11451145
self._fetcher.reset_offsets_if_needed(partitions, timeout_ms=inner_timeout_ms())
11461146

1147-
if not self._subscription.has_all_fetch_positions():
1148-
# if we still don't have offsets for all partitions, then we should either seek
1149-
# to the last committed position or reset using the auto reset policy
1147+
if not self._subscription.has_all_fetch_positions(partitions):
1148+
# if we still don't have offsets for the given partitions, then we should either
1149+
# seek to the last committed position or reset using the auto reset policy
11501150
if (self.config['api_version'] >= (0, 8, 1) and
11511151
self.config['group_id'] is not None):
11521152
# first refresh commits for all assigned partitions

kafka/consumer/subscription_state.py

+11-3
Original file line number | Diff line number | Diff line change
@@ -351,9 +351,11 @@ def has_default_offset_reset_policy(self):
351351
def is_offset_reset_needed(self, partition):
352352
return self.assignment[partition].awaiting_reset
353353

354-
def has_all_fetch_positions(self):
355-
for state in self.assignment.values():
356-
if not state.has_valid_position:
354+
def has_all_fetch_positions(self, partitions=None):
355+
if partitions is None:
356+
partitions = self.assigned_partitions()
357+
for tp in partitions:
358+
if not self.has_valid_position(tp):
357359
return False
358360
return True
359361

@@ -364,6 +366,9 @@ def missing_fetch_positions(self):
364366
missing.add(partition)
365367
return missing
366368

369+
def has_valid_position(self, partition):
370+
return partition in self.assignment and self.assignment[partition].has_valid_position
371+
367372
def is_assigned(self, partition):
368373
return partition in self.assignment
369374

@@ -387,6 +392,9 @@ def move_partition_to_end(self, partition):
387392
state = self.assignment.pop(partition)
388393
self.assignment[partition] = state
389394

395+
def position(self, partition):
396+
return self.assignment[partition].position
397+
390398

391399
class TopicPartitionState(object):
392400
def __init__(self):

test/test_fetcher.py

+49-1
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@
1717
from kafka.future import Future
1818
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
1919
from kafka.protocol.fetch import FetchRequest, FetchResponse
20-
from kafka.protocol.list_offsets import ListOffsetsResponse
20+
from kafka.protocol.list_offsets import ListOffsetsResponse, OffsetResetStrategy
2121
from kafka.errors import (
2222
StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
2323
UnknownTopicOrPartitionError, OffsetOutOfRangeError
@@ -610,3 +610,51 @@ def test_partition_records_compacted_offset(mocker):
610610
msgs = records.take()
611611
assert len(msgs) == batch_end - fetch_offset - 1
612612
assert msgs[0].offset == fetch_offset + 1
613+
614+
615+
def test_update_fetch_positions_paused(subscription_state, client, mocker):
616+
fetcher = Fetcher(client, subscription_state)
617+
tp = TopicPartition('foo', 0)
618+
subscription_state.assign_from_user([tp])
619+
subscription_state.pause(tp) # paused partition does not have a valid position
620+
subscription_state.need_offset_reset(tp, OffsetResetStrategy.LATEST)
621+
622+
mocker.patch.object(fetcher, '_retrieve_offsets', return_value={tp: OffsetAndTimestamp(10, 1, -1)})
623+
fetcher.update_fetch_positions([tp])
624+
625+
assert not subscription_state.is_offset_reset_needed(tp)
626+
assert not subscription_state.is_fetchable(tp) # because tp is paused
627+
assert subscription_state.has_valid_position(tp)
628+
assert subscription_state.position(tp) == OffsetAndMetadata(10, '', -1)
629+
630+
631+
def test_update_fetch_positions_paused_without_valid(subscription_state, client, mocker):
632+
fetcher = Fetcher(client, subscription_state)
633+
tp = TopicPartition('foo', 0)
634+
subscription_state.assign_from_user([tp])
635+
subscription_state.pause(tp) # paused partition does not have a valid position
636+
637+
mocker.patch.object(fetcher, '_retrieve_offsets', return_value={tp: OffsetAndTimestamp(0, 1, -1)})
638+
fetcher.update_fetch_positions([tp])
639+
640+
assert not subscription_state.is_offset_reset_needed(tp)
641+
assert not subscription_state.is_fetchable(tp) # because tp is paused
642+
assert subscription_state.has_valid_position(tp)
643+
assert subscription_state.position(tp) == OffsetAndMetadata(0, '', -1)
644+
645+
646+
def test_update_fetch_positions_paused_with_valid(subscription_state, client, mocker):
647+
fetcher = Fetcher(client, subscription_state)
648+
tp = TopicPartition('foo', 0)
649+
subscription_state.assign_from_user([tp])
650+
subscription_state.assignment[tp].committed = OffsetAndMetadata(0, '', -1)
651+
subscription_state.seek(tp, 10)
652+
subscription_state.pause(tp) # paused partition already has a valid position
653+
654+
mocker.patch.object(fetcher, '_retrieve_offsets', return_value={tp: OffsetAndTimestamp(0, 1, -1)})
655+
fetcher.update_fetch_positions([tp])
656+
657+
assert not subscription_state.is_offset_reset_needed(tp)
658+
assert not subscription_state.is_fetchable(tp) # because tp is paused
659+
assert subscription_state.has_valid_position(tp)
660+
assert subscription_state.position(tp) == OffsetAndMetadata(10, '', -1)

0 commit comments

Comments
 (0)