 from kafka.future import Future
 from kafka.metrics.stats import Avg, Count, Max, Rate
 from kafka.protocol.fetch import FetchRequest
-from kafka.protocol.message import PartialMessage
 from kafka.protocol.offset import (
     OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET
 )
+from kafka.record import MemoryRecords
 from kafka.serializer import Deserializer
 from kafka.structs import TopicPartition, OffsetAndTimestamp
@@ -304,7 +304,7 @@ def fetched_records(self, max_records=None):

         Raises:
             OffsetOutOfRangeError: if no subscription offset_reset_strategy
-            InvalidMessageError: if message crc validation fails (check_crcs
+            CorruptRecordException: if message crc validation fails (check_crcs
                 must be set to True)
             RecordTooLargeError: if a message is larger than the currently
                 configured max_partition_fetch_bytes
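The renamed exception surfaces to consumer code. A minimal sketch, assuming kafka.errors exports CorruptRecordException under that name (the topic name here is illustrative):

from kafka import KafkaConsumer
from kafka.errors import CorruptRecordException

# check_crcs=True (the default) makes the fetcher validate record CRCs.
consumer = KafkaConsumer('my-topic', check_crcs=True)
try:
    for message in consumer:
        print(message.offset, message.value)
except CorruptRecordException:
    # A fetched record failed CRC validation; the broker data is corrupt.
    raise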
@@ -449,57 +449,25 @@ def _message_generator(self):

             self._next_partition_records = None

-    def _unpack_message_set(self, tp, messages):
+    def _unpack_message_set(self, tp, records):
         try:
-            for offset, size, msg in messages:
-                if self.config['check_crcs'] and not msg.validate_crc():
-                    raise Errors.InvalidMessageError(msg)
-
-                if not msg.is_compressed():
-                    yield self._parse_record(tp, offset, msg.timestamp, msg)
-
-                else:
-                    # If relative offset is used, we need to decompress the entire message first
-                    # to compute the absolute offset.
-                    inner_mset = msg.decompress()
-
-                    # There should only ever be a single layer of compression
-                    if inner_mset[0][-1].is_compressed():
-                        log.warning('MessageSet at %s offset %d appears '
-                                    ' double-compressed. This should not'
-                                    ' happen -- check your producers!',
-                                    tp, offset)
-                        if self.config['skip_double_compressed_messages']:
-                            log.warning('Skipping double-compressed message at'
-                                        ' %s %d', tp, offset)
-                            continue
-
-                    if msg.magic > 0:
-                        last_offset, _, _ = inner_mset[-1]
-                        absolute_base_offset = offset - last_offset
-                    else:
-                        absolute_base_offset = -1
-
-                    for inner_offset, inner_size, inner_msg in inner_mset:
-                        if msg.magic > 0:
-                            # When magic value is greater than 0, the timestamp
-                            # of a compressed message depends on the
-                            # timestamp type of the wrapper message:
-
-                            if msg.timestamp_type == 0:  # CREATE_TIME (0)
-                                inner_timestamp = inner_msg.timestamp
-
-                            elif msg.timestamp_type == 1:  # LOG_APPEND_TIME (1)
-                                inner_timestamp = msg.timestamp
-
-                            else:
-                                raise ValueError('Unknown timestamp type: {0}'.format(msg.timestamp_type))
-                        else:
-                            inner_timestamp = msg.timestamp
-
-                        if absolute_base_offset >= 0:
-                            inner_offset += absolute_base_offset
-                        yield self._parse_record(tp, inner_offset, inner_timestamp, inner_msg)
+            batch = records.next_batch()
+            while batch is not None:
+                for record in batch:
+                    key_size = len(record.key) if record.key is not None else -1
+                    value_size = len(record.value) if record.value is not None else -1
+                    key = self._deserialize(
+                        self.config['key_deserializer'],
+                        tp.topic, record.key)
+                    value = self._deserialize(
+                        self.config['value_deserializer'],
+                        tp.topic, record.value)
+                    yield ConsumerRecord(
+                        tp.topic, tp.partition, record.offset, record.timestamp,
+                        record.timestamp_type, key, value, record.checksum,
+                        key_size, value_size)
+
+                batch = records.next_batch()

         # If unpacking raises StopIteration, it is erroneously
         # caught by the generator. We want all exceptions to be raised
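The replacement body leans on the batch-iteration contract of kafka.record.MemoryRecords: next_batch() returns decoded batches until the buffer is exhausted, and each record already carries an absolute offset, so the old manual decompression and relative-offset arithmetic disappears. A minimal sketch of that pattern, with raw_bytes standing in for a partition's fetched payload:

from kafka.record import MemoryRecords

def iter_fetched(raw_bytes):
    records = MemoryRecords(raw_bytes)
    batch = records.next_batch()  # returns None once the buffer is exhausted
    while batch is not None:
        for record in batch:
            # Compression and relative offsets are resolved inside the batch,
            # so record.offset is already absolute.
            yield record.offset, record.timestamp, record.key, record.value
        batch = records.next_batch()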
@@ -508,21 +476,6 @@ def _unpack_message_set(self, tp, messages):
             log.exception('StopIteration raised unpacking messageset: %s', e)
             raise Exception('StopIteration raised unpacking messageset')

-        # If unpacking raises AssertionError, it means decompression unsupported
-        # See Issue 1033
-        except AssertionError as e:
-            log.exception('AssertionError raised unpacking messageset: %s', e)
-            raise
-
-    def _parse_record(self, tp, offset, timestamp, msg):
-        key = self._deserialize(self.config['key_deserializer'], tp.topic, msg.key)
-        value = self._deserialize(self.config['value_deserializer'], tp.topic, msg.value)
-        return ConsumerRecord(tp.topic, tp.partition, offset,
-                              timestamp, msg.timestamp_type,
-                              key, value, msg.crc,
-                              len(msg.key) if msg.key is not None else -1,
-                              len(msg.value) if msg.value is not None else -1)
-
     def __iter__(self):  # pylint: disable=non-iterator-returned
         return self
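_parse_record is folded into _unpack_message_set, but the deserializer contract is unchanged: _deserialize hands raw bytes (possibly None, for tombstone records) to the configured callable or Deserializer instance. A minimal sketch of a None-safe deserializer, with JSON as an illustrative encoding:

import json

def json_deserializer(raw_bytes):
    # Keys and values may be None (e.g. tombstones), so guard before decoding.
    if raw_bytes is None:
        return None
    return json.loads(raw_bytes.decode('utf-8'))

# e.g. KafkaConsumer(value_deserializer=json_deserializer, ...)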
@@ -784,15 +737,13 @@ def _handle_fetch_response(self, request, send_time, response):

     def _parse_fetched_data(self, completed_fetch):
         tp = completed_fetch.topic_partition
-        partition = completed_fetch.partition_data
         fetch_offset = completed_fetch.fetched_offset
         num_bytes = 0
         records_count = 0
         parsed_records = None

         error_code, highwater = completed_fetch.partition_data[:2]
         error_type = Errors.for_code(error_code)
-        messages = completed_fetch.partition_data[-1]

         try:
             if not self._subscriptions.is_fetchable(tp):
@@ -816,21 +767,18 @@ def _parse_fetched_data(self, completed_fetch):
                           position)
                 return None

-            partial = None
-            if messages and isinstance(messages[-1][-1], PartialMessage):
-                partial = messages.pop()
-
-            if messages:
+            records = MemoryRecords(completed_fetch.partition_data[-1])
+            if records.has_next():
                 log.debug("Adding fetched record for partition %s with"
                           " offset %d to buffered record list", tp,
                           position)
-                unpacked = list(self._unpack_message_set(tp, messages))
+                unpacked = list(self._unpack_message_set(tp, records))
                 parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked)
-                last_offset, _, _ = messages[-1]
+                last_offset = unpacked[-1].offset
                 self._sensors.records_fetch_lag.record(highwater - last_offset)
-                num_bytes = sum(msg[1] for msg in messages)
-                records_count = len(messages)
-            elif partial:
+                num_bytes = records.valid_bytes()
+                records_count = len(unpacked)
+            elif records.size_in_bytes() > 0:
                 # we did not read a single message from a non-empty
                 # buffer because that message's size is larger than
                 # fetch size, in this case record this exception
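The new control flow distinguishes payload states via MemoryRecords rather than a trailing PartialMessage sentinel: has_next() is true only when at least one complete batch decoded, while a non-zero size_in_bytes() with no decodable batch means the first batch exceeded the fetch size. A minimal sketch of that three-way split, mirroring the branches above:

from kafka.record import MemoryRecords

def classify_fetch_payload(raw_bytes):
    records = MemoryRecords(raw_bytes)
    if records.has_next():
        return 'complete'   # at least one whole batch; safe to unpack
    elif records.size_in_bytes() > 0:
        return 'truncated'  # non-empty buffer but no complete batch:
                            # the record is larger than the fetch size
    else:
        return 'empty'      # nothing fetched for this partition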