
Commit fbbd6ca

Merge pull request dpkp#1252 from dpkp/legacy_records_refactor
Refactor MessageSet and Message into LegacyRecordBatch
2 parents dd8e336 + 365cae0 · commit fbbd6ca

26 files changed: +1317 -461 lines

benchmarks/README (+4)

@@ -0,0 +1,4 @@
+The `record_batch_*` benchmarks in this section are written using
+``perf`` library, created by Viktor Stinner. For more information on how to get
+reliable results of test runs please consult
+http://perf.readthedocs.io/en/latest/run_benchmark.html.
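
Editor's note: both scripts below follow the standard ``perf`` time-function pattern: a callable receives a loop count, times its own inner loop, and returns the elapsed seconds to a ``perf.Runner``. A minimal sketch of that pattern, assuming ``perf`` is installed (the benchmark name 'noop_example' is illustrative only):

    #!/usr/bin/env python
    import perf


    def bench_noop(loops):
        # perf passes the number of loop iterations and expects the
        # elapsed wall-clock time for the whole inner loop in return.
        t0 = perf.perf_counter()
        for _ in range(loops):
            pass
        return perf.perf_counter() - t0


    runner = perf.Runner()
    runner.bench_time_func('noop_example', bench_noop)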

benchmarks/record_batch_compose.py (+75)

@@ -0,0 +1,75 @@
+#!/usr/bin/env python3
+from __future__ import print_function
+import hashlib
+import itertools
+import os
+import random
+
+import perf
+
+from kafka.record.memory_records import MemoryRecordsBuilder
+
+
+DEFAULT_BATCH_SIZE = 1600 * 1024
+KEY_SIZE = 6
+VALUE_SIZE = 60
+TIMESTAMP_RANGE = [1505824130000, 1505824140000]
+
+# With values above v1 record is 100 bytes, so 10 000 bytes for 100 messages
+MESSAGES_PER_BATCH = 100
+
+
+def random_bytes(length):
+    buffer = bytearray(length)
+    for i in range(length):
+        buffer[i] = random.randint(0, 255)
+    return bytes(buffer)
+
+
+def prepare():
+    return iter(itertools.cycle([
+        (random_bytes(KEY_SIZE),
+         random_bytes(VALUE_SIZE),
+         random.randint(*TIMESTAMP_RANGE)
+         )
+        for _ in range(int(MESSAGES_PER_BATCH * 1.94))
+    ]))
+
+
+def finalize(results):
+    # Just some strange code to make sure PyPy does execute the main code
+    # properly, without optimizing it away
+    hash_val = hashlib.md5()
+    for buf in results:
+        hash_val.update(buf)
+    print(hash_val, file=open(os.devnull, "w"))
+
+
+def func(loops, magic):
+    # Jit can optimize out the whole function if the result is the same each
+    # time, so we need some randomized input data )
+    precomputed_samples = prepare()
+    results = []
+
+    # Main benchmark code.
+    t0 = perf.perf_counter()
+    for _ in range(loops):
+        batch = MemoryRecordsBuilder(
+            magic, batch_size=DEFAULT_BATCH_SIZE, compression_type=0)
+        for _ in range(MESSAGES_PER_BATCH):
+            key, value, timestamp = next(precomputed_samples)
+            size = batch.append(timestamp=timestamp, key=key, value=value)
+            assert size
+        batch.close()
+        results.append(batch.buffer())
+
+    res = perf.perf_counter() - t0
+
+    finalize(results)
+
+    return res
+
+
+runner = perf.Runner()
+runner.bench_time_func('batch_append_v0', func, 0)
+runner.bench_time_func('batch_append_v1', func, 1)
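
Editor's note: for orientation, a minimal sketch (not part of the commit) of the write-path API this benchmark exercises, using only calls that appear in the script above; the key/value literals are made up:

    from kafka.record.memory_records import MemoryRecordsBuilder

    # Build one magic-v1 batch containing a single record.
    builder = MemoryRecordsBuilder(1, batch_size=1024 * 1024, compression_type=0)
    meta = builder.append(timestamp=1505824130000, key=b'key', value=b'value')
    assert meta              # append() returns a truthy result while the batch has room
    builder.close()          # finalize the batch; no further appends
    raw = builder.buffer()   # serialized bytes, ready to be read back via MemoryRecords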

benchmarks/record_batch_read.py (+80)

@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+from __future__ import print_function
+import hashlib
+import itertools
+import os
+import random
+
+import perf
+
+from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder
+
+
+DEFAULT_BATCH_SIZE = 1600 * 1024
+KEY_SIZE = 6
+VALUE_SIZE = 60
+TIMESTAMP_RANGE = [1505824130000, 1505824140000]
+
+BATCH_SAMPLES = 5
+MESSAGES_PER_BATCH = 100
+
+
+def random_bytes(length):
+    buffer = bytearray(length)
+    for i in range(length):
+        buffer[i] = random.randint(0, 255)
+    return bytes(buffer)
+
+
+def prepare(magic):
+    samples = []
+    for _ in range(BATCH_SAMPLES):
+        batch = MemoryRecordsBuilder(
+            magic, batch_size=DEFAULT_BATCH_SIZE, compression_type=0)
+        for _ in range(MESSAGES_PER_BATCH):
+            size = batch.append(
+                random.randint(*TIMESTAMP_RANGE),
+                random_bytes(KEY_SIZE),
+                random_bytes(VALUE_SIZE))
+            assert size
+        batch.close()
+        samples.append(bytes(batch.buffer()))
+
+    return iter(itertools.cycle(samples))
+
+
+def finalize(results):
+    # Just some strange code to make sure PyPy does execute the code above
+    # properly
+    hash_val = hashlib.md5()
+    for buf in results:
+        hash_val.update(buf)
+    print(hash_val, file=open(os.devnull, "w"))
+
+
+def func(loops, magic):
+    # Jit can optimize out the whole function if the result is the same each
+    # time, so we need some randomized input data )
+    precomputed_samples = prepare(magic)
+    results = []
+
+    # Main benchmark code.
+    batch_data = next(precomputed_samples)
+    t0 = perf.perf_counter()
+    for _ in range(loops):
+        records = MemoryRecords(batch_data)
+        while records.has_next():
+            batch = records.next_batch()
+            batch.validate_crc()
+            for record in batch:
+                results.append(record.value)
+
+    res = perf.perf_counter() - t0
+    finalize(results)
+
+    return res
+
+
+runner = perf.Runner()
+runner.bench_time_func('batch_read_v0', func, 0)
+runner.bench_time_func('batch_read_v1', func, 1)
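
Editor's note: a small round-trip sketch (again, not part of the commit) pairing the builder from the previous benchmark with the read path timed here; it assumes the same uncompressed magic-v1 settings:

    from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder

    builder = MemoryRecordsBuilder(1, batch_size=1024 * 1024, compression_type=0)
    for i in range(3):
        assert builder.append(1505824130000 + i, b'k%d' % i, b'v%d' % i)
    builder.close()

    # Read every batch back, checking CRCs as the benchmark loop does.
    records = MemoryRecords(bytes(builder.buffer()))
    values = []
    while records.has_next():
        batch = records.next_batch()
        batch.validate_crc()
        values.extend(record.value for record in batch)

    assert values == [b'v0', b'v1', b'v2']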

kafka/consumer/fetcher.py (+27 -79)

@@ -13,10 +13,10 @@
 from kafka.future import Future
 from kafka.metrics.stats import Avg, Count, Max, Rate
 from kafka.protocol.fetch import FetchRequest
-from kafka.protocol.message import PartialMessage
 from kafka.protocol.offset import (
     OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET
 )
+from kafka.record import MemoryRecords
 from kafka.serializer import Deserializer
 from kafka.structs import TopicPartition, OffsetAndTimestamp

@@ -304,7 +304,7 @@ def fetched_records(self, max_records=None):

         Raises:
             OffsetOutOfRangeError: if no subscription offset_reset_strategy
-            InvalidMessageError: if message crc validation fails (check_crcs
+            CorruptRecordException: if message crc validation fails (check_crcs
                 must be set to True)
             RecordTooLargeError: if a message is larger than the currently
                 configured max_partition_fetch_bytes

@@ -449,57 +449,25 @@ def _message_generator(self):

         self._next_partition_records = None

-    def _unpack_message_set(self, tp, messages):
+    def _unpack_message_set(self, tp, records):
         try:
-            for offset, size, msg in messages:
-                if self.config['check_crcs'] and not msg.validate_crc():
-                    raise Errors.InvalidMessageError(msg)
-
-                if not msg.is_compressed():
-                    yield self._parse_record(tp, offset, msg.timestamp, msg)
-
-                else:
-                    # If relative offset is used, we need to decompress the entire message first
-                    # to compute the absolute offset.
-                    inner_mset = msg.decompress()
-
-                    # There should only ever be a single layer of compression
-                    if inner_mset[0][-1].is_compressed():
-                        log.warning('MessageSet at %s offset %d appears '
-                                    ' double-compressed. This should not'
-                                    ' happen -- check your producers!',
-                                    tp, offset)
-                        if self.config['skip_double_compressed_messages']:
-                            log.warning('Skipping double-compressed message at'
-                                        ' %s %d', tp, offset)
-                            continue
-
-                    if msg.magic > 0:
-                        last_offset, _, _ = inner_mset[-1]
-                        absolute_base_offset = offset - last_offset
-                    else:
-                        absolute_base_offset = -1
-
-                    for inner_offset, inner_size, inner_msg in inner_mset:
-                        if msg.magic > 0:
-                            # When magic value is greater than 0, the timestamp
-                            # of a compressed message depends on the
-                            # typestamp type of the wrapper message:
-
-                            if msg.timestamp_type == 0:  # CREATE_TIME (0)
-                                inner_timestamp = inner_msg.timestamp
-
-                            elif msg.timestamp_type == 1:  # LOG_APPEND_TIME (1)
-                                inner_timestamp = msg.timestamp
-
-                            else:
-                                raise ValueError('Unknown timestamp type: {0}'.format(msg.timestamp_type))
-                        else:
-                            inner_timestamp = msg.timestamp
-
-                        if absolute_base_offset >= 0:
-                            inner_offset += absolute_base_offset
-                        yield self._parse_record(tp, inner_offset, inner_timestamp, inner_msg)
+            batch = records.next_batch()
+            while batch is not None:
+                for record in batch:
+                    key_size = len(record.key) if record.key is not None else -1
+                    value_size = len(record.value) if record.value is not None else -1
+                    key = self._deserialize(
+                        self.config['key_deserializer'],
+                        tp.topic, record.key)
+                    value = self._deserialize(
+                        self.config['value_deserializer'],
+                        tp.topic, record.value)
+                    yield ConsumerRecord(
+                        tp.topic, tp.partition, record.offset, record.timestamp,
+                        record.timestamp_type, key, value, record.checksum,
+                        key_size, value_size)
+
+                batch = records.next_batch()

         # If unpacking raises StopIteration, it is erroneously
         # caught by the generator. We want all exceptions to be raised

@@ -508,21 +476,6 @@ def _unpack_message_set(self, tp, messages):
             log.exception('StopIteration raised unpacking messageset: %s', e)
             raise Exception('StopIteration raised unpacking messageset')

-        # If unpacking raises AssertionError, it means decompression unsupported
-        # See Issue 1033
-        except AssertionError as e:
-            log.exception('AssertionError raised unpacking messageset: %s', e)
-            raise
-
-    def _parse_record(self, tp, offset, timestamp, msg):
-        key = self._deserialize(self.config['key_deserializer'], tp.topic, msg.key)
-        value = self._deserialize(self.config['value_deserializer'], tp.topic, msg.value)
-        return ConsumerRecord(tp.topic, tp.partition, offset,
-                              timestamp, msg.timestamp_type,
-                              key, value, msg.crc,
-                              len(msg.key) if msg.key is not None else -1,
-                              len(msg.value) if msg.value is not None else -1)
-
     def __iter__(self):  # pylint: disable=non-iterator-returned
         return self

@@ -784,15 +737,13 @@ def _handle_fetch_response(self, request, send_time, response):

     def _parse_fetched_data(self, completed_fetch):
         tp = completed_fetch.topic_partition
-        partition = completed_fetch.partition_data
         fetch_offset = completed_fetch.fetched_offset
         num_bytes = 0
         records_count = 0
         parsed_records = None

         error_code, highwater = completed_fetch.partition_data[:2]
         error_type = Errors.for_code(error_code)
-        messages = completed_fetch.partition_data[-1]

         try:
             if not self._subscriptions.is_fetchable(tp):

@@ -816,21 +767,18 @@ def _parse_fetched_data(self, completed_fetch):
                               position)
                 return None

-            partial = None
-            if messages and isinstance(messages[-1][-1], PartialMessage):
-                partial = messages.pop()
-
-            if messages:
+            records = MemoryRecords(completed_fetch.partition_data[-1])
+            if records.has_next():
                 log.debug("Adding fetched record for partition %s with"
                           " offset %d to buffered record list", tp,
                           position)
-                unpacked = list(self._unpack_message_set(tp, messages))
+                unpacked = list(self._unpack_message_set(tp, records))
                 parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked)
-                last_offset, _, _ = messages[-1]
+                last_offset = unpacked[-1].offset
                 self._sensors.records_fetch_lag.record(highwater - last_offset)
-                num_bytes = sum(msg[1] for msg in messages)
-                records_count = len(messages)
-            elif partial:
+                num_bytes = records.valid_bytes()
+                records_count = len(unpacked)
+            elif records.size_in_bytes() > 0:
                 # we did not read a single message from a non-empty
                 # buffer because that message's size is larger than
                 # fetch size, in this case record this exception
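
Editor's note: the behavioral pivot in _parse_fetched_data is that an oversized record is now detected by asking MemoryRecords about the buffer instead of looking for a trailing PartialMessage. A standalone sketch of that branch structure (the function name and the raw argument are hypothetical; only the MemoryRecords calls come from the diff):

    from kafka.record import MemoryRecords

    def classify_fetch_buffer(raw):
        # Sketch only: mirrors the if/elif structure used in _parse_fetched_data.
        records = MemoryRecords(raw)
        if records.has_next():
            # At least one complete batch: it can be unpacked, and valid_bytes()
            # reports how much of the fetched data was whole records.
            return 'complete', records.valid_bytes()
        elif records.size_in_bytes() > 0:
            # Non-empty buffer with no complete batch: the record is larger than
            # the fetch size (the case formerly signalled by PartialMessage).
            return 'partial', records.size_in_bytes()
        return 'empty', 0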

kafka/errors.py (+5 -2)

@@ -101,12 +101,15 @@ class OffsetOutOfRangeError(BrokerResponseError):
                    ' maintained by the server for the given topic/partition.')


-class InvalidMessageError(BrokerResponseError):
+class CorruptRecordException(BrokerResponseError):
     errno = 2
-    message = 'INVALID_MESSAGE'
+    message = 'CORRUPT_MESSAGE'
     description = ('This message has failed its CRC checksum, exceeds the'
                    ' valid size, or is otherwise corrupt.')

+# Backward compatibility
+InvalidMessageError = CorruptRecordException
+

 class UnknownTopicOrPartitionError(BrokerResponseError):
     errno = 3
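
Editor's note: because InvalidMessageError is now just an alias for CorruptRecordException, existing except clauses that reference the old name keep working. A small illustration (hypothetical usage, not part of the commit):

    import kafka.errors as Errors

    # The two names refer to the same class object.
    assert Errors.InvalidMessageError is Errors.CorruptRecordException

    try:
        raise Errors.CorruptRecordException('crc mismatch')
    except Errors.InvalidMessageError:
        pass  # caught through the legacy alias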
