Skip to content

Commit 68c8a4a

Browse files
authored
KAFKA-3196: Add checksum and size to RecordMetadata and ConsumerRecord (dpkp#770 / dpkp#594)
1 parent bcbc0c4 commit 68c8a4a

File tree

5 files changed

+38
-19
lines changed

5 files changed

+38
-19
lines changed

kafka/consumer/fetcher.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020

2121

2222
ConsumerRecord = collections.namedtuple("ConsumerRecord",
23-
["topic", "partition", "offset", "timestamp", "timestamp_type", "key", "value"])
23+
["topic", "partition", "offset", "timestamp", "timestamp_type",
24+
"key", "value", "checksum", "serialized_key_size", "serialized_value_size"])
2425

2526

2627
class NoOffsetForPartitionError(Errors.KafkaError):
@@ -410,13 +411,17 @@ def _unpack_message_set(self, tp, messages):
410411
key, value = self._deserialize(inner_msg)
411412
yield ConsumerRecord(tp.topic, tp.partition, inner_offset,
412413
inner_timestamp, msg.timestamp_type,
413-
key, value)
414+
key, value, inner_msg.crc,
415+
len(inner_msg.key) if inner_msg.key is not None else -1,
416+
len(inner_msg.value) if inner_msg.value is not None else -1)
414417

415418
else:
416419
key, value = self._deserialize(msg)
417420
yield ConsumerRecord(tp.topic, tp.partition, offset,
418421
msg.timestamp, msg.timestamp_type,
419-
key, value)
422+
key, value, msg.crc,
423+
len(msg.key) if msg.key is not None else -1,
424+
len(msg.value) if msg.value is not None else -1)
420425

421426
# If unpacking raises StopIteration, it is erroneously
422427
# caught by the generator. We want all exceptions to be raised

kafka/producer/buffer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def __init__(self, buf, batch_size, compression_type=None, message_version=0):
5959
self._final_size = None
6060

6161
def append(self, offset, message):
62-
"""Apend a Message to the MessageSet.
62+
"""Append a Message to the MessageSet.
6363
6464
Arguments:
6565
offset (int): offset of the message

kafka/producer/future.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,22 +29,29 @@ def wait(self, timeout=None):
2929

3030

3131
class FutureRecordMetadata(Future):
32-
def __init__(self, produce_future, relative_offset, timestamp_ms):
32+
def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size):
3333
super(FutureRecordMetadata, self).__init__()
3434
self._produce_future = produce_future
35-
self.relative_offset = relative_offset
36-
self.timestamp_ms = timestamp_ms
35+
# packing args as a tuple is a minor speed optimization
36+
self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size)
3737
produce_future.add_callback(self._produce_success)
3838
produce_future.add_errback(self.failure)
3939

4040
def _produce_success(self, offset_and_timestamp):
41-
offset, timestamp_ms = offset_and_timestamp
42-
if timestamp_ms is None:
43-
timestamp_ms = self.timestamp_ms
44-
if offset != -1 and self.relative_offset is not None:
45-
offset += self.relative_offset
41+
offset, produce_timestamp_ms = offset_and_timestamp
42+
43+
# Unpacking from args tuple is minor speed optimization
44+
(relative_offset, timestamp_ms, checksum,
45+
serialized_key_size, serialized_value_size) = self.args
46+
47+
if produce_timestamp_ms is not None:
48+
timestamp_ms = produce_timestamp_ms
49+
if offset != -1 and relative_offset is not None:
50+
offset += relative_offset
4651
tp = self._produce_future.topic_partition
47-
metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms)
52+
metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
53+
checksum, serialized_key_size,
54+
serialized_value_size)
4855
self.success(metadata)
4956

5057
def get(self, timeout=None):
@@ -57,4 +64,6 @@ def get(self, timeout=None):
5764
return self.value
5865

5966

60-
RecordMetadata = collections.namedtuple('RecordMetadata', 'topic partition topic_partition offset timestamp')
67+
RecordMetadata = collections.namedtuple(
68+
'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
69+
'checksum', 'serialized_key_size', 'serialized_value_size'])

kafka/producer/kafka.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,7 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
457457
assert value is not None or self.config['api_version'] >= (0, 8, 1), (
458458
'Null messages require kafka >= 0.8.1')
459459
assert not (value is None and key is None), 'Need at least one: key or value'
460+
key_bytes = value_bytes = None
460461
try:
461462
# first make sure the metadata for the topic is
462463
# available
@@ -497,10 +498,11 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
497498
except Exception as e:
498499
log.debug("Exception occurred during message send: %s", e)
499500
return FutureRecordMetadata(
500-
FutureProduceResult(
501-
TopicPartition(topic, partition)),
502-
-1, None
503-
).failure(e)
501+
FutureProduceResult(TopicPartition(topic, partition)),
502+
-1, None, None,
503+
len(key_bytes) if key_bytes is not None else -1,
504+
len(value_bytes) if value_bytes is not None else -1
505+
).failure(e)
504506

505507
def flush(self, timeout=None):
506508
"""

kafka/producer/record_accumulator.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,13 @@ def try_append(self, timestamp_ms, key, value):
5757

5858
msg = Message(value, key=key, magic=self.message_version)
5959
record_size = self.records.append(self.record_count, msg)
60+
checksum = msg.crc # crc is recalculated during records.append()
6061
self.max_record_size = max(self.max_record_size, record_size)
6162
self.last_append = time.time()
6263
future = FutureRecordMetadata(self.produce_future, self.record_count,
63-
timestamp_ms)
64+
timestamp_ms, checksum,
65+
len(key) if key is not None else -1,
66+
len(value) if value is not None else -1)
6467
self.record_count += 1
6568
return future
6669

0 commit comments

Comments
 (0)