@@ -209,11 +209,11 @@ def _raise_if_offset_out_of_range(self):
209
209
log .debug ("Ignoring fetched records for %s since it is no"
210
210
" longer fetchable" , partition )
211
211
continue
212
- consumed = self ._subscriptions .assignment [partition ].consumed
213
- # ignore partition if its consumed offset != offset in FetchResponse
212
+ position = self ._subscriptions .assignment [partition ].position
213
+ # ignore partition if the current position != offset in FetchResponse
214
214
# e.g. after seek()
215
- if consumed is not None and offset == consumed :
216
- current_out_of_range_partitions [partition ] = offset
215
+ if position is not None and offset == position :
216
+ current_out_of_range_partitions [partition ] = position
217
217
218
218
self ._offset_out_of_range_partitions .clear ()
219
219
if current_out_of_range_partitions :
@@ -290,31 +290,30 @@ def fetched_records(self):
290
290
" since it is no longer assigned" , tp )
291
291
continue
292
292
293
- # note that the consumed position should always be available
293
+ # note that the position should always be available
294
294
# as long as the partition is still assigned
295
- consumed = self ._subscriptions .assignment [tp ].consumed
295
+ position = self ._subscriptions .assignment [tp ].position
296
296
if not self ._subscriptions .is_fetchable (tp ):
297
- # this can happen when a partition consumption paused before
297
+ # this can happen when a partition is paused before
298
298
# fetched records are returned to the consumer's poll call
299
299
log .debug ("Not returning fetched records for assigned partition"
300
300
" %s since it is no longer fetchable" , tp )
301
301
302
- # we also need to reset the fetch positions to pretend we did
303
- # not fetch this partition in the previous request at all
304
- self ._subscriptions .assignment [tp ].fetched = consumed
305
- elif fetch_offset == consumed :
302
+ elif fetch_offset == position :
306
303
next_offset = messages [- 1 ][0 ] + 1
307
- log .debug ("Returning fetched records for assigned partition %s"
308
- " and update consumed position to %s" , tp , next_offset )
309
- self ._subscriptions .assignment [tp ].consumed = next_offset
304
+ log .debug ("Returning fetched records at offset %d for assigned"
305
+ " partition %s and update position to %s" , position ,
306
+ tp , next_offset )
307
+ self ._subscriptions .assignment [tp ].position = next_offset
310
308
311
309
for record in self ._unpack_message_set (tp , messages ):
312
310
drained [tp ].append (record )
313
311
else :
314
312
# these records aren't next in line based on the last consumed
315
313
# position, ignore them they must be from an obsolete request
316
- log .debug ("Ignoring fetched records for %s at offset %s" ,
317
- tp , fetch_offset )
314
+ log .debug ("Ignoring fetched records for %s at offset %s since"
315
+ " the current position is %d" , tp , fetch_offset ,
316
+ position )
318
317
return dict (drained )
319
318
320
319
def _unpack_message_set (self , tp , messages ):
@@ -351,20 +350,16 @@ def _message_generator(self):
351
350
352
351
# note that the consumed position should always be available
353
352
# as long as the partition is still assigned
354
- consumed = self ._subscriptions .assignment [tp ].consumed
353
+ position = self ._subscriptions .assignment [tp ].position
355
354
if not self ._subscriptions .is_fetchable (tp ):
356
355
# this can happen when a partition consumption paused before
357
356
# fetched records are returned
358
357
log .warning ("Not returning fetched records for assigned partition"
359
358
" %s since it is no longer fetchable" , tp )
360
359
361
- # we also need to reset the fetch positions to pretend we did
362
- # not fetch this partition in the previous request at all
363
- self ._subscriptions .assignment [tp ].fetched = consumed
364
-
365
- elif fetch_offset == consumed :
360
+ elif fetch_offset == position :
366
361
for msg in self ._unpack_message_set (tp , messages ):
367
- self ._subscriptions .assignment [tp ].consumed = msg .offset + 1
362
+ self ._subscriptions .assignment [tp ].position = msg .offset + 1
368
363
yield msg
369
364
else :
370
365
# these records aren't next in line based on the last consumed
@@ -494,19 +489,15 @@ def _create_fetch_requests(self):
494
489
# if there is a leader and no in-flight requests,
495
490
# issue a new fetch but only fetch data for partitions whose
496
491
# previously fetched data has been consumed
497
- fetched = self ._subscriptions .assignment [partition ].fetched
498
- consumed = self ._subscriptions .assignment [partition ].consumed
499
- if consumed == fetched :
500
- partition_info = (
501
- partition .partition ,
502
- fetched ,
503
- self .config ['max_partition_fetch_bytes' ]
504
- )
505
- fetchable [node_id ][partition .topic ].append (partition_info )
506
- else :
507
- log .debug ("Skipping FetchRequest to %s because previously"
508
- " fetched offsets (%s) have not been fully"
509
- " consumed yet (%s)" , node_id , fetched , consumed )
492
+ position = self ._subscriptions .assignment [partition ].position
493
+ partition_info = (
494
+ partition .partition ,
495
+ position ,
496
+ self .config ['max_partition_fetch_bytes' ]
497
+ )
498
+ fetchable [node_id ][partition .topic ].append (partition_info )
499
+ log .debug ("Adding fetch request for partition %d at offset %d" ,
500
+ partition , position )
510
501
511
502
requests = {}
512
503
for node_id , partition_data in six .iteritems (fetchable ):
@@ -541,25 +532,24 @@ def _handle_fetch_response(self, request, response):
541
532
542
533
# we are interested in this fetch only if the beginning
543
534
# offset matches the current consumed position
544
- consumed = self ._subscriptions .assignment [tp ].consumed
545
- if consumed is None :
546
- continue
547
- elif consumed != fetch_offset :
548
- # the fetched position has gotten out of sync with the
549
- # consumed position (which might happen when a
550
- # rebalance occurs with a fetch in-flight), so we need
551
- # to reset the fetch position so the next fetch is right
552
- self ._subscriptions .assignment [tp ].fetched = consumed
535
+ position = self ._subscriptions .assignment [tp ].position
536
+ if position is None or position != fetch_offset :
537
+ log .debug ("Discarding fetch response for partition %s"
538
+ " since its offset %d does not match the"
539
+ " expected offset %d" , tp , fetch_offset ,
540
+ position )
553
541
continue
554
542
555
543
partial = None
556
544
if messages and isinstance (messages [- 1 ][- 1 ], PartialMessage ):
557
545
partial = messages .pop ()
558
546
559
547
if messages :
560
- last_offset , _ , _ = messages [- 1 ]
561
- self ._subscriptions .assignment [tp ].fetched = last_offset + 1
548
+ log .debug ("Adding fetched record for partition %s with"
549
+ " offset %d to buffered record list" , tp ,
550
+ position )
562
551
self ._records .append ((fetch_offset , tp , messages ))
552
+ #last_offset, _, _ = messages[-1]
563
553
#self.sensors.records_fetch_lag.record(highwater - last_offset)
564
554
elif partial :
565
555
# we did not read a single message from a non-empty
@@ -581,7 +571,7 @@ def _handle_fetch_response(self, request, response):
581
571
else :
582
572
self ._offset_out_of_range_partitions [tp ] = fetch_offset
583
573
log .info ("Fetch offset %s is out of range, resetting offset" ,
584
- self . _subscriptions . assignment [ tp ]. fetched )
574
+ fetch_offset )
585
575
elif error_type is Errors .TopicAuthorizationFailedError :
586
576
log .warn ("Not authorized to read from topic %s." , tp .topic )
587
577
self ._unauthorized_topics .add (tp .topic )
0 commit comments