Skip to content

Commit 8b0a598

Browse files
author
Dana Powers
committed
Add send/receive debug logging to async producer
1 parent 7b2f98f commit 8b0a598

File tree

1 file changed

+11
-8
lines changed

1 file changed

+11
-8
lines changed

kafka/producer/base.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
101101
count = 0
102102
log.debug('Skipping new batch collection to handle retries')
103103
else:
104-
log.debug('Batching size: {0}, timeout: {1}'.format(count, timeout))
104+
log.debug('Batching size: %s, timeout: %s', count, timeout)
105105

106106
# Keep fetching till we gather enough messages or a
107107
# timeout is reached
@@ -147,12 +147,14 @@ def _handle_error(error_cls, request):
147147
retry_state['do_refresh'] |= True
148148

149149
requests = list(request_tries.keys())
150-
reply = client.send_produce_request(requests,
151-
acks=req_acks,
152-
timeout=ack_timeout,
153-
fail_on_error=False)
154-
155-
for i, response in enumerate(reply):
150+
log.debug('Sending: %s', requests)
151+
responses = client.send_produce_request(requests,
152+
acks=req_acks,
153+
timeout=ack_timeout,
154+
fail_on_error=False)
155+
156+
log.debug('Received: %s', responses)
157+
for i, response in enumerate(responses):
156158
error_cls = None
157159
if isinstance(response, FailedPayloadsError):
158160
error_cls = response.__class__
@@ -164,7 +166,8 @@ def _handle_error(error_cls, request):
164166

165167
if error_cls:
166168
_handle_error(error_cls, orig_req)
167-
log.error('Error sending ProduceRequest to %s:%d with msgs %s',
169+
log.error('Error sending ProduceRequest (#%d of %d) to %s:%d '
170+
'with msgs %s', i + 1, len(requests),
168171
orig_req.topic, orig_req.partition,
169172
orig_req.messages if log_messages_on_error
170173
else hash(orig_req.messages))

0 commit comments

Comments
 (0)