
Refactor MessageSet and Message into LegacyRecordBatch #1252


Merged: 7 commits merged from legacy_records_refactor into master on Oct 14, 2017

Conversation

tvoinarovskyi (Collaborator, Author)

Refactors MessageSet and Message into LegacyRecordBatch, to later support the v2 message format.

dpkp (Owner) commented Oct 10, 2017

Looks like there's an error when using PyPy:

Traceback (most recent call last):
  File "/home/travis/build/dpkp/kafka-python/kafka/producer/sender.py", line 60, in run
    self.run_once()
  File "/home/travis/build/dpkp/kafka-python/kafka/producer/sender.py", line 116, in run_once
    self._metadata, ready_nodes, self.config['max_request_size'])
  File "/home/travis/build/dpkp/kafka-python/kafka/producer/record_accumulator.py", line 502, in drain
    batch.records.close()
  File "/home/travis/build/dpkp/kafka-python/kafka/record/memory_records.py", line 151, in close
    self._buffer = bytes(self._builder.build())
  File "/home/travis/build/dpkp/kafka-python/kafka/record/legacy_records.py", line 440, in build
    self._maybe_compress()
  File "/home/travis/build/dpkp/kafka-python/kafka/record/legacy_records.py", line 418, in _maybe_compress
    compressed = snappy_encode(self._buffer)
  File "/home/travis/build/dpkp/kafka-python/kafka/codec.py", line 136, in snappy_encode
    block = snappy.compress(chunk)
  File "/home/travis/build/dpkp/kafka-python/.tox/pypy/site-packages/snappy.py", line 84, in compress
    return _compress(data)
  File "/home/travis/build/dpkp/kafka-python/.tox/pypy/site-packages/snappy_cffi.py", line 193, in compress
    _input_data, _input_size = prepare(data)
  File "/home/travis/build/dpkp/kafka-python/.tox/pypy/site-packages/snappy_cffi.py", line 183, in prepare
    _out_data = ffi.new('char[]', data)
  File "/opt/python/pypy2.7-5.8.0/lib_pypy/cffi/api.py", line 250, in new
    return self._backend.newp(cdecl, init)
TypeError: 'bytearray' object cannot be interpreted as an index
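
For reference, a hedged sketch of the failure above (not necessarily the fix that landed in this PR): python-snappy's cffi backend, which PyPy uses, rejects bytearray input in ffi.new('char[]', ...), so converting the chunk to bytes before compressing avoids the TypeError. The helper name below is illustrative.

import snappy  # python-snappy

def snappy_compress_chunk(chunk):
    # cffi's ffi.new('char[]', ...) accepts bytes but chokes on bytearray,
    # so normalize the buffer type before handing it to the C library.
    if isinstance(chunk, bytearray):
        chunk = bytes(chunk)
    return snappy.compress(chunk)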

tvoinarovskyi (Collaborator, Author)

Performance on CPython 2.7.10 is better after this PR. On my MacBook (2.7 GHz Intel Core i7) running OS X it yields:

Producer performance (before refactor, master):

8988.24744641 records/sec (1205293.60883 B/sec), 2.20609148307 latency, 134.0 record size, 5555.60516605 batch size, 41.4298892989 records/req
8979.79654646 records/sec (1204184.38615 B/sec), 2.16188681334 latency, 134.0 record size, 5404.19398294 batch size, 40.2999550965 records/req
8862.39114729 records/sec (1188477.99544 B/sec), 2.17066042747 latency, 134.0 record size, 5181.86904005 batch size, 38.6408137317 records/req

Producer performance (after refactor, legacy_records_refactor branch):

11156.8740853 records/sec (1495023.20879 B/sec), 1.95941633879 latency, 134.0 record size, 5975.13554578 batch size, 44.5905637745 records/req
11154.633705 records/sec (1494721.64645 B/sec), 2.01865647889 latency, 134.0 record size, 6027.51651894 batch size, 44.9516324063 records/req
11110.6124732 records/sec (1488822.26352 B/sec), 1.98895959568 latency, 134.0 record size, 6014.45409253 batch size, 44.8839857651 records/req

Consumer performance (before refactor, master):

25696.5338939 records/sec (3134973.52202 B/sec), 191.90095365 latency, 3.28401605057 fetch/s, 954650.0 fetch size, 1616576.0 max record lag, 7825.0 records/req
25670.0861078 records/sec (3131746.94506 B/sec), 189.899698381 latency, 3.28070306302 fetch/s, 954650.0 fetch size, 1483551.0 max record lag, 7825.0 records/req
25129.3707579 records/sec (3065782.302 B/sec), 189.820181939 latency, 3.21152937427 fetch/s, 954650.0 fetch size, 1358351.0 max record lag, 7825.0 records/req

Consumer performance (after refactor, legacy_records_refactor branch):

32384.5603453 records/sec (4339529.05435 B/sec), 11.7965965736 latency, 4.13878035169 fetch/s, 1048550.0 fetch size, 663526.0 max record lag, 7825.0 records/req
32560.4913518 records/sec (4363110.27358 B/sec), 11.7553327142 latency, 4.16130993247 fetch/s, 1048550.0 fetch size, 499201.0 max record lag, 7825.0 records/req
33022.7197667 records/sec (4425042.82047 B/sec), 11.722275189 latency, 4.22039586671 fetch/s, 1048550.0 fetch size, 334876.0 max record lag, 7825.0 records/req

tvoinarovskyi (Collaborator, Author) commented Oct 11, 2017

So on CPython that's around a 24% gain on produce and a 28% gain on consume (I hope I'm correct =))
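
For reference, a quick check of those percentages against the averages of the three runs reported above:

# Averages of the three benchmark runs above (records/sec).
produce_before = (8988.2 + 8979.8 + 8862.4) / 3     # ~8943
produce_after  = (11156.9 + 11154.6 + 11110.6) / 3  # ~11141
consume_before = (25696.5 + 25670.1 + 25129.4) / 3  # ~25499
consume_after  = (32384.6 + 32560.5 + 33022.7) / 3  # ~32656

print(produce_after / produce_before - 1)  # ~0.25, i.e. roughly a 24-25% gain
print(consume_after / consume_before - 1)  # ~0.28, i.e. roughly a 28% gain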

tvoinarovskyi (Collaborator, Author)

@dpkp Lol, compared my results to those in #1185 (comment). Seems like we have similar PCs )

tvoinarovskyi (Collaborator, Author)

@dpkp Rebased on top of the deferred-parsing change; quite nice. One last look and I want to merge it.

jeffwidman (Contributor) left a comment:

Looks fine to me, although I'm not really qualified to review this.

I learned a number of Python tricks while reading this, thanks!

Arguments:
    offset (int): Relative offset of record, starting from 0
    timestamp (int or None): Timestamp in milliseconds since beginning
        of the epoch (midnight Jan 1, 1970 (UTC)). If omited, will be
jeffwidman (Contributor):

omited --> omitted

import itertools
import random
import hashlib
import os
jeffwidman (Contributor):

nit: imports aren't PEP 8 ordered/spaced
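
For instance, PEP 8 groups standard-library imports together, and alphabetical ordering within the group is the usual convention, so the block above would become:

import hashlib
import itertools
import os
import random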

VALUE_SIZE = 60
TIMESTAMP_RANGE = [1505824130000, 1505824140000]

# With values above v1 record is 100 bytes, so 10_000 bytes for 100 messages
jeffwidman (Contributor):

10,000 bytes?

tvoinarovskyi (Collaborator, Author):

Yes, it's a habit from developing under Python 3.6, which lets you write x = 10_000_000 and just ignores the underscores.
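
For example, on Python 3.6+ (PEP 515) underscores in numeric literals are purely visual separators:

# Python 3.6+ only: the parser ignores underscores in numeric literals.
ten_thousand = 10_000
assert ten_thousand == 10000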

import itertools
import random
import hashlib
import os
jeffwidman (Contributor):

Same nit as above: imports aren't PEP 8 ordered.

jeffwidman (Contributor)

Also, since this isn't merged yet, you might want to fix the typo in the commit name "Fix snappy comprassion on PyPy...": comprassion --> compression

# Convert compression type to INT presentation. Mostly for unit tests,
# as Producer should pass already converted values.
ct = self.config["compression_type"]
self.config["compression_type"] = self._COMPRESSORS.get(ct, ct)
dpkp (Owner):

It's a little confusing to reuse the same configuration name here. It means we have to remember that when talking to users they need this to be a string, but when talking to developers / writing tests / etc, they may need this to be an int. Can we use an internal config like _compression_type_int or compression_type_int ?
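
A minimal sketch of that suggestion, with an illustrative codec mapping standing in for the PR's _COMPRESSORS: the user-facing string stays under compression_type and internal code reads the converted value from a separate key.

_COMPRESSORS = {"gzip": 1, "snappy": 2, "lz4": 3}  # illustrative values only

config = {"compression_type": "snappy"}
ct = config["compression_type"]
config["_compression_type_int"] = _COMPRESSORS.get(ct, ct)

assert config["compression_type"] == "snappy"   # what users configure
assert config["_compression_type_int"] == 2     # what internal code / tests use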

# RecordAccumulator encodes messagesets internally
if isinstance(items, (io.BytesIO, KafkaBytes)):
    size = Int32.decode(items)
    # rewind and return all the bytes
    items.seek(items.tell() - 4)
    if prepend_size:
dpkp (Owner):

I think you also need to alter the number of bytes read depending on whether prepend_size is True / False. Below it is reading size + 4, but that is only correct if we are prepending size. If we are not, I think we only want to read size bytes?
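
A hedged, self-contained sketch of the behaviour being suggested (not the PR's actual code): the 4-byte size prefix is only included in the read when prepend_size is true.

import io
import struct

def read_size_prefixed(buf, prepend_size=True):
    size = struct.unpack(">i", buf.read(4))[0]  # big-endian Int32 size prefix
    if prepend_size:
        buf.seek(buf.tell() - 4)                # rewind over the prefix
        return buf.read(size + 4)               # prefix + payload
    return buf.read(size)                       # payload only

data = struct.pack(">i", 5) + b"hello"
assert read_size_prefixed(io.BytesIO(data), prepend_size=True) == data
assert read_size_prefixed(io.BytesIO(data), prepend_size=False) == b"hello"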

tvoinarovskyi (Collaborator, Author):

Yup, seems like I missed it...


@property
def offset(self):
    return self._offset
dpkp (Owner):

I'm not familiar with this pattern of using @property to get private attributes. Is that primarily to "enforce" read-only behavior?

tvoinarovskyi (Collaborator, Author):

Yes, it's to enforce read-only behaviour.
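
A minimal illustration of the pattern (the class name here is made up, not the PR's): the value is stored on a private attribute and exposed through a getter-only property, so assignment raises AttributeError.

class Record(object):
    def __init__(self, offset):
        self._offset = offset   # stored on a "private" attribute

    @property
    def offset(self):
        return self._offset     # readable as record.offset, but not writable

r = Record(42)
print(r.offset)                 # 42
try:
    r.offset = 7
except AttributeError:
    print("offset is read-only")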

dpkp (Owner) commented Oct 12, 2017

One subtle thing to look out for here is that this alters the debug-log output of a FetchResponse in BrokerConnection to include the entire byte buffer for the included messages. In master we do something slightly different. I'm not sure which is better or worse, but we should think about whether we need to adjust this behavior at all.

tvoinarovskyi force-pushed the legacy_records_refactor branch from 9da4c3c to d10051b on October 12, 2017 08:11
tvoinarovskyi (Collaborator, Author)

@dpkp It's the debug log, but I don't think it needs to show the whole body. How do you feel about altering Bytes.repr to limit the amount of data it prints to, say, 100 bytes?

dpkp (Owner) commented Oct 12, 2017

Showing only the first 100 bytes for a Bytes object sounds good to me.
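
As an illustration only (this is not the exact change that landed on kafka-python's Bytes protocol type), the idea is simply to cap how much of the buffer shows up in the repr:

def short_bytes_repr(value, limit=100):
    # Show at most `limit` bytes in debug output, with a marker when truncated.
    if len(value) > limit:
        return repr(value[:limit]) + "..."
    return repr(value)

print(short_bytes_repr(b"x" * 1000))  # first 100 bytes, then "..."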

tvoinarovskyi (Collaborator, Author)

I'm merging; I should have fixed all remaining issues. If I missed something, I'll address it in a separate PR.

tvoinarovskyi merged commit fbbd6ca into master on Oct 14, 2017
tvoinarovskyi deleted the legacy_records_refactor branch on October 14, 2017 20:06