Skip to content

KAFKA-3196: Add checksum and size to RecordMetadata and ConsumerRecord #770

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 17, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@


# Metadata + payload for a single consumed message.
#   checksum: CRC of the raw message as read off the wire.
#   serialized_key_size / serialized_value_size: byte length of the
#     serialized key/value, or -1 when that field is absent (None).
# (The span previously interleaved the pre-change and post-change field
# lists from the diff; this is the merged definition.)
ConsumerRecord = collections.namedtuple("ConsumerRecord",
    ["topic", "partition", "offset", "timestamp", "timestamp_type",
     "key", "value", "checksum", "serialized_key_size", "serialized_value_size"])


class NoOffsetForPartitionError(Errors.KafkaError):
Expand Down Expand Up @@ -410,13 +411,17 @@ def _unpack_message_set(self, tp, messages):
key, value = self._deserialize(inner_msg)
yield ConsumerRecord(tp.topic, tp.partition, inner_offset,
inner_timestamp, msg.timestamp_type,
key, value)
key, value, inner_msg.crc,
len(inner_msg.key) if inner_msg.key is not None else -1,
len(inner_msg.value) if inner_msg.value is not None else -1)

else:
key, value = self._deserialize(msg)
yield ConsumerRecord(tp.topic, tp.partition, offset,
msg.timestamp, msg.timestamp_type,
key, value)
key, value, msg.crc,
len(msg.key) if msg.key is not None else -1,
len(msg.value) if msg.value is not None else -1)

# If unpacking raises StopIteration, it is erroneously
# caught by the generator. We want all exceptions to be raised
Expand Down
2 changes: 1 addition & 1 deletion kafka/producer/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def __init__(self, buf, batch_size, compression_type=None, message_version=0):
self._final_size = None

def append(self, offset, message):
"""Apend a Message to the MessageSet.
"""Append a Message to the MessageSet.

Arguments:
offset (int): offset of the message
Expand Down
29 changes: 19 additions & 10 deletions kafka/producer/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,29 @@ def wait(self, timeout=None):


class FutureRecordMetadata(Future):
def __init__(self, produce_future, relative_offset, timestamp_ms):
def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size):
super(FutureRecordMetadata, self).__init__()
self._produce_future = produce_future
self.relative_offset = relative_offset
self.timestamp_ms = timestamp_ms
# packing args as a tuple is a minor speed optimization
self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size)
produce_future.add_callback(self._produce_success)
produce_future.add_errback(self.failure)

def _produce_success(self, offset_and_timestamp):
offset, timestamp_ms = offset_and_timestamp
if timestamp_ms is None:
timestamp_ms = self.timestamp_ms
if offset != -1 and self.relative_offset is not None:
offset += self.relative_offset
offset, produce_timestamp_ms = offset_and_timestamp

# Unpacking from args tuple is minor speed optimization
(relative_offset, timestamp_ms, checksum,
serialized_key_size, serialized_value_size) = self.args

if produce_timestamp_ms is not None:
timestamp_ms = produce_timestamp_ms
if offset != -1 and relative_offset is not None:
offset += relative_offset
tp = self._produce_future.topic_partition
metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms)
metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
checksum, serialized_key_size,
serialized_value_size)
self.success(metadata)

def get(self, timeout=None):
Expand All @@ -57,4 +64,6 @@ def get(self, timeout=None):
return self.value


# Result delivered to the producer once a send completes.
#   checksum: CRC of the message as written into the batch.
#   serialized_key_size / serialized_value_size: byte length of the
#     serialized key/value, or -1 when that field is absent.
# (The span previously contained both the old one-line definition and the
# new field list from the diff; this is the merged definition.)
RecordMetadata = collections.namedtuple(
    'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
                       'checksum', 'serialized_key_size', 'serialized_value_size'])
10 changes: 6 additions & 4 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
assert value is not None or self.config['api_version'] >= (0, 8, 1), (
'Null messages require kafka >= 0.8.1')
assert not (value is None and key is None), 'Need at least one: key or value'
key_bytes = value_bytes = None
try:
# first make sure the metadata for the topic is
# available
Expand Down Expand Up @@ -497,10 +498,11 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
except Exception as e:
log.debug("Exception occurred during message send: %s", e)
return FutureRecordMetadata(
FutureProduceResult(
TopicPartition(topic, partition)),
-1, None
).failure(e)
FutureProduceResult(TopicPartition(topic, partition)),
-1, None, None,
len(key_bytes) if key_bytes is not None else -1,
len(value_bytes) if value_bytes is not None else -1
).failure(e)

def flush(self, timeout=None):
"""
Expand Down
5 changes: 4 additions & 1 deletion kafka/producer/record_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,13 @@ def try_append(self, timestamp_ms, key, value):

msg = Message(value, key=key, magic=self.message_version)
record_size = self.records.append(self.record_count, msg)
checksum = msg.crc # crc is recalculated during records.append()
self.max_record_size = max(self.max_record_size, record_size)
self.last_append = time.time()
future = FutureRecordMetadata(self.produce_future, self.record_count,
timestamp_ms)
timestamp_ms, checksum,
len(key) if key is not None else -1,
len(value) if value is not None else -1)
self.record_count += 1
return future

Expand Down