Skip to content

Commit bb1c11e

Browse files
committed
Merge pull request dpkp#341 from dpkp/kafka_consumer_docs
KafkaConsumer documentation
2 parents fd204dc + 35b8f5b commit bb1c11e

File tree

4 files changed

+185
-149
lines changed

4 files changed

+185
-149
lines changed

docs/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
sphinx
22
sphinxcontrib-napoleon
3+
sphinx_rtd_theme
34

45
# Install kafka-python in editable mode
56
# This allows the sphinx autodoc module

docs/usage.rst

Lines changed: 89 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
Usage
22
=====
33

4-
High level
5-
----------
4+
SimpleProducer
5+
--------------
66

77
.. code:: python
88
9-
from kafka import SimpleProducer, KafkaClient, KafkaConsumer
9+
from kafka import SimpleProducer, KafkaClient
1010
1111
# To send messages synchronously
1212
kafka = KafkaClient("localhost:9092")
@@ -51,17 +51,6 @@ High level
5151
batch_send_every_n=20,
5252
batch_send_every_t=60)
5353
54-
# To consume messages
55-
consumer = KafkaConsumer("my-topic", group_id="my_group",
56-
metadata_broker_list=["localhost:9092"])
57-
for message in consumer:
58-
# message is raw byte string -- decode if necessary!
59-
# e.g., for unicode: `message.decode('utf-8')`
60-
print(message)
61-
62-
kafka.close()
63-
64-
6554
Keyed messages
6655
--------------
6756

@@ -80,6 +69,92 @@ Keyed messages
8069
producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
8170
8271
72+
73+
KafkaConsumer
74+
-------------
75+
76+
.. code:: python
77+
78+
from kafka import KafkaConsumer
79+
80+
# To consume messages
81+
consumer = KafkaConsumer("my-topic",
82+
group_id="my_group",
83+
bootstrap_servers=["localhost:9092"])
84+
for message in consumer:
85+
# message value is raw byte string -- decode if necessary!
86+
# e.g., for unicode: `message.value.decode('utf-8')`
87+
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
88+
message.offset, message.key,
89+
message.value))
90+
91+
kafka.close()
92+
93+
94+
messages (m) are namedtuples with attributes:
95+
96+
* `m.topic`: topic name (str)
97+
* `m.partition`: partition number (int)
98+
* `m.offset`: message offset on topic-partition log (int)
99+
* `m.key`: key (bytes - can be None)
100+
* `m.value`: message (output of deserializer_class - default is raw bytes)
101+
102+
103+
.. code:: python
104+
105+
from kafka import KafkaConsumer
106+
107+
# more advanced consumer -- multiple topics w/ auto commit offset
108+
# management
109+
consumer = KafkaConsumer('topic1', 'topic2',
110+
bootstrap_servers=['localhost:9092'],
111+
group_id='my_consumer_group',
112+
auto_commit_enable=True,
113+
auto_commit_interval_ms=30 * 1000,
114+
auto_offset_reset='smallest')
115+
116+
# Infinite iteration
117+
for m in consumer:
118+
do_some_work(m)
119+
120+
# Mark this message as fully consumed
121+
# so it can be included in the next commit
122+
#
123+
# **messages that are not marked w/ task_done currently do not commit!
124+
kafka.task_done(m)
125+
126+
# If auto_commit_enable is False, remember to commit() periodically
127+
kafka.commit()
128+
129+
# Batch process interface
130+
while True:
131+
for m in kafka.fetch_messages():
132+
process_message(m)
133+
kafka.task_done(m)
134+
135+
136+
Configuration settings can be passed to constructor,
137+
otherwise defaults will be used:
138+
139+
.. code:: python
140+
141+
client_id='kafka.consumer.kafka',
142+
group_id=None,
143+
fetch_message_max_bytes=1024*1024,
144+
fetch_min_bytes=1,
145+
fetch_wait_max_ms=100,
146+
refresh_leader_backoff_ms=200,
147+
bootstrap_servers=[],
148+
socket_timeout_ms=30*1000,
149+
auto_offset_reset='largest',
150+
deserializer_class=lambda msg: msg,
151+
auto_commit_enable=False,
152+
auto_commit_interval_ms=60 * 1000,
153+
consumer_timeout_ms=-1
154+
155+
Configuration parameters are described in more detail at
156+
http://kafka.apache.org/documentation.html#highlevelconsumerapi
157+
83158
Multiprocess consumer
84159
---------------------
85160

0 commit comments

Comments
 (0)