27
27
NO_MESSAGES_WAIT_TIME_SECONDS
28
28
)
29
29
from ..common import (
30
- FetchRequest , OffsetRequest ,
30
+ FetchRequest , KafkaError , OffsetRequest ,
31
31
ConsumerFetchSizeTooSmall , ConsumerNoMoreData ,
32
32
UnknownTopicOrPartitionError , NotLeaderForPartitionError ,
33
33
OffsetOutOfRangeError , FailedPayloadsError , check_error
@@ -144,6 +144,13 @@ def __repr__(self):
144
144
(self .group , self .topic , str (self .offsets .keys ()))
145
145
146
146
def reset_partition_offset (self , partition ):
147
+ """Update offsets using auto_offset_reset policy (smallest|largest)
148
+
149
+ Arguments:
150
+ partition (int): the partition for which offsets should be updated
151
+
152
+ Returns: Updated offset on success, None on failure
153
+ """
147
154
LATEST = - 1
148
155
EARLIEST = - 2
149
156
if self .auto_offset_reset == 'largest' :
@@ -163,10 +170,17 @@ def reset_partition_offset(self, partition):
163
170
raise
164
171
165
172
# send_offset_request
166
- (resp , ) = self .client .send_offset_request (reqs )
167
- check_error (resp )
168
- self .offsets [partition ] = resp .offsets [0 ]
169
- self .fetch_offsets [partition ] = resp .offsets [0 ]
173
+ log .info ('Resetting topic-partition offset to %s for %s:%d' ,
174
+ self .auto_offset_reset , self .topic , partition )
175
+ try :
176
+ (resp , ) = self .client .send_offset_request (reqs )
177
+ except KafkaError as e :
178
+ log .error ('%s sending offset request for %s:%d' ,
179
+ e .__class__ .__name__ , self .topic , partition )
180
+ else :
181
+ self .offsets [partition ] = resp .offsets [0 ]
182
+ self .fetch_offsets [partition ] = resp .offsets [0 ]
183
+ return resp .offsets [0 ]
170
184
171
185
def provide_partition_info (self ):
172
186
"""
0 commit comments