@@ -2,8 +2,8 @@ Simple APIs (DEPRECATED)
2
2
************************
3
3
4
4
5
- SimpleConsumer
6
- ==============
5
+ SimpleConsumer (DEPRECATED)
6
+ ===========================
7
7
8
8
.. code :: python
9
9
@@ -37,8 +37,8 @@ SimpleConsumer
37
37
client.close()
38
38
39
39
40
- SimpleProducer
41
- ==============
40
+ SimpleProducer (DEPRECATED)
41
+ ===========================
42
42
43
43
Asynchronous Mode
44
44
-----------------
@@ -98,8 +98,8 @@ Synchronous Mode
98
98
logging.info(r.offset)
99
99
100
100
101
- KeyedProducer
102
- =============
101
+ KeyedProducer (DEPRECATED)
102
+ ==========================
103
103
104
104
.. code :: python
105
105
@@ -121,24 +121,43 @@ KeyedProducer
121
121
producer = KeyedProducer(kafka, partitioner = RoundRobinPartitioner)
122
122
123
123
124
- SimpleClient
125
- ============
124
+ SimpleClient (DEPRECATED)
125
+ =========================
126
126
127
127
128
128
.. code :: python
129
129
130
- from kafka import SimpleClient, create_message
131
- from kafka.protocol import KafkaProtocol
132
- from kafka.common import ProduceRequest
130
+ import time
131
+ from kafka import SimpleClient
132
+ from kafka.common import (
133
+ LeaderNotAvailableError, NotLeaderForPartitionError,
134
+ ProduceRequestPayload)
135
+ from kafka.protocol import create_message
133
136
134
- kafka = SimpleClient(" localhost:9092" )
137
+ kafka = SimpleClient(' localhost:9092' )
138
+ payload = ProduceRequestPayload(topic = ' my-topic' , partition = 0 ,
139
+ messages = [create_message(" some message" )])
140
+
141
+ retries = 5
142
+ resps = []
143
+ while retries and not resps:
144
+ retries -= 1
145
+ try :
146
+ resps = kafka.send_produce_request(
147
+ payloads = [payload], fail_on_error = True )
148
+ except LeaderNotAvailableError, NotLeaderForPartitionError:
149
+ kafka.load_metadata_for_topics()
150
+ time.sleep(1 )
151
+
152
+ # Other exceptions you might consider handling:
153
+ # UnknownTopicOrPartitionError, TopicAuthorizationFailedError,
154
+ # RequestTimedOutError, MessageSizeTooLargeError, InvalidTopicError,
155
+ # RecordListTooLargeError, InvalidRequiredAcksError,
156
+ # NotEnoughReplicasError, NotEnoughReplicasAfterAppendError
135
157
136
- req = ProduceRequest(topic = " my-topic" , partition = 1 ,
137
- messages = [create_message(" some message" )])
138
- resps = kafka.send_produce_request(payloads = [req], fail_on_error = True )
139
158
kafka.close()
140
159
141
- resps[0 ].topic # " my-topic"
142
- resps[0 ].partition # 1
143
- resps[0 ].error # 0 (hopefully)
160
+ resps[0 ].topic # ' my-topic'
161
+ resps[0 ].partition # 0
162
+ resps[0 ].error # 0
144
163
resps[0 ].offset # offset of the first message sent in this request
0 commit comments