Skip to content

Commit c99d411

Browse files
committed
Moved codec stuff into it's own module
Snappy will go there when I get around to it
1 parent 36b67b7 commit c99d411

File tree

5 files changed

+29
-24
lines changed

5 files changed

+29
-24
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
*.pyc
2+
build

kafka/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__all__ = ["kafka"]
1+
__all__ = ["kafka","codec"]

kafka/codec.py

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from cStringIO import StringIO
2+
import gzip
3+
import logging
4+
5+
log = logging.getLogger("kafka.codec")
6+
7+
def gzip_encode(payload):
8+
buf = StringIO()
9+
f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6)
10+
f.write(payload)
11+
f.close()
12+
buf.seek(0)
13+
out = buf.read()
14+
buf.close()
15+
return out
16+
17+
def gzip_decode(payload):
18+
buf = StringIO(payload)
19+
f = gzip.GzipFile(fileobj=buf, mode='r')
20+
out = f.read()
21+
f.close()
22+
buf.close()
23+
return out

kafka/kafka.py

+2-21
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@
77
import struct
88
import zlib
99

10-
log = logging.getLogger("org.apache.kafka")
10+
from .codec import gzip_encode, gzip_decode
1111

12+
log = logging.getLogger("kafka")
1213

1314
error_codes = {
1415
-1: "UnknownError",
@@ -25,31 +26,11 @@ def __init__(self, errorType):
2526
def __str__(self):
2627
return str(errorType)
2728

28-
2929
Message = namedtuple("Message", ["magic", "attributes", "crc", "payload"])
3030
FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "size"])
3131
ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"])
3232
OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "maxOffsets"])
3333

34-
def gzip_encode(payload):
35-
buf = StringIO()
36-
f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6)
37-
f.write(payload)
38-
f.close()
39-
buf.seek(0)
40-
out = buf.read()
41-
buf.close()
42-
return out
43-
44-
def gzip_decode(payload):
45-
buf = StringIO(payload)
46-
f = gzip.GzipFile(fileobj=buf, mode='r')
47-
out = f.read()
48-
f.close()
49-
buf.close()
50-
return out
51-
52-
5334
def length_prefix_message(msg):
5435
"""
5536
Prefix a message with it's length as an int

test/unit.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
import struct
44
import unittest
55

6-
from kafka.kafka import KafkaClient, ProduceRequest, FetchRequest
7-
from kafka.kafka import gzip_encode, gzip_decode, length_prefix_message
6+
from kafka.kafka import KafkaClient, ProduceRequest, FetchRequest, length_prefix_message
7+
from kafka.codec import gzip_encode, gzip_decode
88

99
ITERATIONS = 1000
1010
STRLEN = 100

0 commit comments

Comments
 (0)