Refactor MessageSet and Message into LegacyRecordBatch #1252
Conversation
Looks like there's an error when using PyPy:
The performance on CPython 2.7.10 is better after this PR. On my MacBook (2.7 GHz Intel Core i7) on OSX it yields:
Producer performance (before refactor, …)
Producer performance (after refactor, …)
Consumer performance (before refactor, …)
Consumer performance (after refactor, …)
So on CPython that's around a 24% gain on produce and a 28% gain on consume (I hope I'm correct =))
@dpkp Lol, I compared my results to those in #1185 (comment). Seems like we have similar PCs )
Force-pushed from 7df4904 to 9da4c3c ("…rt v2 message format")
@dpkp Rebased on top of deferred parsing; quite nice. One last look and I want to merge it.
Looks fine to me, although I'm not really qualified to review this.
I learned a number of python tricks while reading this, thanks!
kafka/record/abc.py
Outdated
Arguments:
    offset (int): Relative offset of record, starting from 0
    timestamp (int or None): Timestamp in milliseconds since beginning
        of the epoch (midnight Jan 1, 1970 (UTC)). If omited, will be
omited --> omitted
benchmarks/record_batch_compose.py
Outdated
import itertools
import random
import hashlib
import os
nit: imports aren't pep8 ordered/spaced
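For reference, a PEP 8 style grouping of these imports would look like the following (all four are standard library, so one group; alphabetical order within a group is a common convention rather than a strict PEP 8 rule):

import hashlib
import itertools
import os
import random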
benchmarks/record_batch_compose.py
Outdated
VALUE_SIZE = 60
TIMESTAMP_RANGE = [1505824130000, 1505824140000]

# With values above v1 record is 100 bytes, so 10_000 bytes for 100 messages
10,000 bytes?
Yes, it's a habit from developing under 3.6. It allows you to write x = 10_000_000 and the parser will just ignore the underscores.
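For context, underscores in numeric literals are a Python 3.6+ feature (PEP 515); a minimal illustration:

# Python 3.6+ (PEP 515): underscores in numeric literals are ignored by the parser.
x = 10_000_000
assert x == 10000000
# On Python 2.7 the same literal is a SyntaxError, which is why the underscore
# style above appears only inside a comment.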
benchmarks/record_batch_read.py
Outdated
import itertools
import random
import hashlib
import os
imports non-pep8
Also, since this isn't merged yet, you might want to fix the typo in the commit name:
kafka/producer/record_accumulator.py
Outdated
# Convert compression type to INT presentation. Mostly for unit tests,
# as Producer should pass already converted values.
ct = self.config["compression_type"]
self.config["compression_type"] = self._COMPRESSORS.get(ct, ct)
It's a little confusing to reuse the same configuration name here. It means we have to remember that when talking to users they need this to be a string, but when talking to developers / writing tests / etc, they may need this to be an int. Can we use an internal config like _compression_type_int or compression_type_int?
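A rough sketch of the suggested separation (the _compression_type_int key is the hypothetical name from the comment above; the codec ids and the surrounding config handling are assumptions for illustration only):

# Hypothetical: keep the user-facing string and store the converted
# integer under a separate internal key instead of overwriting it.
_COMPRESSORS = {
    'gzip': 1,    # assumed codec ids, for illustration
    'snappy': 2,
    'lz4': 3,
}

def _resolve_compression(config):
    ct = config["compression_type"]
    # "compression_type" stays a string for users; tests and internal
    # code read the already-converted id from "_compression_type_int".
    config["_compression_type_int"] = _COMPRESSORS.get(ct, ct)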
kafka/protocol/message.py
Outdated
# RecordAccumulator encodes messagesets internally
if isinstance(items, (io.BytesIO, KafkaBytes)):
    size = Int32.decode(items)
    # rewind and return all the bytes
    items.seek(items.tell() - 4)
    if prepend_size:
I think you also need to alter the number of bytes read depending on whether prepend_size is True / False. Below it is reading size + 4, but that is only correct if we are prepending size. If we are not, I think we only want to read size bytes?
Yup, seems like I missed it...
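A minimal sketch of the fix being discussed, written as a hypothetical helper (the real change lives inside the encode path in kafka/protocol/message.py; everything except Int32 is an assumption for illustration):

from kafka.protocol.types import Int32

def _read_message_bytes(items, prepend_size):
    # Adjust how many bytes are returned depending on prepend_size.
    size = Int32.decode(items)
    if prepend_size:
        # rewind so the 4-byte size header is included in the returned bytes
        items.seek(items.tell() - 4)
        return items.read(size + 4)
    # the size header was already consumed; return only the payload
    return items.read(size)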
@property
def offset(self):
    return self._offset
I'm not familiar with this pattern of using @property to get private attributes. Is that primarily to "enforce" read-only behavior?
Yes, it's to enforce read-only behaviour.
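For readers unfamiliar with the pattern, a standalone illustration (the class and values are made up for the example):

class Record(object):
    def __init__(self, offset):
        # Leading underscore marks the attribute as internal; the property
        # below exposes it without defining a setter.
        self._offset = offset

    @property
    def offset(self):
        return self._offset

r = Record(42)
print(r.offset)       # 42
try:
    r.offset = 7      # no setter defined, so assignment is rejected
except AttributeError:
    print("offset is read-only")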
One subtle thing to look out for here is that this alters the output of debug messages from BrokerConnection for FetchResponse to include the entire byte buffer for the included messages. In master we do something slightly different. I'm not sure which is better / worse, but we should think about whether we need to adjust this behavior at all.
Force-pushed from 9da4c3c to d10051b ("…rdBatch anyway. Minor abc doc fixes.", "…change after updating to V2 message format")
@dpkp It's the debug log, but I don't think it needs to show the whole body. What do you think about altering the repr of Bytes to show only a truncated preview?
Showing only the first 100 bytes for the Bytes object sounds good to me.
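A sketch of the kind of change being discussed, using a simplified stand-in for the protocol Bytes type (the actual type lives in kafka/protocol/types.py; the 100-byte cutoff follows the comment above, and the rest is an illustration):

class Bytes(object):
    """Simplified stand-in for the protocol Bytes type (illustration only)."""

    @classmethod
    def repr(cls, value):
        # Cap the rendered value so FetchResponse debug logs don't dump
        # the entire message buffer.
        if value is not None and len(value) > 100:
            return repr(value[:100]) + '...'
        return repr(value)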
I'm merging; this should have fixed all remaining issues. If I missed something, I'll address it in a separate PR.
To later support v2 message format