Skip to content

Optionally return OffsetAndMetadata from consumer.committed(tp) #1979

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def update_fetch_positions(self, partitions):
self._subscriptions.need_offset_reset(tp)
self._reset_offset(tp)
else:
committed = self._subscriptions.assignment[tp].committed
committed = self._subscriptions.assignment[tp].committed.offset
log.debug("Resetting offset for partition %s to the committed"
" offset %s", tp, committed)
self._subscriptions.seek(tp, committed)
Expand Down
15 changes: 11 additions & 4 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ def commit(self, offsets=None):
offsets = self._subscription.all_consumed_offsets()
self._coordinator.commit_offsets_sync(offsets)

def committed(self, partition):
def committed(self, partition, metadata=False):
"""Get the last committed offset for the given partition.

This offset will be used as the position for the consumer
Expand All @@ -537,9 +537,11 @@ def committed(self, partition):

Arguments:
partition (TopicPartition): The partition to check.
metadata (bool, optional): If True, return OffsetAndMetadata struct
instead of offset int. Default: False.

Returns:
The last committed offset, or None if there was no prior commit.
The last committed offset (int or OffsetAndMetadata), or None if there was no prior commit.
"""
assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1'
assert self.config['group_id'] is not None, 'Requires group_id'
Expand All @@ -553,10 +555,15 @@ def committed(self, partition):
else:
commit_map = self._coordinator.fetch_committed_offsets([partition])
if partition in commit_map:
committed = commit_map[partition].offset
committed = commit_map[partition]
else:
committed = None
return committed

if committed is not None:
if metadata:
return committed
else:
return committed.offset

def _fetch_all_topic_metadata(self):
"""A blocking call that fetches topic metadata for all topics in the
Expand Down
2 changes: 1 addition & 1 deletion kafka/consumer/subscription_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ def _add_assigned_partition(self, partition):

class TopicPartitionState(object):
def __init__(self):
self.committed = None # last committed position
self.committed = None # last committed OffsetAndMetadata
self.has_valid_position = False # whether we have valid position
self.paused = False # whether this partition has been paused by the user
self.awaiting_reset = False # whether we are awaiting reset
Expand Down
6 changes: 3 additions & 3 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ def refresh_committed_offsets_if_needed(self):
for partition, offset in six.iteritems(offsets):
# verify assignment is still active
if self._subscription.is_assigned(partition):
self._subscription.assignment[partition].committed = offset.offset
self._subscription.assignment[partition].committed = offset
self._subscription.needs_fetch_committed_offsets = False

def fetch_committed_offsets(self, partitions):
Expand Down Expand Up @@ -641,7 +641,7 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response):
log.debug("Group %s committed offset %s for partition %s",
self.group_id, offset, tp)
if self._subscription.is_assigned(tp):
self._subscription.assignment[tp].committed = offset.offset
self._subscription.assignment[tp].committed = offset
elif error_type is Errors.GroupAuthorizationFailedError:
log.error("Not authorized to commit offsets for group %s",
self.group_id)
Expand Down Expand Up @@ -704,7 +704,7 @@ def _send_offset_fetch_request(self, partitions):
partitions (list of TopicPartition): the partitions to fetch
Returns:
Future: resolves to dict of offsets: {TopicPartition: int}
Future: resolves to dict of offsets: {TopicPartition: OffsetAndMetadata}
"""
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
assert all(map(lambda k: isinstance(k, TopicPartition), partitions))
Expand Down
4 changes: 2 additions & 2 deletions test/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
OffsetCommitRequest, OffsetCommitResponse,
OffsetFetchRequest, OffsetFetchResponse)
from kafka.protocol.metadata import MetadataResponse
from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.structs import OffsetAndMetadata, TopicPartition
from kafka.util import WeakMethod


Expand Down Expand Up @@ -211,7 +211,7 @@ def test_refresh_committed_offsets_if_needed(mocker, coordinator):
assert coordinator._subscription.needs_fetch_committed_offsets is True
coordinator.refresh_committed_offsets_if_needed()
assignment = coordinator._subscription.assignment
assert assignment[TopicPartition('foobar', 0)].committed == 123
assert assignment[TopicPartition('foobar', 0)].committed == OffsetAndMetadata(123, b'')
assert TopicPartition('foobar', 1) not in assignment
assert coordinator._subscription.needs_fetch_committed_offsets is False

Expand Down
4 changes: 2 additions & 2 deletions test/test_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
UnknownTopicOrPartitionError, OffsetOutOfRangeError
)
from kafka.record.memory_records import MemoryRecordsBuilder, MemoryRecords
from kafka.structs import TopicPartition
from kafka.structs import OffsetAndMetadata, TopicPartition


@pytest.fixture
Expand Down Expand Up @@ -124,7 +124,7 @@ def test_update_fetch_positions(fetcher, topic, mocker):
fetcher._reset_offset.reset_mock()
fetcher._subscriptions.need_offset_reset(partition)
fetcher._subscriptions.assignment[partition].awaiting_reset = False
fetcher._subscriptions.assignment[partition].committed = 123
fetcher._subscriptions.assignment[partition].committed = OffsetAndMetadata(123, b'')
mocker.patch.object(fetcher._subscriptions, 'seek')
fetcher.update_fetch_positions([partition])
assert fetcher._reset_offset.call_count == 0
Expand Down