Skip to content

Commit 452e7c2

Browse files
committed
Merge pull request dpkp#623 from dpkp/kafka-3318
KAFKA-3318: clean up consumer logging and error messages
2 parents 5a14bd8 + c478d0c commit 452e7c2

File tree

12 files changed

+121
-99
lines changed

12 files changed

+121
-99
lines changed

kafka/cluster.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import collections
44
import copy
55
import logging
6-
import random
76
import threading
87
import time
98

kafka/consumer/fetcher.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -511,13 +511,13 @@ def _handle_offset_response(self, partition, future, response):
511511
future.success(offset)
512512
elif error_type in (Errors.NotLeaderForPartitionError,
513513
Errors.UnknownTopicOrPartitionError):
514-
log.warning("Attempt to fetch offsets for partition %s failed due"
515-
" to obsolete leadership information, retrying.",
516-
partition)
514+
log.debug("Attempt to fetch offsets for partition %s failed due"
515+
" to obsolete leadership information, retrying.",
516+
partition)
517517
future.failure(error_type(partition))
518518
else:
519-
log.error("Attempt to fetch offsets for partition %s failed due to:"
520-
" %s", partition, error_type)
519+
log.warning("Attempt to fetch offsets for partition %s failed due to:"
520+
" %s", partition, error_type)
521521
future.failure(error_type(partition))
522522

523523
def _create_fetch_requests(self):

kafka/consumer/simple.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
)
2727
from ..common import (
2828
FetchRequestPayload, KafkaError, OffsetRequestPayload,
29-
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
29+
ConsumerFetchSizeTooSmall,
3030
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
3131
OffsetOutOfRangeError, FailedPayloadsError, check_error
3232
)

kafka/coordinator/base.py

Lines changed: 42 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ def ensure_coordinator_known(self):
200200
self._client.poll()
201201
continue
202202

203-
future = self._send_group_metadata_request()
203+
future = self._send_group_coordinator_request()
204204
self._client.poll(future=future)
205205

206206
if future.failed():
@@ -233,7 +233,7 @@ def ensure_active_group(self):
233233
while self.need_rejoin():
234234
self.ensure_coordinator_known()
235235

236-
future = self._perform_group_join()
236+
future = self._send_join_group_request()
237237
self._client.poll(future=future)
238238

239239
if future.succeeded():
@@ -253,7 +253,7 @@ def ensure_active_group(self):
253253
raise exception # pylint: disable-msg=raising-bad-type
254254
time.sleep(self.config['retry_backoff_ms'] / 1000.0)
255255

256-
def _perform_group_join(self):
256+
def _send_join_group_request(self):
257257
"""Join the group and return the assignment for the next generation.
258258
259259
This function handles both JoinGroup and SyncGroup, delegating to
@@ -268,7 +268,7 @@ def _perform_group_join(self):
268268
return Future().failure(e)
269269

270270
# send a join group request to the coordinator
271-
log.debug("(Re-)joining group %s", self.group_id)
271+
log.info("(Re-)joining group %s", self.group_id)
272272
request = JoinGroupRequest(
273273
self.group_id,
274274
self.config['session_timeout_ms'],
@@ -279,7 +279,7 @@ def _perform_group_join(self):
279279
for protocol, metadata in self.group_protocols()])
280280

281281
# create the request for the coordinator
282-
log.debug("Issuing request (%s) to coordinator %s", request, self.coordinator_id)
282+
log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id)
283283
future = Future()
284284
_f = self._client.send(self.coordinator_id, request)
285285
_f.add_callback(self._handle_join_group_response, future)
@@ -300,6 +300,8 @@ def _failed_request(self, node_id, request, future, error):
300300
def _handle_join_group_response(self, future, response):
301301
error_type = Errors.for_code(response.error_code)
302302
if error_type is Errors.NoError:
303+
log.debug("Received successful JoinGroup response for group %s: %s",
304+
self.group_id, response)
303305
self.member_id = response.member_id
304306
self.generation = response.generation_id
305307
self.rejoin_needed = False
@@ -315,30 +317,31 @@ def _handle_join_group_response(self, future, response):
315317
self._on_join_follower().chain(future)
316318

317319
elif error_type is Errors.GroupLoadInProgressError:
318-
log.debug("Attempt to join group %s rejected since coordinator is"
319-
" loading the group.", self.group_id)
320+
log.debug("Attempt to join group %s rejected since coordinator %s"
321+
" is loading the group.", self.group_id, self.coordinator_id)
320322
# backoff and retry
321323
future.failure(error_type(response))
322324
elif error_type is Errors.UnknownMemberIdError:
323325
# reset the member id and retry immediately
324326
error = error_type(self.member_id)
325327
self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
326-
log.info("Attempt to join group %s failed due to unknown member id,"
327-
" resetting and retrying.", self.group_id)
328+
log.debug("Attempt to join group %s failed due to unknown member id",
329+
self.group_id)
328330
future.failure(error)
329331
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
330332
Errors.NotCoordinatorForGroupError):
331333
# re-discover the coordinator and retry with backoff
332334
self.coordinator_dead()
333-
log.info("Attempt to join group %s failed due to obsolete "
334-
"coordinator information, retrying.", self.group_id)
335+
log.debug("Attempt to join group %s failed due to obsolete "
336+
"coordinator information: %s", self.group_id,
337+
error_type.__name__)
335338
future.failure(error_type())
336339
elif error_type in (Errors.InconsistentGroupProtocolError,
337340
Errors.InvalidSessionTimeoutError,
338341
Errors.InvalidGroupIdError):
339342
# log the error and re-throw the exception
340343
error = error_type(response)
341-
log.error("Attempt to join group %s failed due to: %s",
344+
log.error("Attempt to join group %s failed due to fatal error: %s",
342345
self.group_id, error)
343346
future.failure(error)
344347
elif error_type is Errors.GroupAuthorizationFailedError:
@@ -356,8 +359,8 @@ def _on_join_follower(self):
356359
self.generation,
357360
self.member_id,
358361
{})
359-
log.debug("Issuing follower SyncGroup (%s) to coordinator %s",
360-
request, self.coordinator_id)
362+
log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s",
363+
self.group_id, self.coordinator_id, request)
361364
return self._send_sync_group_request(request)
362365

363366
def _on_join_leader(self, response):
@@ -386,8 +389,8 @@ def _on_join_leader(self, response):
386389
assignment if isinstance(assignment, bytes) else assignment.encode())
387390
for member_id, assignment in six.iteritems(group_assignment)])
388391

389-
log.debug("Issuing leader SyncGroup (%s) to coordinator %s",
390-
request, self.coordinator_id)
392+
log.debug("Sending leader SyncGroup for group %s to coordinator %s: %s",
393+
self.group_id, self.coordinator_id, request)
391394
return self._send_sync_group_request(request)
392395

393396
def _send_sync_group_request(self, request):
@@ -404,8 +407,8 @@ def _send_sync_group_request(self, request):
404407
def _handle_sync_group_response(self, future, response):
405408
error_type = Errors.for_code(response.error_code)
406409
if error_type is Errors.NoError:
407-
log.debug("Received successful sync group response for group %s: %s",
408-
self.group_id, response)
410+
log.info("Successfully joined group %s with generation %s",
411+
self.group_id, self.generation)
409412
#self.sensors.syncLatency.record(response.requestLatencyMs())
410413
future.success(response.member_assignment)
411414
return
@@ -415,29 +418,27 @@ def _handle_sync_group_response(self, future, response):
415418
if error_type is Errors.GroupAuthorizationFailedError:
416419
future.failure(error_type(self.group_id))
417420
elif error_type is Errors.RebalanceInProgressError:
418-
log.info("SyncGroup for group %s failed due to coordinator"
419-
" rebalance, rejoining the group", self.group_id)
421+
log.debug("SyncGroup for group %s failed due to coordinator"
422+
" rebalance", self.group_id)
420423
future.failure(error_type(self.group_id))
421424
elif error_type in (Errors.UnknownMemberIdError,
422425
Errors.IllegalGenerationError):
423426
error = error_type()
424-
log.info("SyncGroup for group %s failed due to %s,"
425-
" rejoining the group", self.group_id, error)
427+
log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
426428
self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
427429
future.failure(error)
428430
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
429431
Errors.NotCoordinatorForGroupError):
430432
error = error_type()
431-
log.info("SyncGroup for group %s failed due to %s, will find new"
432-
" coordinator and rejoin", self.group_id, error)
433+
log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
433434
self.coordinator_dead()
434435
future.failure(error)
435436
else:
436437
error = error_type()
437438
log.error("Unexpected error from SyncGroup: %s", error)
438439
future.failure(error)
439440

440-
def _send_group_metadata_request(self):
441+
def _send_group_coordinator_request(self):
441442
"""Discover the current coordinator for the group.
442443
443444
Returns:
@@ -447,7 +448,8 @@ def _send_group_metadata_request(self):
447448
if node_id is None:
448449
return Future().failure(Errors.NoBrokersAvailable())
449450

450-
log.debug("Issuing group metadata request to broker %s", node_id)
451+
log.debug("Sending group coordinator request for group %s to broker %s",
452+
self.group_id, node_id)
451453
request = GroupCoordinatorRequest(self.group_id)
452454
future = Future()
453455
_f = self._client.send(node_id, request)
@@ -456,7 +458,7 @@ def _send_group_metadata_request(self):
456458
return future
457459

458460
def _handle_group_coordinator_response(self, future, response):
459-
log.debug("Group metadata response %s", response)
461+
log.debug("Received group coordinator response %s", response)
460462
if not self.coordinator_unknown():
461463
# We already found the coordinator, so ignore the request
462464
log.debug("Coordinator already known -- ignoring metadata response")
@@ -473,6 +475,8 @@ def _handle_group_coordinator_response(self, future, response):
473475
return
474476

475477
self.coordinator_id = response.coordinator_id
478+
log.info("Discovered coordinator %s for group %s",
479+
self.coordinator_id, self.group_id)
476480
self._client.ready(self.coordinator_id)
477481

478482
# start sending heartbeats only if we have a valid generation
@@ -495,8 +499,8 @@ def _handle_group_coordinator_response(self, future, response):
495499
def coordinator_dead(self, error=None):
496500
"""Mark the current coordinator as dead."""
497501
if self.coordinator_id is not None:
498-
log.warning("Marking the coordinator dead (node %s): %s.",
499-
self.coordinator_id, error)
502+
log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
503+
self.coordinator_id, self.group_id, error)
500504
self.coordinator_id = None
501505

502506
def close(self):
@@ -542,22 +546,24 @@ def _handle_heartbeat_response(self, future, response):
542546
#self.sensors.heartbeat_latency.record(response.requestLatencyMs())
543547
error_type = Errors.for_code(response.error_code)
544548
if error_type is Errors.NoError:
545-
log.info("Heartbeat successful")
549+
log.debug("Received successful heartbeat response for group %s",
550+
self.group_id)
546551
future.success(None)
547552
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
548553
Errors.NotCoordinatorForGroupError):
549-
log.warning("Heartbeat failed: coordinator is either not started or"
550-
" not valid; will refresh metadata and retry")
554+
log.warning("Heartbeat failed for group %s: coordinator (node %s)"
555+
" is either not started or not valid", self.group_id,
556+
self.coordinator_id)
551557
self.coordinator_dead()
552558
future.failure(error_type())
553559
elif error_type is Errors.RebalanceInProgressError:
554-
log.warning("Heartbeat: group is rebalancing; this consumer needs to"
555-
" re-join")
560+
log.warning("Heartbeat failed for group %s because it is"
561+
" rebalancing", self.group_id)
556562
self.rejoin_needed = True
557563
future.failure(error_type())
558564
elif error_type is Errors.IllegalGenerationError:
559-
log.warning("Heartbeat: generation id is not current; this consumer"
560-
" needs to re-join")
565+
log.warning("Heartbeat failed for group %s: generation id is not "
566+
" current.", self.group_id)
561567
self.rejoin_needed = True
562568
future.failure(error_type())
563569
elif error_type is Errors.UnknownMemberIdError:

0 commit comments

Comments
 (0)