Usage
=====

- High level
- ----------
+ SimpleProducer
+ --------------

.. code:: python

-     from kafka import SimpleProducer, KafkaClient, KafkaConsumer
+     from kafka import SimpleProducer, KafkaClient

    # To send messages synchronously
    kafka = KafkaClient("localhost:9092")
@@ -51,17 +51,6 @@ High level
                              batch_send_every_n=20,
                              batch_send_every_t=60)

-     # To consume messages
-     consumer = KafkaConsumer("my-topic", group_id="my_group",
-                              metadata_broker_list=["localhost:9092"])
-     for message in consumer:
-         # message is raw byte string -- decode if necessary!
-         # e.g., for unicode: `message.decode('utf-8')`
-         print(message)
-
-     kafka.close()
-
-

Keyed messages
--------------

@@ -80,6 +69,92 @@ Keyed messages
    producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)


+
+ KafkaConsumer
+ -------------
+
+ .. code:: python
+
+     from kafka import KafkaConsumer
+
+     # To consume messages
+     consumer = KafkaConsumer("my-topic",
+                              group_id="my_group",
+                              bootstrap_servers=["localhost:9092"])
+     for message in consumer:
+         # message value is raw byte string -- decode if necessary!
+         # e.g., for unicode: `message.value.decode('utf-8')`
+         print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
+                                              message.offset, message.key,
+                                              message.value))
+
+
+ Messages (m) are namedtuples with the following attributes:
+
+ * `m.topic`: topic name (str)
+ * `m.partition`: partition number (int)
+ * `m.offset`: message offset on topic-partition log (int)
+ * `m.key`: key (bytes - can be None)
+ * `m.value`: message (output of deserializer_class - default is raw bytes)
+
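+ For instance, a minimal sketch of a handler built on these attributes
+ (format_message is a hypothetical helper, not part of the library):
+
+ .. code:: python
+
+     def format_message(m):
+         # m.key can be None; m.value is raw bytes with the default
+         # deserializer_class, so decode before use
+         key = m.key.decode('utf-8') if m.key is not None else None
+         value = m.value.decode('utf-8')
+         return "%s:%d:%d key=%s value=%s" % (m.topic, m.partition,
+                                              m.offset, key, value)
+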
+ .. code:: python
+
+     from kafka import KafkaConsumer
+
+     # more advanced consumer -- multiple topics w/ auto commit offset
+     # management
+     consumer = KafkaConsumer('topic1', 'topic2',
+                              bootstrap_servers=['localhost:9092'],
+                              group_id='my_consumer_group',
+                              auto_commit_enable=True,
+                              auto_commit_interval_ms=30 * 1000,
+                              auto_offset_reset='smallest')
+
+     # Infinite iteration
+     for m in consumer:
+         do_some_work(m)
+
+         # Mark this message as fully consumed
+         # so it can be included in the next commit
+         #
+         # **messages that are not marked w/ task_done currently do not commit!
+         consumer.task_done(m)
+
+     # If auto_commit_enable is False, remember to commit() periodically
+     consumer.commit()
+
+     # Batch process interface
+     while True:
+         for m in consumer.fetch_messages():
+             process_message(m)
+             consumer.task_done(m)
+
+
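+ A minimal sketch tying the batch interface to manual commits (assuming
+ the consumer above was created with auto_commit_enable=False;
+ process_message is a placeholder for your own handler):
+
+ .. code:: python
+
+     while True:
+         for m in consumer.fetch_messages():
+             process_message(m)
+             consumer.task_done(m)
+         # Push all offsets marked via task_done in one commit request
+         consumer.commit()
+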
+ Configuration settings can be passed to the constructor; otherwise
+ defaults will be used:
+
+ .. code:: python
+
+     client_id='kafka.consumer.kafka',
+     group_id=None,
+     fetch_message_max_bytes=1024 * 1024,
+     fetch_min_bytes=1,
+     fetch_wait_max_ms=100,
+     refresh_leader_backoff_ms=200,
+     bootstrap_servers=[],
+     socket_timeout_ms=30 * 1000,
+     auto_offset_reset='largest',
+     deserializer_class=lambda msg: msg,
+     auto_commit_enable=False,
+     auto_commit_interval_ms=60 * 1000,
+     consumer_timeout_ms=-1
+
+ Configuration parameters are described in more detail at
+ http://kafka.apache.org/documentation.html#highlevelconsumerapi
+
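+ As an illustration, a minimal sketch overriding a few of these defaults
+ (the JSON deserializer is an example, not a library default):
+
+ .. code:: python
+
+     import json
+
+     from kafka import KafkaConsumer
+
+     # deserializer_class is applied to each raw message value (bytes)
+     consumer = KafkaConsumer('my-topic',
+                              bootstrap_servers=['localhost:9092'],
+                              auto_offset_reset='smallest',
+                              deserializer_class=lambda msg: json.loads(msg.decode('utf-8')))
+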
Multiprocess consumer
---------------------