[Build Status](https://travis-ci.org/mumrah/kafka-python)

[Full documentation available on ReadTheDocs](http://kafka-python.readthedocs.org/en/latest/)

This module provides low-level protocol support for Apache Kafka as well as
high-level consumer and producer classes. Request batching is supported by the
protocol, as well as broker-aware request routing. Gzip and Snappy compression
is also supported.

# Python versions

- 2.7 (tested on 2.7.8)
- pypy (tested on pypy 2.3.1 / python 2.7.6)
- Python 3.3 and 3.4 support has been added to trunk and will be available in the next release

# Usage

## High level

```python
from kafka import KafkaClient, SimpleProducer, SimpleConsumer

# To send messages synchronously
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)

# Note that the application is responsible for encoding messages to type str
producer.send_messages("my-topic", "some message")
producer.send_messages("my-topic", "this method", "is variadic")

# Send a unicode message
producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))

# To send messages asynchronously
# WARNING: the current implementation does not guarantee message delivery on failure!
# Messages can get dropped! Use at your own risk! Or help us improve it with a PR!
producer = SimpleProducer(kafka, async=True)
producer.send_messages("my-topic", "async message")

# To wait for acknowledgements:
# ACK_AFTER_LOCAL_WRITE : the server will wait until the data is written to
#                         its local log before sending a response
# ACK_AFTER_CLUSTER_COMMIT : the server will block until the message is committed
#                            by all in-sync replicas before sending a response
producer = SimpleProducer(kafka, async=False,
                          req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
                          ack_timeout=2000)

response = producer.send_messages("my-topic", "another message")

if response:
    print(response[0].error)
    print(response[0].offset)

# To send messages in batches. You can use any of the available
# producers for this. The following producer collects messages into
# a batch and sends them to Kafka once 20 messages have been
# collected or 60 seconds have passed, whichever comes first.
# Notes:
# * If the producer dies before the messages are sent, they will be lost
# * Call producer.stop() to flush pending messages and clean up
producer = SimpleProducer(kafka, batch_send=True,
                          batch_send_every_n=20,
                          batch_send_every_t=60)

# To consume messages
consumer = SimpleConsumer(kafka, "my-group", "my-topic")
for message in consumer:
    # message is a raw byte string -- decode if necessary!
    # e.g., for unicode: `message.decode('utf-8')`
    print(message)

kafka.close()
```
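
If you need to control where consumption starts, or when offsets are committed,
`SimpleConsumer` also exposes `seek` and `commit`. A minimal sketch, assuming
the `seek(offset, whence)` and `commit()` methods of this release, where
`whence` follows `file.seek` semantics (0 = absolute, 1 = relative to the
current offset, 2 = relative to the tail of the log):

```python
from kafka import KafkaClient, SimpleConsumer

kafka = KafkaClient("localhost:9092")
consumer = SimpleConsumer(kafka, "my-group", "my-topic")

# Rewind to the beginning of every partition (whence=0: absolute offset)
consumer.seek(0, 0)

# Or skip to the tail so only newly produced messages are returned (whence=2)
consumer.seek(0, 2)

# Fetch a bounded batch instead of iterating forever
for message in consumer.get_messages(count=10, block=True, timeout=5):
    print(message)

# Persist the consumed offsets for this consumer group
consumer.commit()

kafka.close()
```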

## Keyed messages
```python
from kafka import KafkaClient, KeyedProducer, HashedPartitioner, RoundRobinPartitioner

kafka = KafkaClient("localhost:9092")

# HashedPartitioner is the default
producer = KeyedProducer(kafka)
producer.send("my-topic", "key1", "some message")
producer.send("my-topic", "key2", "this method")

producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
```
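
If neither built-in partitioner fits, you can supply your own. The sketch below
is an assumption-laden illustration only: it presumes the interface used by
`HashedPartitioner` and `RoundRobinPartitioner` in `kafka.partitioner` (a class
constructed with the partition list, whose `partition(key, partitions)` method
returns the target partition), and `FirstBytePartitioner` is a hypothetical
name. Check the base class in your installed version before relying on this.

```python
from kafka import KafkaClient, KeyedProducer
from kafka.partitioner import Partitioner

class FirstBytePartitioner(Partitioner):
    # Hypothetical example: route each message by the first byte of its key
    def partition(self, key, partitions):
        return partitions[ord(key[0]) % len(partitions)]

kafka = KafkaClient("localhost:9092")
producer = KeyedProducer(kafka, partitioner=FirstBytePartitioner)
producer.send("my-topic", "key1", "routed by first byte")
```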

## Multiprocess consumer
```python
from kafka import KafkaClient, MultiProcessConsumer

kafka = KafkaClient("localhost:9092")

# This will split the number of partitions among two processes
consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)

# This will spawn processes such that each handles 2 partitions max
consumer = MultiProcessConsumer(kafka, "my-group", "my-topic",
                                partitions_per_proc=2)

for message in consumer:
    print(message)

for message in consumer.get_messages(count=5, block=True, timeout=4):
    print(message)
```

## Low level

```python
from kafka import KafkaClient, create_message
from kafka.protocol import KafkaProtocol
from kafka.common import ProduceRequest

kafka = KafkaClient("localhost:9092")

req = ProduceRequest(topic="my-topic", partition=1,
                     messages=[create_message("some message")])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
kafka.close()

resps[0].topic      # "my-topic"
resps[0].partition  # 1
resps[0].error      # 0 (hopefully)
resps[0].offset     # offset of the first message sent in this request
```
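
Fetching works the same way at this level. A sketch in the same style as the
produce example, assuming `FetchRequest(topic, partition, offset, max_bytes)`
from `kafka.common` and the `send_fetch_request` method on `KafkaClient`; the
response's `messages` field yields `(offset, message)` pairs in this release,
but verify the namedtuple fields against your installed version:

```python
from kafka import KafkaClient
from kafka.common import FetchRequest

kafka = KafkaClient("localhost:9092")

# Ask partition 1 of "my-topic" for up to 4096 bytes starting at offset 0
req = FetchRequest(topic="my-topic", partition=1, offset=0, max_bytes=4096)
resps = kafka.send_fetch_request(payloads=[req], fail_on_error=True)

for resp in resps:
    # Each response carries the (offset, message) pairs for its partition
    for offset_and_message in resp.messages:
        print(offset_and_message.offset, offset_and_message.message.value)

kafka.close()
```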

# Install

Install with your favorite package manager

## Latest Release
Pip:

```shell
pip install kafka-python
```

Releases are also listed at https://github.com/mumrah/kafka-python/releases

## Bleeding-Edge

Pip:
```shell
git clone https://github.com/mumrah/kafka-python
pip install ./kafka-python
```

Setuptools:
```shell
git clone https://github.com/mumrah/kafka-python
easy_install ./kafka-python
```

Using `setup.py` directly:
```shell
git clone https://github.com/mumrah/kafka-python
cd kafka-python
python setup.py install
```

## Optional Snappy install

### Install Development Libraries
Download and build Snappy from http://code.google.com/p/snappy/downloads/list

Ubuntu:
```shell
apt-get install libsnappy-dev
```

OSX:
```shell
brew install snappy
```

From Source:
```shell
wget http://snappy.googlecode.com/files/snappy-1.0.5.tar.gz
tar xzvf snappy-1.0.5.tar.gz
cd snappy-1.0.5
./configure
make
sudo make install
```

### Install Python Module
Install the `python-snappy` module:
```shell
pip install python-snappy
```
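
To verify that both the native library and the bindings installed correctly,
a quick round-trip check using the `snappy.compress` and `snappy.decompress`
functions exposed by `python-snappy`:

```python
import snappy

# Round-trip a payload through the native library; this raises if the
# C extension failed to build against libsnappy
payload = b"hello kafka"
assert snappy.decompress(snappy.compress(payload)) == payload
print("snappy OK")
```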

# Tests

## Run the unit tests

```shell
tox
```

## Run a subset of unit tests
```shell
# run protocol tests only
tox -- -v test.test_protocol
```

```shell
# test with pypy only
tox -e pypy
```

```shell
# Run only 1 test, and use python 2.7
tox -e py27 -- -v --with-id --collect-only
# pick a test number from the list like #102
tox -e py27 -- -v --with-id 102
```

## Run the integration tests

The integration tests will actually start up a real local Zookeeper
instance and real Kafka brokers, and send messages through the client.

First, get the kafka binaries for integration testing:
```shell
./build_integration.sh
```
By default, the build_integration.sh script will download binary
distributions for all supported kafka versions.
To test against the latest source build, set `KAFKA_VERSION=trunk`
and optionally set `SCALA_VERSION` (defaults to 2.8.0, but 2.10.1 is recommended):
```shell
SCALA_VERSION=2.10.1 KAFKA_VERSION=trunk ./build_integration.sh
```

Then, to run the tests against a supported Kafka version, simply set the
`KAFKA_VERSION` env variable to the server build you want to use for testing:
```shell
KAFKA_VERSION=0.8.0 tox
KAFKA_VERSION=0.8.1 tox
KAFKA_VERSION=0.8.1.1 tox
KAFKA_VERSION=trunk tox
```