@@ -174,11 +174,16 @@ def _handle_produce_response(self, batches, response):
174
174
for batch in batches ])
175
175
176
176
for topic , partitions in response .topics :
177
- for partition , error_code , offset in partitions :
177
+ for partition_info in partitions :
178
+ if response .API_VERSION < 2 :
179
+ partition , error_code , offset = partition_info
180
+ ts = None
181
+ else :
182
+ partition , error_code , offset , ts = partition_info
178
183
tp = TopicPartition (topic , partition )
179
184
error = Errors .for_code (error_code )
180
185
batch = batches_by_partition [tp ]
181
- self ._complete_batch (batch , error , offset )
186
+ self ._complete_batch (batch , error , offset , ts )
182
187
183
188
else :
184
189
# this is the acks = 0 case, just complete all requests
@@ -258,7 +263,12 @@ def _produce_request(self, node_id, acks, timeout, batches):
258
263
buf = batch .records .buffer ()
259
264
produce_records_by_partition [topic ][partition ] = buf
260
265
261
- version = 1 if self .config ['api_version' ] >= (0 , 9 ) else 0
266
+ if self .config ['api_version' ] >= (0 , 10 ):
267
+ version = 2
268
+ elif self .config ['api_version' ] == (0 , 9 ):
269
+ version = 1
270
+ else :
271
+ version = 0
262
272
return ProduceRequest [version ](
263
273
required_acks = acks ,
264
274
timeout = timeout ,
0 commit comments