Skip to content

Commit a99384f

Browse files
committed
Merge pull request dpkp#221 from dpkp/minor_cleanups
Minor cleanups
2 parents 9856cc3 + 1b282d2 commit a99384f

15 files changed

+129
-113
lines changed

README.md

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,10 @@ Python versions
3737
## High level
3838

3939
```python
40-
from kafka.client import KafkaClient
41-
from kafka.consumer import SimpleConsumer
42-
from kafka.producer import SimpleProducer, KeyedProducer
43-
44-
kafka = KafkaClient("localhost:9092")
40+
from kafka import KafkaClient, SimpleProducer, SimpleConsumer
4541

4642
# To send messages synchronously
43+
kafka = KafkaClient("localhost:9092")
4744
producer = SimpleProducer(kafka)
4845

4946
# Note that the application is responsible for encoding messages to type str
@@ -97,9 +94,7 @@ kafka.close()
9794

9895
## Keyed messages
9996
```python
100-
from kafka.client import KafkaClient
101-
from kafka.producer import KeyedProducer
102-
from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
97+
from kafka import KafkaClient, KeyedProducer, HashedPartitioner, RoundRobinPartitioner
10398

10499
kafka = KafkaClient("localhost:9092")
105100

@@ -113,8 +108,7 @@ producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
113108

114109
## Multiprocess consumer
115110
```python
116-
from kafka.client import KafkaClient
117-
from kafka.consumer import MultiProcessConsumer
111+
from kafka import KafkaClient, MultiProcessConsumer
118112

119113
kafka = KafkaClient("localhost:9092")
120114

@@ -135,10 +129,13 @@ for message in consumer.get_messages(count=5, block=True, timeout=4):
135129
## Low level
136130

137131
```python
138-
from kafka.client import KafkaClient
132+
from kafka import KafkaClient
133+
from kafka.protocol import KafkaProtocol, ProduceRequest
134+
139135
kafka = KafkaClient("localhost:9092")
136+
140137
req = ProduceRequest(topic="my-topic", partition=1,
141-
messages=[KafkaProdocol.encode_message("some message")])
138+
messages=[KafkaProtocol.encode_message("some message")])
142139
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
143140
kafka.close()
144141

@@ -152,8 +149,17 @@ resps[0].offset # offset of the first message sent in this request
152149

153150
Install with your favorite package manager
154151

152+
## Latest Release
155153
Pip:
156154

155+
```shell
156+
pip install kafka-python
157+
```
158+
159+
Releases are also listed at https://github.com/mumrah/kafka-python/releases
160+
161+
162+
## Bleeding-Edge
157163
```shell
158164
git clone https://github.com/mumrah/kafka-python
159165
pip install ./kafka-python
@@ -211,8 +217,21 @@ pip install python-snappy
211217
tox
212218
```
213219

214-
## Run a single unit test
220+
## Run a subset of unit tests
221+
```shell
222+
# run protocol tests only
223+
tox -- -v test.test_protocol
224+
```
225+
215226
```shell
227+
# test with pypy only
228+
tox -e pypy
229+
```
230+
231+
```shell
232+
# Run only 1 test, and use python 2.7
233+
tox -e py27 -- -v --with-id --collect-only
234+
# pick a test number from the list like #102
216235
tox -e py27 -- -v --with-id 102
217236
```
218237

@@ -233,11 +252,11 @@ and optionally set SCALA_VERSION (defaults to 2.8.0, but 2.10.1 is recommended)
233252
SCALA_VERSION=2.10.1 KAFKA_VERSION=trunk ./build_integration.sh
234253
```
235254

236-
Then run the tests against supported Kafka versions:
255+
Then run the tests against supported Kafka versions, simply set the `KAFKA_VERSION`
256+
env variable to the server build you want to use for testing:
237257
```shell
238258
KAFKA_VERSION=0.8.0 tox
239259
KAFKA_VERSION=0.8.1 tox
240260
KAFKA_VERSION=0.8.1.1 tox
241261
KAFKA_VERSION=trunk tox
242262
```
243-

kafka/conn.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import copy
22
import logging
3+
from random import shuffle
34
import socket
45
import struct
5-
from random import shuffle
66
from threading import local
77

88
from kafka.common import ConnectionError

kafka/consumer.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@
88
from multiprocessing import Process, Queue as MPQueue, Event, Value
99
from Queue import Empty, Queue
1010

11-
import kafka
11+
import kafka.common
1212
from kafka.common import (
13-
FetchRequest,
14-
OffsetRequest, OffsetCommitRequest,
15-
OffsetFetchRequest,
16-
ConsumerFetchSizeTooSmall, ConsumerNoMoreData
13+
FetchRequest, OffsetRequest,
14+
OffsetCommitRequest, OffsetFetchRequest,
15+
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
16+
UnknownTopicOrPartitionError
1717
)
1818

1919
from kafka.util import ReentrantTimer
@@ -114,7 +114,7 @@ def get_or_init_offset_callback(resp):
114114
try:
115115
kafka.common.check_error(resp)
116116
return resp.offset
117-
except kafka.common.UnknownTopicOrPartitionError:
117+
except UnknownTopicOrPartitionError:
118118
return 0
119119

120120
for partition in partitions:

kafka/producer.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -156,11 +156,11 @@ def send_messages(self, topic, partition, *msg):
156156
Helper method to send produce requests
157157
@param: topic, name of topic for produce request -- type str
158158
@param: partition, partition number for produce request -- type int
159-
@param: *msg, one or more message payloads -- type str
159+
@param: *msg, one or more message payloads -- type bytes
160160
@returns: ResponseRequest returned by server
161161
raises on error
162162
163-
Note that msg type *must* be encoded to str by user.
163+
Note that msg type *must* be encoded to bytes by user.
164164
Passing unicode message will not work, for example
165165
you should encode before calling send_messages via
166166
something like `unicode_message.encode('utf-8')`
@@ -172,9 +172,9 @@ def send_messages(self, topic, partition, *msg):
172172
if not isinstance(msg, (list, tuple)):
173173
raise TypeError("msg is not a list or tuple!")
174174

175-
# Raise TypeError if any message is not encoded as a str
176-
if any(not isinstance(m, str) for m in msg):
177-
raise TypeError("all produce message payloads must be type str")
175+
# Raise TypeError if any message is not encoded as bytes
176+
if any(not isinstance(m, bytes) for m in msg):
177+
raise TypeError("all produce message payloads must be type bytes")
178178

179179
if self.async:
180180
for m in msg:

setup.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ def run(cls):
4848
"Intended Audience :: Developers",
4949
"License :: OSI Approved :: Apache Software License",
5050
"Programming Language :: Python",
51+
"Programming Language :: Python :: 2",
52+
"Programming Language :: Python :: 2.6",
53+
"Programming Language :: Python :: 2.7",
54+
"Programming Language :: Python :: Implementation :: PyPy",
5155
"Topic :: Software Development :: Libraries :: Python Modules"
5256
]
5357
)

test/test_client_integration.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,16 @@
22
import socket
33
import unittest2
44

5-
import kafka
6-
from kafka.common import *
5+
from kafka.conn import KafkaConnection
6+
from kafka.common import (
7+
FetchRequest, OffsetCommitRequest, OffsetFetchRequest,
8+
KafkaTimeoutError
9+
)
10+
711
from test.fixtures import ZookeeperFixture, KafkaFixture
8-
from test.testutil import *
12+
from test.testutil import (
13+
KafkaIntegrationTestCase, get_open_port, kafka_versions, Timer
14+
)
915

1016
class TestKafkaClientIntegration(KafkaIntegrationTestCase):
1117
@classmethod

test/test_codec.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@
55
has_snappy, gzip_encode, gzip_decode,
66
snappy_encode, snappy_decode
77
)
8-
from kafka.protocol import (
9-
create_gzip_message, create_message, create_snappy_message, KafkaProtocol
10-
)
11-
from testutil import *
8+
from testutil import random_string
129

1310
class TestCodec(unittest2.TestCase):
1411
def test_gzip(self):

test/test_conn.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
import mock
55
import unittest2
66

7-
from kafka.common import *
8-
from kafka.conn import *
7+
from kafka.common import ConnectionError
8+
from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SECONDS
99

1010
class ConnTest(unittest2.TestCase):
1111
def setUp(self):

test/test_consumer.py

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,10 @@
1-
import os
2-
import random
3-
import struct
41
import unittest2
52

6-
from mock import MagicMock, patch
3+
from mock import MagicMock
74

8-
from kafka import KafkaClient
95
from kafka.consumer import SimpleConsumer
10-
from kafka.common import (
11-
ProduceRequest, BrokerMetadata, PartitionMetadata,
12-
TopicAndPartition, KafkaUnavailableError,
13-
LeaderUnavailableError, PartitionUnavailableError
14-
)
15-
from kafka.protocol import (
16-
create_message, KafkaProtocol
17-
)
186

197
class TestKafkaConsumer(unittest2.TestCase):
208
def test_non_integer_partitions(self):
219
with self.assertRaises(AssertionError):
22-
consumer = SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ])
10+
SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ])

test/test_consumer_integration.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import os
2-
from datetime import datetime
32

4-
from kafka import * # noqa
5-
from kafka.common import * # noqa
3+
from kafka import SimpleConsumer, MultiProcessConsumer, create_message
4+
from kafka.common import ProduceRequest, ConsumerFetchSizeTooSmall
65
from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
7-
from fixtures import ZookeeperFixture, KafkaFixture
8-
from testutil import *
6+
7+
from test.fixtures import ZookeeperFixture, KafkaFixture
8+
from test.testutil import (
9+
KafkaIntegrationTestCase, kafka_versions, random_string, Timer
10+
)
911

1012
class TestConsumerIntegration(KafkaIntegrationTestCase):
1113
@classmethod
@@ -215,8 +217,8 @@ def test_huge_messages(self):
215217

216218
@kafka_versions("0.8.1", "0.8.1.1")
217219
def test_offset_behavior__resuming_behavior(self):
218-
msgs1 = self.send_messages(0, range(0, 100))
219-
msgs2 = self.send_messages(1, range(100, 200))
220+
self.send_messages(0, range(0, 100))
221+
self.send_messages(1, range(100, 200))
220222

221223
# Start a consumer
222224
consumer1 = self.consumer(

0 commit comments

Comments
 (0)