Skip to content

Commit 843b347

Browse files
committed
Add KafkaConsumer.highwater(partition) to get highwater offsets from FetchResponses
1 parent 33b9ff2 commit 843b347

File tree

3 files changed

+26
-1
lines changed

3 files changed

+26
-1
lines changed

kafka/consumer/fetcher.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -574,11 +574,13 @@ def _handle_fetch_response(self, request, response):
574574
# consumption paused while fetch is still in-flight
575575
log.debug("Ignoring fetched records for partition %s"
576576
" since it is no longer fetchable", tp)
577+
577578
elif error_type is Errors.NoError:
578-
fetch_offset = fetch_offsets[tp]
579+
self._subscriptions.assignment[tp].highwater = highwater
579580

580581
# we are interested in this fetch only if the beginning
581582
# offset matches the current consumed position
583+
fetch_offset = fetch_offsets[tp]
582584
position = self._subscriptions.assignment[tp].position
583585
if position is None or position != fetch_offset:
584586
log.debug("Discarding fetch response for partition %s"

kafka/consumer/group.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,28 @@ def position(self, partition):
463463
offset = self._subscription.assignment[partition].position
464464
return offset
465465

466+
def highwater(self, partition):
467+
"""Last known highwater offset for a partition
468+
469+
A highwater offset is the offset that will be assigned to the next
470+
message that is produced. It may be useful for calculating lag, by
471+
comparing with the reported position. Note that both position and
472+
highwater refer to the *next* offset -- i.e., highwater offset is
473+
one greater than the newest availabel message.
474+
475+
Highwater offsets are returned in FetchResponse messages, so will
476+
not be available if not FetchRequests have been sent for this partition
477+
yet.
478+
479+
Arguments:
480+
partition (TopicPartition): partition to check
481+
482+
Returns:
483+
int or None: offset if available
484+
"""
485+
assert self._subscription.is_assigned(partition), 'Partition is not assigned'
486+
return self._subscription.assignment[partition].highwater
487+
466488
def pause(self, *partitions):
467489
"""Suspend fetching from the requested partitions.
468490

kafka/consumer/subscription_state.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,7 @@ def __init__(self):
340340
self.awaiting_reset = False # whether we are awaiting reset
341341
self.reset_strategy = None # the reset strategy if awaitingReset is set
342342
self._position = None # offset exposed to the user
343+
self.highwater = None
343344

344345
def _set_position(self, offset):
345346
assert self.has_valid_position, 'Valid position required'

0 commit comments

Comments
 (0)