@@ -4,17 +4,17 @@ This module provides low-level protocol support for Apache Kafka. It implements the
(and their responses): Produce, Fetch, MultiFetch, MultiProduce, and Offsets. Gzip and Snappy compression
is also supported.
- Compatible with Apache Kafka 0.7x. Tested against 0.7.0, 0.7.1, and 0.7.2
+ Compatible with Apache Kafka 0.8.x. Tested against 0.8
http://incubator.apache.org/kafka/
# License
- Copyright 2012, David Arthur under Apache License, v2.0. See `LICENSE`
+ Copyright 2013, David Arthur under Apache License, v2.0. See `LICENSE`
# Status
- Current version is 0.1-alpha. The current API should be pretty stable.
+ Current version is 0.2-alpha. This version is under development; APIs are subject to change.
# Install
@@ -89,109 +89,34 @@ python -m test.integration
# Usage
- ## Send a message to a topic
+ ## High level
``` python
from kafka.client import KafkaClient
- kafka = KafkaClient("localhost", 9092)
- kafka.send_messages_simple("my-topic", "some message")
- kafka.close()
- ```
-
- ## Send several messages to a topic
-
- Same as before, just add more arguments to `send_messages_simple`
-
- ``` python
- kafka = KafkaClient("localhost", 9092)
- kafka.send_messages_simple("my-topic", "some message", "another message", "and another")
- kafka.close()
- ```
-
- ## Receive some messages from a topic
-
- Supply `get_message_set` with a `FetchRequest`, get back the messages and a new `FetchRequest`
-
- ``` python
- kafka = KafkaClient("localhost", 9092)
- req = FetchRequest("my-topic", 0, 0, 1024*1024)
- (messages, req1) = kafka.get_message_set(req)
- kafka.close()
- ```
-
- The returned `FetchRequest` includes the offset of the next message. This makes
- paging through the queue very simple.
-
- ## Send multiple messages to multiple topics
-
- For this we use the `send_multi_message_set` method along with `ProduceRequest` objects.
-
- ``` python
- kafka = KafkaClient("localhost", 9092)
- req1 = ProduceRequest("my-topic-1", 0, [
-     create_message_from_string("message one"),
-     create_message_from_string("message two")
- ])
- req2 = ProduceRequest("my-topic-2", 0, [
-     create_message_from_string("nachricht ein"),
-     create_message_from_string("nachricht zwei")
- ])
- kafka.send_multi_message_set([req1, req2])
- kafka.close()
- ```
-
- ## Iterate through all messages from an offset
-
- The `iter_messages` method will make the underlying calls to `get_message_set`
- to provide a generator that returns every message available.
-
- ``` python
- kafka = KafkaClient("localhost", 9092)
- for msg in kafka.iter_messages("my-topic", 0, 0, 1024*1024):
-     print(msg.payload)
- kafka.close()
- ```
-
- An optional `auto` argument will control auto-paging through results
-
- ``` python
- kafka = KafkaClient("localhost", 9092)
- for msg in kafka.iter_messages("my-topic", 0, 0, 1024*1024, False):
-     print(msg.payload)
- kafka.close()
- ```
- This will only iterate through messages in the byte range of (0, 1024\*1024)
+ producer = SimpleProducer(kafka, "my-topic")
+ producer.send_messages("some message")
+ producer.send_messages("this method", "is variadic")
- ## Create some compressed messages
+ consumer = SimpleConsumer(kafka, "my-group", "my-topic")
+ for message in consumer:
+     print(message)
- ``` python
- kafka = KafkaClient("localhost", 9092)
- messages = [kafka.create_snappy_message("testing 1"),
-             kafka.create_snappy_message("testing 2")]
- req = ProduceRequest("my-topic", 1, messages)
- kafka.send_message_set(req)
kafka.close()
```
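The removed `iter_messages` docs above rely on offset-based paging: each fetch returns the offset at which to resume, so a generator can walk the whole topic. That pattern can be sketched in plain Python with a stubbed fetch (every name below is hypothetical, standing in for real Kafka calls, not kafka-python API):

```python
# Offset-paging sketch: fetch() is a stub that returns a batch of
# messages plus the offset to resume from, like a FetchRequest/response
# pair. All names here are hypothetical.

MESSAGES = ["msg-%d" % i for i in range(10)]

def fetch(offset, max_count=4):
    """Return (batch, next_offset) starting at the given offset."""
    batch = MESSAGES[offset:offset + max_count]
    return batch, offset + len(batch)

def iter_all(offset=0):
    """Re-issue fetches from each next_offset until a batch comes back empty."""
    while True:
        batch, offset = fetch(offset)
        if not batch:
            return
        for msg in batch:
            yield msg

print(list(iter_all()) == MESSAGES)  # True
```

The generator never holds more than one batch in memory, which is why the old docs describe paging through the queue as "very simple".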
- ## Use Kafka like a FIFO queue
-
- Simple API: `get`, `put`, `close`.
+ ## Low level
``` python
+ from kafka.client import KafkaClient
kafka = KafkaClient("localhost", 9092)
- q = KafkaQueue(kafka, "my-topic", [0, 1])
- q.put("first")
- q.put("second")
- q.get()  # first
- q.get()  # second
- q.close()
+ req = ProduceRequest(topic="my-topic", partition=1,
+     messages=[KafkaProtocol.encode_message("some message")])
+ resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
kafka.close()
- ```
-
- Since the producer and consumers are backed by actual `multiprocessing.Queue`, you can
- do blocking or non-blocking puts and gets.
-
- ``` python
- q.put("first", block=False)
- q.get(block=True, timeout=10)
+ resps[0].topic      # "my-topic"
+ resps[0].partition  # 1
+ resps[0].error      # 0 (hopefully)
+ resps[0].offset     # offset of the first message sent in this request
```
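For context on what `encode_message` produces in the low-level example: a Kafka 0.8 message is a CRC-framed record with magic and attributes bytes followed by length-prefixed key and value fields. A minimal sketch of that framing, assuming the 0.8 wire format (an illustrative stand-in, not the kafka-python implementation):

```python
import struct
import zlib

def encode_message(value, key=None):
    # Illustrative stand-in for low-level message framing; assumes the
    # Kafka 0.8 wire format: CRC32 | magic(0) | attributes(0) | key | value,
    # where key and value are 4-byte length-prefixed byte strings and
    # a length of -1 encodes an absent key.
    def length_prefixed(b):
        if b is None:
            return struct.pack(">i", -1)
        return struct.pack(">i", len(b)) + b

    body = struct.pack(">bb", 0, 0) + length_prefixed(key) + length_prefixed(value)
    crc = zlib.crc32(body) & 0xffffffff  # CRC covers everything after itself
    return struct.pack(">I", crc) + body

msg = encode_message(b"some message")
# 4 (crc) + 1 (magic) + 1 (attributes) + 4 (key len) + 4 (value len) + 12 (payload)
print(len(msg))  # 26
```

The `& 0xffffffff` mask keeps the CRC unsigned so it packs cleanly as a big-endian 4-byte field regardless of Python's `crc32` signedness.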