@@ -101,7 +101,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
101
101
count = 0
102
102
log .debug ('Skipping new batch collection to handle retries' )
103
103
else :
104
- log .debug ('Batching size: {0} , timeout: {1}' . format ( count , timeout ) )
104
+ log .debug ('Batching size: %s , timeout: %s' , count , timeout )
105
105
106
106
# Keep fetching till we gather enough messages or a
107
107
# timeout is reached
@@ -147,12 +147,14 @@ def _handle_error(error_cls, request):
147
147
retry_state ['do_refresh' ] |= True
148
148
149
149
requests = list (request_tries .keys ())
150
- reply = client .send_produce_request (requests ,
151
- acks = req_acks ,
152
- timeout = ack_timeout ,
153
- fail_on_error = False )
154
-
155
- for i , response in enumerate (reply ):
150
+ log .debug ('Sending: %s' , requests )
151
+ responses = client .send_produce_request (requests ,
152
+ acks = req_acks ,
153
+ timeout = ack_timeout ,
154
+ fail_on_error = False )
155
+
156
+ log .debug ('Received: %s' , responses )
157
+ for i , response in enumerate (responses ):
156
158
error_cls = None
157
159
if isinstance (response , FailedPayloadsError ):
158
160
error_cls = response .__class__
@@ -164,7 +166,8 @@ def _handle_error(error_cls, request):
164
166
165
167
if error_cls :
166
168
_handle_error (error_cls , orig_req )
167
- log .error ('Error sending ProduceRequest to %s:%d with msgs %s' ,
169
+ log .error ('Error sending ProduceRequest (#%d of %d) to %s:%d '
170
+ 'with msgs %s' , i + 1 , len (requests ),
168
171
orig_req .topic , orig_req .partition ,
169
172
orig_req .messages if log_messages_on_error
170
173
else hash (orig_req .messages ))
0 commit comments