This module provides low-level protocol support for Apache Kafka. It implements the five basic request types (and their responses): Produce, Fetch, MultiFetch, MultiProduce, and Offsets.
Compatible with Apache Kafka 0.7.x. Tested against 0.7.0, 0.7.1, and 0.7.2.
http://incubator.apache.org/kafka/
Copyright 2012, David Arthur under Apache License, v2.0. See LICENSE
This project is very much alpha. The API is in flux and not all the features are fully implemented.
Install with your favorite package manager
Pip:
git clone https://github.com/mumrah/kafka-python
pip install ./kafka-python
Setuptools:
git clone https://github.com/mumrah/kafka-python
easy_install ./kafka-python
Using setup.py directly:
git clone https://github.com/mumrah/kafka-python
cd kafka-python
python setup.py install
Download and build Snappy from http://code.google.com/p/snappy/downloads/list
wget http://snappy.googlecode.com/files/snappy-1.0.5.tar.gz
tar xzvf snappy-1.0.5.tar.gz
cd snappy-1.0.5
./configure
make
sudo make install
Install the python-snappy module
pip install python-snappy
Some of the tests will fail if Snappy is not installed. These tests will throw NotImplementedError. If you see other failures, they might be bugs - so please report them!
python -m test.unit
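To tell up front whether the Snappy-dependent tests can pass, you can check that the python-snappy bindings are importable. This is a minimal sketch, not part of this project; the helper name `snappy_available` is made up for illustration.

```python
def snappy_available():
    """Return True if the python-snappy bindings (module name `snappy`) import cleanly."""
    try:
        import snappy  # noqa: F401 -- imported only to test availability
    except ImportError:
        return False
    return True


if snappy_available():
    print("snappy found: the Snappy tests should be able to run")
else:
    print("snappy missing: expect NotImplementedError from the Snappy tests")
```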
First, check out the Kafka source
git submodule init
git submodule update
cd kafka-src
./sbt update
./sbt package
Then from the root directory, run the integration tests
python -m test.integration
from kafka.client import KafkaClient
kafka = KafkaClient("localhost", 9092)
kafka.send_messages_simple("my-topic", "some message")
kafka.close()
Same as before, just add more arguments to send_messages_simple
kafka = KafkaClient("localhost", 9092)
kafka.send_messages_simple("my-topic", "some message", "another message", "and another")
kafka.close()
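If the messages are already in a list, they can be unpacked into the call with `*`, and `contextlib.closing` ensures `close()` runs even when sending raises. The sketch below uses a hypothetical `StubClient` in place of `KafkaClient` so that it runs without a broker; it assumes `send_messages_simple` takes a topic followed by any number of payloads, as the examples above suggest. `send_all` is an illustrative helper, not part of this module.

```python
from contextlib import closing


class StubClient(object):
    """Hypothetical stand-in for KafkaClient; it only records what it is given."""

    def __init__(self):
        self.sent = []
        self.closed = False

    def send_messages_simple(self, topic, *payloads):
        # Assumed signature: a topic followed by any number of payloads.
        self.sent.extend((topic, payload) for payload in payloads)

    def close(self):
        self.closed = True


def send_all(client, topic, payloads):
    """Send every payload in one call; the client is closed even if sending fails."""
    with closing(client) as kafka:
        kafka.send_messages_simple(topic, *payloads)


client = StubClient()
send_all(client, "my-topic", ["some message", "another message", "and another"])
```

With a real broker, the same helper would be called with `KafkaClient("localhost", 9092)` instead of the stub.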
Supply get_message_set with a FetchRequest, get back the messages and a new FetchRequest
kafka = KafkaClient("localhost", 9092)
req = FetchRequest("my-topic", 0, 0, 1024*1024)
(messages, req1) = kafka.get_message_set(req)
kafka.close()
The returned FetchRequest includes the offset of the next message. This makes paging through the queue very simple.
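The paging pattern can be sketched as a loop that feeds each returned request back into the next call. Everything below is a stand-in so the example runs without a broker: the `FetchRequest` field names are guessed from the positional arguments used above, `StubClient` is a hypothetical in-memory client that treats offsets as byte positions, and `fetch_all` is an illustrative helper rather than part of this module.

```python
from collections import namedtuple

# Assumed field names, inferred from FetchRequest("my-topic", 0, 0, 1024*1024).
FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "size"])


class StubClient(object):
    """Hypothetical in-memory client: each payload occupies len(payload) bytes."""

    def __init__(self, payloads):
        self.payloads = payloads

    def get_message_set(self, req):
        messages, position, next_offset = [], 0, req.offset
        for payload in self.payloads:
            start, end = position, position + len(payload)
            position = end
            if start < req.offset:
                continue  # already consumed by an earlier fetch
            if end - req.offset > req.size:
                break  # does not fit in this fetch
            messages.append(payload)
            next_offset = end
        # Hand back a request that starts where this fetch stopped.
        return messages, req._replace(offset=next_offset)


def fetch_all(client, req):
    """Keep fetching with the returned request until a fetch comes back empty."""
    collected = []
    while True:
        messages, req = client.get_message_set(req)
        if not messages:
            return collected, req
        collected.extend(messages)


stub = StubClient(["aaaa", "bbbb", "cc"])
all_messages, last_req = fetch_all(stub, FetchRequest("my-topic", 0, 0, 8))
```

Here the first fetch returns the first two payloads (8 bytes), the second returns the remaining one, and the third comes back empty, which ends the loop.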
For this we use the send_multi_message_set method along with ProduceRequest objects.
kafka = KafkaClient("localhost", 9092)
req1 = ProduceRequest("my-topic-1", 0, [
    create_message_from_string("message one"),
    create_message_from_string("message two")
])
req2 = ProduceRequest("my-topic-2", 0, [
    create_message_from_string("nachricht ein"),
    create_message_from_string("nachricht zwei")
])
# Send both requests in a single call
kafka.send_multi_message_set([req1, req2])
kafka.close()
The iter_messages method will make the underlying calls to get_message_set to provide a generator that returns every message available.
kafka = KafkaClient("localhost", 9092)
for msg in kafka.iter_messages(FetchRequest("my-topic", 0, 0, 1024*1024)):
    print(msg.payload)
kafka.close()
An optional auto argument will control auto-paging through results
kafka = KafkaClient("localhost", 9092)
for msg in kafka.iter_messages(FetchRequest("my-topic", 0, 0, 1024*1024), False):
    print(msg.payload)
kafka.close()
With auto set to False, this will only iterate through messages in the byte range of (0, 1024*1024)
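The effect of the auto flag can be illustrated with a small generator. This is a sketch of the idea only, not the module's actual implementation: the free function `iter_messages` here takes any fetch callable that returns `(messages, next_request)`, and `fake_fetch` is a made-up stand-in that pages through a list two items at a time.

```python
def iter_messages(get_message_set, request, auto=True):
    """Yield messages one by one; with auto=True, keep fetching follow-up pages."""
    while True:
        messages, request = get_message_set(request)
        if not messages:
            return  # nothing left to read
        for message in messages:
            yield message
        if not auto:
            return  # stop after the first page


LOG = ["m0", "m1", "m2", "m3", "m4"]


def fake_fetch(offset, page=2):
    """Hypothetical fetch: the 'request' is just an index into LOG."""
    chunk = LOG[offset:offset + page]
    return chunk, offset + len(chunk)


everything = list(iter_messages(fake_fetch, 0))
first_page = list(iter_messages(fake_fetch, 0, auto=False))
```

With auto-paging on, the generator walks the whole log; with it off, only the messages covered by the initial request are produced.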
kafka = KafkaClient("localhost", 9092)
messages = [kafka.create_snappy_message("testing 1"),
            kafka.create_snappy_message("testing 2")]
req = ProduceRequest("my-topic", 1, messages)
kafka.send_message_set(req)
kafka.close()