Description
Hi all
This might be my own mistake, but I seem to be unable to read from topics that contain Snappy-compressed messages.
The Kafka console tools can read these topics (so I suspect, though I have not tested it, that Java clients can as well).
My Python test code is really simple:
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
kafka = KafkaClient('compute-10-2-60-63.us-east-1.urx.internal', 6667)
consumer = SimpleConsumer(kafka, 'etl-pipeline', 'raw-clicks')
it = iter(consumer)
next(it)
This gives me:
greg@localhost ~/projects/kafka-python $ ipython -i -- ./test.py
Python 2.7.6 (default, Dec 1 2013, 21:04:11)
Type "copyright", "credits" or "license" for more information.
IPython 1.1.0 -- An enhanced Interactive Python.
? -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help -> Python's own help system.
object? -> Details about 'object', use 'object??' for extra details.
---------------------------------------------------------------------------
UncompressError Traceback (most recent call last)
/usr/lib64/python2.7/site-packages/IPython/utils/py3compat.pyc in execfile(fname, *where)
202 else:
203 filename = fname
--> 204 __builtin__.execfile(filename, *where)
/home/greg/projects/kafka-python/test.py in <module>()
5 consumer = SimpleConsumer(kafka, 'etl-pipeline', 'raw-clicks')
6 it = iter(consumer)
----> 7 next(it)
/home/greg/projects/kafka-python/kafka/consumer.pyc in __iter__(self)
392
393 while True:
--> 394 message = self.get_message(True, timeout)
395 if message:
396 yield message
/home/greg/projects/kafka-python/kafka/consumer.pyc in get_message(self, block, timeout, get_partition_info)
351
352 def get_message(self, block=True, timeout=0.1, get_partition_info=None):
--> 353 return self._get_message(block, timeout, get_partition_info)
354
355 def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
/home/greg/projects/kafka-python/kafka/consumer.pyc in _get_message(self, block, timeout, get_partition_info, update_offset)
364 # We're out of messages, go grab some more.
365 with FetchContext(self, block, timeout):
--> 366 self._fetch()
367 try:
368 partition, message = self.queue.get_nowait()
/home/greg/projects/kafka-python/kafka/consumer.pyc in _fetch(self)
422 partition = resp.partition
423 try:
--> 424 for message in resp.messages:
425 # Put the message in our queue
426 self.queue.put((partition, message))
/home/greg/projects/kafka-python/kafka/protocol.pyc in _decode_message_set_iter(cls, data)
116 ((offset, ), cur) = relative_unpack('>q', data, cur)
117 (msg, cur) = read_int_string(data, cur)
--> 118 for (offset, message) in KafkaProtocol._decode_message(msg, offset):
119 read_message = True
120 yield OffsetAndMessage(offset, message)
/home/greg/projects/kafka-python/kafka/protocol.pyc in _decode_message(cls, data, offset)
163
164 elif codec == KafkaProtocol.CODEC_SNAPPY:
--> 165 snp = snappy_decode(value)
166 for (offset, msg) in KafkaProtocol._decode_message_set_iter(snp):
167 yield (offset, msg)
/home/greg/projects/kafka-python/kafka/codec.pyc in snappy_decode(payload)
46 if not _has_snappy:
47 raise NotImplementedError("Snappy codec is not available")
---> 48 return snappy.decompress(payload)
UncompressError: Error while decompressing: invalid input
In [1]: %debug
> /home/greg/projects/kafka-python/kafka/codec.py(48)snappy_decode()
46 if not _has_snappy:
47 raise NotImplementedError("Snappy codec is not available")
---> 48 return snappy.decompress(payload)
ipdb> payload
'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x04\xdb\xb9\r\x00\x00\x15\x01....
open('/tmp/out', 'w').write(payload)
Interestingly, the payload contains the literal word SNAPPY, as if it were being used as a magic marker. python-snappy, by contrast, seems to compress strings without this marker:
In [1]: import snappy
In [2]: snappy.compress('aaaaaaaaaaaaaaaa')
Out[2]: '\x10<aaaaaaaaaaaaaaaa'
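The leading bytes of that payload appear to match the stream header written by the Java snappy-java (org.xerial.snappy) library, which Kafka 0.8 uses for compression on the JVM side: an 8-byte magic `\x82SNAPPY\x00`, then two big-endian 32-bit integers (version and minimum compatible version), then length-prefixed compressed blocks. Below is a minimal sketch, assuming that layout, that decodes the header fields from the bytes printed in the ipdb session. `parse_xerial_header` and the constant names are hypothetical, not part of kafka-python or python-snappy.

```python
import struct

# Assumed snappy-java (xerial) stream header: 8-byte magic, then two
# big-endian int32 fields (version, minimum compatible version).
XERIAL_MAGIC = b'\x82SNAPPY\x00'
XERIAL_HEADER = struct.Struct('>8sii')  # 16 bytes, no padding with '>'


def parse_xerial_header(payload):
    """Return (version, compat_version, first_block_length), or None.

    None means the payload does not start with the assumed magic (or is
    too short), i.e. it is probably a plain, unframed snappy buffer.
    """
    if len(payload) < XERIAL_HEADER.size + 4:
        return None
    magic, version, compat = XERIAL_HEADER.unpack_from(payload, 0)
    if magic != XERIAL_MAGIC:
        return None
    # Each block after the header is assumed to carry a big-endian int32 length.
    (block_len,) = struct.unpack_from('>i', payload, XERIAL_HEADER.size)
    return version, compat, block_len


# First 20 bytes of the payload shown in the ipdb session above.
prefix = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x04\xdb'
print(parse_xerial_header(prefix))                    # (1, 1, 1243)
print(parse_xerial_header(b'\x10<aaaaaaaaaaaaaaaa'))  # None
```

Read this way, the payload would be a version-1 stream whose first compressed block is 1243 bytes long, which would explain why handing the whole buffer to `snappy.decompress` reports invalid input.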
I also found that the Google snappy library itself cannot uncompress the data, using the following C test program:
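#include <stdio.h>
#include <stdlib.h>
#include <snappy-c.h>

int main(int argc, char** argv) {
    /* Output buffer for the decompressed data (1 MiB plus a NUL terminator). */
    size_t out_len = 1024 * 1024;
    char* buffer = calloc(out_len + 1, 1);
    if (buffer == NULL) {
        return 1;
    }
    /* Read the payload dumped from ipdb into memory. */
    FILE *fp = fopen("/tmp/out", "rb");
    if (fp == NULL) {
        perror("/tmp/out");
        return 1;
    }
    fseek(fp, 0, SEEK_END);
    long length = ftell(fp);
    fseek(fp, 0, SEEK_SET);
    char* input = malloc(length);
    if (input == NULL || fread(input, 1, length, fp) != (size_t)length) {
        fclose(fp);
        return 1;
    }
    fclose(fp);
    /* Try to decompress the whole file as one raw snappy buffer. */
    if (snappy_uncompress(input, length, buffer, &out_len) != SNAPPY_OK) {
        printf("BAD BAD BAD\n");
    } else {
        printf("%s\n", buffer);
    }
    free(input);
    free(buffer);
    return 0;
}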
This leads me to suspect that the protocol handling for Snappy might not be quite right (or, just as likely, that I have done something moronic).
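If the payload really is a snappy-java (xerial) stream, one possible workaround, which is my assumption and not kafka-python's current behaviour, is to skip the 16-byte header (8-byte magic `\x82SNAPPY\x00` plus two big-endian int32 version fields), decompress each big-endian-int32-length-prefixed block on its own, and concatenate the results. The sketch below only does the splitting; decompression is passed in as a callable, for example `snappy.decompress` from python-snappy. `iter_xerial_blocks` and `xerial_decode` are hypothetical names.

```python
import struct

XERIAL_MAGIC = b'\x82SNAPPY\x00'
HEADER_SIZE = 16  # magic (8) + version (4) + compat version (4), assumed layout


def iter_xerial_blocks(payload):
    """Yield each raw snappy-compressed block from an assumed xerial stream."""
    if not payload.startswith(XERIAL_MAGIC):
        raise ValueError('payload does not start with the xerial magic header')
    pos = HEADER_SIZE
    while pos < len(payload):
        # Big-endian int32 length prefix, followed by that many compressed bytes.
        (block_len,) = struct.unpack_from('>i', payload, pos)
        pos += 4
        block = payload[pos:pos + block_len]
        if len(block) != block_len:
            raise ValueError('truncated block at offset %d' % (pos - 4))
        yield block
        pos += block_len


def xerial_decode(payload, decompress):
    """Decompress every block with `decompress` and join the outputs."""
    return b''.join(decompress(block) for block in iter_xerial_blocks(payload))
```

With python-snappy installed, `xerial_decode(payload, snappy.decompress)` is what I would try against the bytes saved to /tmp/out; a payload without the magic header would still go straight to `snappy.decompress`.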
I am using:
- kafka-python as of master
- python-snappy (whatever version is currently on PyPI)
- snappy 1.1.1 (not the recommended version, but what I already had on my machine)
- kafka 0.8.0 (scala 2.10)
I will keep poking at it to see if I can figure out why it's not working.
Any thoughts?