
SimpleConsumer errors reading snappy #126

Closed
@GregBowyer


Hi all

This may well be my own mistake, but I am unable to read from topics that contain Snappy-compressed messages.

The Kafka console tools can read these topics, so I suspect (though I have not tested it) that Java clients can as well.

My Python test code is very simple:

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

kafka = KafkaClient('compute-10-2-60-63.us-east-1.urx.internal', 6667)
consumer = SimpleConsumer(kafka, 'etl-pipeline', 'raw-clicks')
it = iter(consumer)
next(it)

This gives me:

greg@localhost ~/projects/kafka-python $ ipython -i -- ./test.py 
Python 2.7.6 (default, Dec  1 2013, 21:04:11) 
Type "copyright", "credits" or "license" for more information.

IPython 1.1.0 -- An enhanced Interactive Python.
?         -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help      -> Python's own help system.
object?   -> Details about 'object', use 'object??' for extra details.
---------------------------------------------------------------------------
UncompressError                           Traceback (most recent call last)
/usr/lib64/python2.7/site-packages/IPython/utils/py3compat.pyc in execfile(fname, *where)
    202             else:
    203                 filename = fname
--> 204             __builtin__.execfile(filename, *where)

/home/greg/projects/kafka-python/test.py in <module>()
      5 consumer = SimpleConsumer(kafka, 'etl-pipeline', 'raw-clicks')
      6 it = iter(consumer)
----> 7 next(it)

/home/greg/projects/kafka-python/kafka/consumer.pyc in __iter__(self)
    392 
    393         while True:
--> 394             message = self.get_message(True, timeout)
    395             if message:
    396                 yield message

/home/greg/projects/kafka-python/kafka/consumer.pyc in get_message(self, block, timeout, get_partition_info)
    351 
    352     def get_message(self, block=True, timeout=0.1, get_partition_info=None):
--> 353         return self._get_message(block, timeout, get_partition_info)
    354 
    355     def _get_message(self, block=True, timeout=0.1, get_partition_info=None,

/home/greg/projects/kafka-python/kafka/consumer.pyc in _get_message(self, block, timeout, get_partition_info, update_offset)
    364             # We're out of messages, go grab some more.
    365             with FetchContext(self, block, timeout):
--> 366                 self._fetch()
    367         try:
    368             partition, message = self.queue.get_nowait()

/home/greg/projects/kafka-python/kafka/consumer.pyc in _fetch(self)
    422                 partition = resp.partition
    423                 try:
--> 424                     for message in resp.messages:
    425                         # Put the message in our queue
    426                         self.queue.put((partition, message))

/home/greg/projects/kafka-python/kafka/protocol.pyc in _decode_message_set_iter(cls, data)
    116                 ((offset, ), cur) = relative_unpack('>q', data, cur)
    117                 (msg, cur) = read_int_string(data, cur)
--> 118                 for (offset, message) in KafkaProtocol._decode_message(msg, offset):
    119                     read_message = True
    120                     yield OffsetAndMessage(offset, message)

/home/greg/projects/kafka-python/kafka/protocol.pyc in _decode_message(cls, data, offset)
    163 
    164         elif codec == KafkaProtocol.CODEC_SNAPPY:
--> 165             snp = snappy_decode(value)
    166             for (offset, msg) in KafkaProtocol._decode_message_set_iter(snp):
    167                 yield (offset, msg)

/home/greg/projects/kafka-python/kafka/codec.pyc in snappy_decode(payload)
     46     if not _has_snappy:
     47         raise NotImplementedError("Snappy codec is not available")
---> 48     return snappy.decompress(payload)

UncompressError: Error while decompressing: invalid input

In [1]: %debug
> /home/greg/projects/kafka-python/kafka/codec.py(48)snappy_decode()
     46     if not _has_snappy:
     47         raise NotImplementedError("Snappy codec is not available")
---> 48     return snappy.decompress(payload)

ipdb> payload
'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x04\xdb\xb9\r\x00\x00\x15\x01....
open('/tmp/out', 'w').write(payload)
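The 20 bytes visible before the truncation can be unpacked by hand. This sketch assumes, without confirmation from this report, that the payload uses the stream framing of the Java snappy-java (xerial) library's `SnappyOutputStream`: an 8-byte magic `\x82SNAPPY\x00`, two big-endian 32-bit version fields, then each compressed block prefixed by its big-endian 32-bit length.

```python
import struct

# First 20 bytes of the payload printed by ipdb (the rest was truncated).
prefix = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x04\xdb'

# Assumed layout: 8-byte magic, int32 version, int32 compatible version,
# then the int32 length of the first compressed block.
magic, version, compat, first_block_len = struct.unpack('>8siii', prefix)

# version == 1, compat == 1
print(first_block_len)  # 1243 (0x04db)
```

If this reading is right, the first raw Snappy block would start at byte offset 20, which would explain why passing the whole payload to `snappy.decompress` fails.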

Interestingly, the payload begins with the literal bytes SNAPPY, as if they were a magic marker, whereas python-snappy appears to compress strings without any such marker:

In [1]: import snappy

In [2]: snappy.compress('aaaaaaaaaaaaaaaa')
Out[2]: '\x10<aaaaaaaaaaaaaaaa'
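The python-snappy output above is a raw Snappy block. Such a block begins with the uncompressed length stored as a little-endian base-128 varint: `\x10` is 16, the length of `'aaaaaaaaaaaaaaaa'`, and `<` (0x3C) is a literal tag covering the next 16 bytes. A minimal sketch of that preamble decoding follows; `read_varint` is a hypothetical helper, not part of python-snappy. Applied to the Kafka payload, it shows that a raw decoder would read the leading `\x82S` as a length field.

```python
def read_varint(data, pos=0):
    """Decode a little-endian base-128 varint starting at data[pos].

    Returns (value, next_pos). Each byte contributes its low 7 bits;
    a set high bit means another byte follows.
    """
    buf = bytearray(data)
    result = 0
    shift = 0
    while True:
        if pos >= len(buf):
            raise ValueError("truncated varint")
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


# python-snappy output from above: length varint, then one literal element.
raw = b'\x10<' + b'a' * 16
length, pos = read_varint(raw)    # length == 16, pos == 1
tag = bytearray(raw)[pos]         # '<' == 0x3C
is_literal = (tag & 0x03) == 0    # low two bits 00 mark a literal element
literal_len = (tag >> 2) + 1      # upper six bits hold length - 1 (for lengths up to 60)

# The Kafka payload's first two bytes (0x82, 'S') would be read as a length.
bogus_length, _ = read_varint(b'\x82SNAPPY\x00')
print(bogus_length)  # 10626
```

After that bogus length, a raw decoder would try to interpret `NAPPY...` as element tags, which is a plausible source of the "invalid input" error.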

I also found that the Google snappy library itself cannot decompress the data, using the following C test program:

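#include <stdio.h>
#include <stdlib.h>
#include <snappy-c.h>

int main(void) {

    /* Output buffer: 1 MiB plus a terminating NUL so the result can be printed. */
    size_t buffer_len = 1024 * 1024;
    char* buffer = calloc(buffer_len + 1, 1);
    if (buffer == NULL) {
        return 1;
    }

    /* Read the raw payload dumped from the debugger. */
    FILE *fp = fopen("/tmp/out", "rb");
    if (fp == NULL) {
        perror("fopen");
        free(buffer);
        return 1;
    }

    fseek(fp, 0, SEEK_END);
    long length = ftell(fp);
    fseek(fp, 0, SEEK_SET);

    char* input = malloc(length);
    if (input == NULL || fread(input, 1, length, fp) != (size_t)length) {
        fclose(fp);
        free(input);
        free(buffer);
        return 1;
    }
    fclose(fp);

    /* Try to decompress the whole payload as a single raw snappy block. */
    if (snappy_uncompress(input, (size_t)length, buffer, &buffer_len) != SNAPPY_OK) {
        printf("BAD BAD BAD\n");
    } else {
        printf("%s\n", buffer);
    }

    free(input);
    free(buffer);
    return 0;
}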

This leads me to suspect that the protocol handling for Snappy may not be quite right (or, just as likely, that I have done something moronic).
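One possible direction, sketched under the unconfirmed assumption that the broker returns data framed by the Java snappy-java (xerial) `SnappyOutputStream` (8-byte magic `\x82SNAPPY\x00`, two big-endian int32 version fields, then blocks each prefixed by a big-endian int32 length): detect the magic, skip the header, decompress each block separately and concatenate the results. Anything without the magic is treated as one raw Snappy block. The names `XERIAL_MAGIC` and `decode_snappy_payload` are hypothetical, not kafka-python's API, and `decompress` is passed in so the framing logic runs without python-snappy installed.

```python
import struct

# Hypothetical constants for the assumed xerial framing.
XERIAL_MAGIC = b'\x82SNAPPY\x00'
HEADER_LEN = len(XERIAL_MAGIC) + 8  # magic + int32 version + int32 compat version


def decode_snappy_payload(payload, decompress):
    """Decompress a raw Snappy block or an (assumed) xerial-framed stream."""
    if not payload.startswith(XERIAL_MAGIC):
        # No magic: treat it as a single raw Snappy block, as python-snappy writes.
        return decompress(payload)
    if len(payload) < HEADER_LEN:
        raise ValueError("truncated header")

    out = []
    pos = HEADER_LEN  # version fields are skipped, not validated
    while pos < len(payload):
        # Each block: big-endian int32 compressed length, then the block bytes.
        if pos + 4 > len(payload):
            raise ValueError("truncated block length")
        (block_len,) = struct.unpack('>i', payload[pos:pos + 4])
        pos += 4
        block = payload[pos:pos + block_len]
        if len(block) != block_len:
            raise ValueError("truncated block")
        out.append(decompress(block))
        pos += block_len
    return b''.join(out)
```

With python-snappy installed, the call would be `decode_snappy_payload(value, snappy.decompress)`. Skipping the version fields instead of checking them keeps the sketch short.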

I am using:

  • kafka-python as of master
  • python-snappy, whatever version is currently on PyPI
  • snappy 1.1.1 (not the recommended version, but what I already had on my machine)
  • Kafka 0.8.0 (Scala 2.10)

I will keep poking at it to see if I can figure out why it's not working.

Any thoughts?
