Skip to content

Merging in some consumer fixes #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 36 commits into from
Closed

Merging in some consumer fixes #2

wants to merge 36 commits into from

Conversation

whalesalad
Copy link

Merging this in dpkp#136

mrtheb and others added 30 commits November 14, 2013 09:26
Conflicts:
	kafka/client.py
	kafka/conn.py
	setup.py
	test/test_integration.py
	test/test_unit.py
Fixes dpkp#126

TL;DR
=====
This makes it possible to read and write snappy compressed streams that
are compatible with the java and scala kafka clients (the xerial
blocking format))

Xerial Details
==============
Kafka supports transparent compression of data (both in transit and at
rest) of messages, one of the allowable compression algorithms is
Google's snappy, an algorithm which has excellent performance at the
cost of efficiency.

The specific implementation of snappy used in kafka is the xerial-snappy
implementation, this is a readily available java library for snappy.

As part of this implementation, there is a specialised blocking format
that is somewhat none standard in the snappy world.

Xerial Format
-------------
The blocking mode of the xerial snappy library is fairly simple, using a
magic header to identify itself and then a size + block scheme, unless
otherwise noted all items in xerials blocking format are assumed to be
big-endian.

A block size (```xerial_blocksize``` in implementation) controls how
frequent the blocking occurs 32k is the default in the xerial library,
this blocking controls the size of the uncompressed chunks that will be
fed to snappy to be compressed.

The format winds up being
|   Header    | Block1 len | Block1 data  | Blockn len | Blockn data  |
| ----------- | ---------- | ------------ | ---------- | ------------ |
|  16 bytes   |  BE int32  | snappy bytes |  BE int32  | snappy bytes |

It is important to not that the blocksize is the amount of uncompressed
data presented to snappy at each block, whereas the blocklen is the
number of bytes that will be present in the stream, that is the
length will always be <= blocksize.

Xerial blocking header
----------------------
Marker | Magic String | Null / Pad | Version  | Compat
------ | ------------ | ---------- | -------- | --------
 byte  |   c-string   |    byte    |  int32   | int32
------ | ------------ | ---------- | -------- | --------
 -126  |   'SNAPPY'   |     \0     | variable | variable

The pad appears to be to ensure that SNAPPY is a valid cstring, and to
align the header on a word boundary.

The version is the version of this format as written by xerial, in the
wild this is currently 1 as such we only support v1.

Compat is there to claim the minimum supported version that can read a
xerial block stream, presently in the wild this is 1.

Implementation specific details
===============================
The implementation presented here follows the Xerial implementation as
of its v1 blocking format, no attempts are made to check for future
versions. Since none-xerial aware clients might have persisted snappy
compressed messages to kafka brokers we allow clients to turn on xerial
compatibility for message sending, and perform header sniffing to detect
xerial vs plain snappy payloads.
Make it possible to read and write xerial snappy
Support for multiple hosts on KafkaClient boostrap (improves on dpkp#70)
…-lists

If a broker refuses the connection, try the next
Conflicts:
	test/test_unit.py
Check against basestring instead of str in collect.hosts.
mrtheb and others added 6 commits March 20, 2014 09:08
Fix py26 compatibility issue, add mock to tox
TopicAndPartition fix when partition has no leader = -1
conn.py performance improvements, make examples work, add another example
Make seek(); commit(); work without commit discarding the seek change
@whalesalad
Copy link
Author

Oops. I was trying to merge your stuff into my own branch. Please delete!

@whalesalad whalesalad closed this Apr 15, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants