Skip to content

Commit ed42d78

Browse files
author
Dana Powers
committed
Change SimpleConsumer.reset_partition_offset to return offset / None on failure (dont raise exception)
1 parent 680a8dc commit ed42d78

File tree

1 file changed

+19
-5
lines changed

1 file changed

+19
-5
lines changed

kafka/consumer/simple.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
NO_MESSAGES_WAIT_TIME_SECONDS
2828
)
2929
from ..common import (
30-
FetchRequest, OffsetRequest,
30+
FetchRequest, KafkaError, OffsetRequest,
3131
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
3232
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
3333
OffsetOutOfRangeError, FailedPayloadsError, check_error
@@ -144,6 +144,13 @@ def __repr__(self):
144144
(self.group, self.topic, str(self.offsets.keys()))
145145

146146
def reset_partition_offset(self, partition):
147+
"""Update offsets using auto_offset_reset policy (smallest|largest)
148+
149+
Arguments:
150+
partition (int): the partition for which offsets should be updated
151+
152+
Returns: Updated offset on success, None on failure
153+
"""
147154
LATEST = -1
148155
EARLIEST = -2
149156
if self.auto_offset_reset == 'largest':
@@ -163,10 +170,17 @@ def reset_partition_offset(self, partition):
163170
raise
164171

165172
# send_offset_request
166-
(resp, ) = self.client.send_offset_request(reqs)
167-
check_error(resp)
168-
self.offsets[partition] = resp.offsets[0]
169-
self.fetch_offsets[partition] = resp.offsets[0]
173+
log.info('Resetting topic-partition offset to %s for %s:%d',
174+
self.auto_offset_reset, self.topic, partition)
175+
try:
176+
(resp, ) = self.client.send_offset_request(reqs)
177+
except KafkaError as e:
178+
log.error('%s sending offset request for %s:%d',
179+
e.__class__.__name__, self.topic, partition)
180+
else:
181+
self.offsets[partition] = resp.offsets[0]
182+
self.fetch_offsets[partition] = resp.offsets[0]
183+
return resp.offsets[0]
170184

171185
def provide_partition_info(self):
172186
"""

0 commit comments

Comments
 (0)