Skip to content

Commit eecea88

Browse files
committed
Move fetching last known offset logic to a stand alone function.
The `Consumer` class fetches the last known offsets in `__init__` if `auto_commit` is enabled, but it would be nice to expose this behavior for consumers that aren't using auto_commit. This doesn't change existing behavior, just exposes the ability to easily fetch and set the last known offsets. Once dpkp#162 or something similar lands this may no longer be necessary, but it looks like that might take a while to make it through.
1 parent d73d169 commit eecea88

File tree

1 file changed

+16
-10
lines changed

1 file changed

+16
-10
lines changed

kafka/consumer.py

+16-10
Original file line numberDiff line numberDiff line change
@@ -100,23 +100,29 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True,
100100
self.commit)
101101
self.commit_timer.start()
102102

103+
if auto_commit:
104+
self.fetch_last_known_offsets(partitions)
105+
else:
106+
for partition in partitions:
107+
self.offsets[partition] = 0
108+
109+
def fetch_last_known_offsets(self, partitions=None):
110+
if not partitions:
111+
partitions = self.client.topic_partitions[self.topic]
112+
103113
def get_or_init_offset_callback(resp):
104114
try:
105115
kafka.common.check_error(resp)
106116
return resp.offset
107117
except kafka.common.UnknownTopicOrPartitionError:
108118
return 0
109119

110-
if auto_commit:
111-
for partition in partitions:
112-
req = OffsetFetchRequest(topic, partition)
113-
(offset,) = self.client.send_offset_fetch_request(group, [req],
114-
callback=get_or_init_offset_callback,
115-
fail_on_error=False)
116-
self.offsets[partition] = offset
117-
else:
118-
for partition in partitions:
119-
self.offsets[partition] = 0
120+
for partition in partitions:
121+
req = OffsetFetchRequest(self.topic, partition)
122+
(offset,) = self.client.send_offset_fetch_request(self.group, [req],
123+
callback=get_or_init_offset_callback,
124+
fail_on_error=False)
125+
self.offsets[partition] = offset
120126

121127
def commit(self, partitions=None):
122128
"""

0 commit comments

Comments
 (0)