Skip to content

Commit 7f554f2

Browse files
author
jerluc
committed
Adding codec abstraction for compatibility
The `Codec` class has been added to support Snappy-Xerial compatibility in the Producer. Without this, Java-based consumers will be unable to decode messages produced by non-Xerial Snappy producers.
1 parent ae6b49a commit 7f554f2

File tree

6 files changed: +73 additions, -78 deletions

kafka/__init__.py

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,7 @@
77
from kafka.client import KafkaClient
88
from kafka.conn import KafkaConnection
99
from kafka.protocol import (
10-
create_message, create_gzip_message, create_snappy_message
10+
create_message, create_encoded_message
1111
)
1212
from kafka.producer import SimpleProducer, KeyedProducer
1313
from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner
@@ -16,6 +16,5 @@
1616
__all__ = [
1717
'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer',
1818
'RoundRobinPartitioner', 'HashedPartitioner', 'SimpleConsumer',
19-
'MultiProcessConsumer', 'create_message', 'create_gzip_message',
20-
'create_snappy_message'
19+
'MultiProcessConsumer', 'create_message', 'create_encoded_message'
2120
]

kafka/codec.py

Lines changed: 21 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
from cStringIO import StringIO
22
import gzip
33
import struct
4+
from functools import partial
45

56
_XERIAL_V1_HEADER = (-126, 'S', 'N', 'A', 'P', 'P', 'Y', 0, 1, 1)
67
_XERIAL_V1_FORMAT = 'bccccccBii'
@@ -138,3 +139,23 @@ def snappy_decode(payload):
138139
return out.read()
139140
else:
140141
return snappy.decompress(payload)
142+
143+
144+
class Codec(object):
145+
def __init__(self, mask, encoder=lambda m: m, decoder=lambda m: m):
146+
self.mask = mask
147+
self.encoder = encoder
148+
self.decoder = decoder
149+
150+
def encode(self, payload):
151+
return self.encoder(payload)
152+
153+
def decode(self, payload):
154+
return self.decoder(payload)
155+
156+
CODEC_NONE = Codec(0x00)
157+
CODEC_GZIP = Codec(0x01, gzip_encode, gzip_decode)
158+
CODEC_SNAPPY = Codec(0x02, snappy_encode, snappy_decode)
159+
CODEC_SNAPPY_XERIAL = Codec(0x02, partial(snappy_encode, xerial_compatible=True), snappy_decode)
160+
161+
ALL_CODECS = (CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, CODEC_SNAPPY_XERIAL)

kafka/protocol.py

Lines changed: 20 additions & 61 deletions
Original file line number | Diff line number | Diff line change
@@ -1,9 +1,10 @@
11
import logging
22
import struct
33
import zlib
4+
from functools import partial
45

56
from kafka.codec import (
6-
gzip_encode, gzip_decode, snappy_encode, snappy_decode
7+
ALL_CODECS, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, CODEC_SNAPPY_XERIAL
78
)
89
from kafka.common import (
910
BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage,
@@ -20,11 +21,6 @@
2021
log = logging.getLogger("kafka")
2122

2223
ATTRIBUTE_CODEC_MASK = 0x03
23-
CODEC_NONE = 0x00
24-
CODEC_GZIP = 0x01
25-
CODEC_SNAPPY = 0x02
26-
ALL_CODECS = (CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY)
27-
2824

2925
class KafkaProtocol(object):
3026
"""
@@ -154,18 +150,17 @@ def _decode_message(cls, data, offset):
154150

155151
codec = att & ATTRIBUTE_CODEC_MASK
156152

157-
if codec == CODEC_NONE:
153+
if codec == CODEC_NONE.mask:
158154
yield (offset, Message(magic, att, key, value))
159-
160-
elif codec == CODEC_GZIP:
161-
gz = gzip_decode(value)
162-
for (offset, msg) in KafkaProtocol._decode_message_set_iter(gz):
163-
yield (offset, msg)
164-
165-
elif codec == CODEC_SNAPPY:
166-
snp = snappy_decode(value)
167-
for (offset, msg) in KafkaProtocol._decode_message_set_iter(snp):
168-
yield (offset, msg)
155+
else:
156+
decoders = filter(lambda c: c.mask == codec, ALL_CODECS)
157+
if decoders:
158+
message_set = decoders[0].decode(value)
159+
for (offset, msg) in KafkaProtocol._decode_message_set_iter(message_set):
160+
yield (offset, msg)
161+
else:
162+
raise UnsupportedCodecError('FILL ME IN')
163+
169164

170165
##################
171166
# Public API #
@@ -528,47 +523,13 @@ def create_message(payload, key=None):
528523
"""
529524
return Message(0, 0, key, payload)
530525

531-
532-
def create_gzip_message(payloads, key=None):
533-
"""
534-
Construct a Gzipped Message containing multiple Messages
535-
536-
The given payloads will be encoded, compressed, and sent as a single atomic
537-
message to Kafka.
538-
539-
Params
540-
======
541-
payloads: list(bytes), a list of payload to send be sent to Kafka
542-
key: bytes, a key used for partition routing (optional)
543-
"""
544-
message_set = KafkaProtocol._encode_message_set(
545-
[create_message(payload) for payload in payloads])
546-
547-
gzipped = gzip_encode(message_set)
548-
codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
549-
550-
return Message(0, 0x00 | codec, key, gzipped)
551-
552-
553-
def create_snappy_message(payloads, key=None):
554-
"""
555-
Construct a Snappy Message containing multiple Messages
556-
557-
The given payloads will be encoded, compressed, and sent as a single atomic
558-
message to Kafka.
559-
560-
Params
561-
======
562-
payloads: list(bytes), a list of payload to send be sent to Kafka
563-
key: bytes, a key used for partition routing (optional)
564-
"""
526+
def create_encoded_message(messages, codec=CODEC_NONE):
565527
message_set = KafkaProtocol._encode_message_set(
566-
[create_message(payload) for payload in payloads])
567-
568-
snapped = snappy_encode(message_set)
569-
codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
570-
571-
return Message(0, 0x00 | codec, key, snapped)
528+
[create_message(m) for m in messages])
529+
encoded = codec.encode(message_set)
530+
codec_flag = ATTRIBUTE_CODEC_MASK & codec.mask
531+
532+
return Message(0, 0x00 | codec_flag, None, encoded)
572533

573534

574535
def create_message_set(messages, codec=CODEC_NONE):
@@ -579,9 +540,7 @@ def create_message_set(messages, codec=CODEC_NONE):
579540
"""
580541
if codec == CODEC_NONE:
581542
return [create_message(m) for m in messages]
582-
elif codec == CODEC_GZIP:
583-
return [create_gzip_message(messages)]
584-
elif codec == CODEC_SNAPPY:
585-
return [create_snappy_message(messages)]
543+
elif codec in ALL_CODECS:
544+
return [create_encoded_message(messages, codec)]
586545
else:
587546
raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)

test/test_codec.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,7 @@
66
snappy_encode, snappy_decode
77
)
88
from kafka.protocol import (
9-
create_gzip_message, create_message, create_snappy_message, KafkaProtocol
9+
create_message, create_encoded_message, KafkaProtocol
1010
)
1111
from testutil import *
1212

test/test_consumer_integration.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@
22
from datetime import datetime
33

44
from kafka import * # noqa
5+
from kafka.protocol import CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY
56
from kafka.common import * # noqa
67
from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
78
from fixtures import ZookeeperFixture, KafkaFixture

test/test_protocol.py

Lines changed: 28 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -23,8 +23,7 @@
2323
import kafka.protocol
2424
from kafka.protocol import (
2525
ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
26-
create_message, create_gzip_message, create_snappy_message,
27-
create_message_set
26+
create_message, create_encoded_message, create_message_set
2827
)
2928

3029
class TestProtocol(unittest2.TestCase):
@@ -39,9 +38,9 @@ def test_create_message(self):
3938

4039
def test_create_gzip(self):
4140
payloads = ["v1", "v2"]
42-
msg = create_gzip_message(payloads)
41+
msg = create_encoded_message(payloads, CODEC_GZIP)
4342
self.assertEqual(msg.magic, 0)
44-
self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
43+
self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP.mask)
4544
self.assertEqual(msg.key, None)
4645
# Need to decode to check since gzipped payload is non-deterministic
4746
decoded = gzip_decode(msg.value)
@@ -68,9 +67,9 @@ def test_create_gzip(self):
6867
@unittest2.skipUnless(has_snappy(), "Snappy not available")
6968
def test_create_snappy(self):
7069
payloads = ["v1", "v2"]
71-
msg = create_snappy_message(payloads)
70+
msg = create_encoded_message(payloads, CODEC_SNAPPY)
7271
self.assertEqual(msg.magic, 0)
73-
self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
72+
self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY.mask)
7473
self.assertEqual(msg.key, None)
7574
decoded = snappy_decode(msg.value)
7675
expect = "".join([
@@ -703,11 +702,27 @@ def test_decode_offset_fetch_response(self):
703702
def mock_create_message_fns(self):
704703
patches = contextlib.nested(
705704
mock.patch.object(kafka.protocol, "create_message",
706-
return_value=sentinel.message),
707-
mock.patch.object(kafka.protocol, "create_gzip_message",
708-
return_value=sentinel.gzip_message),
709-
mock.patch.object(kafka.protocol, "create_snappy_message",
710-
return_value=sentinel.snappy_message),
705+
return_value=sentinel.message)
706+
)
707+
708+
with patches:
709+
yield
710+
711+
@contextmanager
712+
def mock_create_gzip_message_fns(self):
713+
patches = contextlib.nested(
714+
mock.patch('kafka.protocol.create_encoded_message',
715+
return_value=sentinel.gzip_message)
716+
)
717+
718+
with patches:
719+
yield
720+
721+
@contextmanager
722+
def mock_create_snappy_message_fns(self):
723+
patches = contextlib.nested(
724+
mock.patch('kafka.protocol.create_encoded_message',
725+
return_value=sentinel.snappy_message)
711726
)
712727

713728
with patches:
@@ -730,13 +745,13 @@ def test_create_message_set(self):
730745

731746
# CODEC_GZIP: Expect list of one gzip-encoded message.
732747
expect = [sentinel.gzip_message]
733-
with self.mock_create_message_fns():
748+
with self.mock_create_gzip_message_fns():
734749
message_set = create_message_set(messages, CODEC_GZIP)
735750
self.assertEqual(message_set, expect)
736751

737752
# CODEC_SNAPPY: Expect list of one snappy-encoded message.
738753
expect = [sentinel.snappy_message]
739-
with self.mock_create_message_fns():
754+
with self.mock_create_snappy_message_fns():
740755
message_set = create_message_set(messages, CODEC_SNAPPY)
741756
self.assertEqual(message_set, expect)
742757

0 commit comments

Comments (0)