@@ -200,7 +200,7 @@ def ensure_coordinator_known(self):
200
200
self ._client .poll ()
201
201
continue
202
202
203
- future = self ._send_group_metadata_request ()
203
+ future = self ._send_group_coordinator_request ()
204
204
self ._client .poll (future = future )
205
205
206
206
if future .failed ():
@@ -233,7 +233,7 @@ def ensure_active_group(self):
233
233
while self .need_rejoin ():
234
234
self .ensure_coordinator_known ()
235
235
236
- future = self ._perform_group_join ()
236
+ future = self ._send_join_group_request ()
237
237
self ._client .poll (future = future )
238
238
239
239
if future .succeeded ():
@@ -253,7 +253,7 @@ def ensure_active_group(self):
253
253
raise exception # pylint: disable-msg=raising-bad-type
254
254
time .sleep (self .config ['retry_backoff_ms' ] / 1000.0 )
255
255
256
- def _perform_group_join (self ):
256
+ def _send_join_group_request (self ):
257
257
"""Join the group and return the assignment for the next generation.
258
258
259
259
This function handles both JoinGroup and SyncGroup, delegating to
@@ -268,7 +268,7 @@ def _perform_group_join(self):
268
268
return Future ().failure (e )
269
269
270
270
# send a join group request to the coordinator
271
- log .debug ("(Re-)joining group %s" , self .group_id )
271
+ log .info ("(Re-)joining group %s" , self .group_id )
272
272
request = JoinGroupRequest (
273
273
self .group_id ,
274
274
self .config ['session_timeout_ms' ],
@@ -279,7 +279,7 @@ def _perform_group_join(self):
279
279
for protocol , metadata in self .group_protocols ()])
280
280
281
281
# create the request for the coordinator
282
- log .debug ("Issuing request (%s) to coordinator %s" , request , self .coordinator_id )
282
+ log .debug ("Sending JoinGroup (%s) to coordinator %s" , request , self .coordinator_id )
283
283
future = Future ()
284
284
_f = self ._client .send (self .coordinator_id , request )
285
285
_f .add_callback (self ._handle_join_group_response , future )
@@ -300,6 +300,8 @@ def _failed_request(self, node_id, request, future, error):
300
300
def _handle_join_group_response (self , future , response ):
301
301
error_type = Errors .for_code (response .error_code )
302
302
if error_type is Errors .NoError :
303
+ log .debug ("Received successful JoinGroup response for group %s: %s" ,
304
+ self .group_id , response )
303
305
self .member_id = response .member_id
304
306
self .generation = response .generation_id
305
307
self .rejoin_needed = False
@@ -315,30 +317,31 @@ def _handle_join_group_response(self, future, response):
315
317
self ._on_join_follower ().chain (future )
316
318
317
319
elif error_type is Errors .GroupLoadInProgressError :
318
- log .debug ("Attempt to join group %s rejected since coordinator is "
319
- " loading the group." , self .group_id )
320
+ log .debug ("Attempt to join group %s rejected since coordinator %s "
321
+ " is loading the group." , self .group_id , self . coordinator_id )
320
322
# backoff and retry
321
323
future .failure (error_type (response ))
322
324
elif error_type is Errors .UnknownMemberIdError :
323
325
# reset the member id and retry immediately
324
326
error = error_type (self .member_id )
325
327
self .member_id = JoinGroupRequest .UNKNOWN_MEMBER_ID
326
- log .info ("Attempt to join group %s failed due to unknown member id,"
327
- " resetting and retrying." , self .group_id )
328
+ log .debug ("Attempt to join group %s failed due to unknown member id" ,
329
+ self .group_id )
328
330
future .failure (error )
329
331
elif error_type in (Errors .GroupCoordinatorNotAvailableError ,
330
332
Errors .NotCoordinatorForGroupError ):
331
333
# re-discover the coordinator and retry with backoff
332
334
self .coordinator_dead ()
333
- log .info ("Attempt to join group %s failed due to obsolete "
334
- "coordinator information, retrying." , self .group_id )
335
+ log .debug ("Attempt to join group %s failed due to obsolete "
336
+ "coordinator information: %s" , self .group_id ,
337
+ error_type .__name__ )
335
338
future .failure (error_type ())
336
339
elif error_type in (Errors .InconsistentGroupProtocolError ,
337
340
Errors .InvalidSessionTimeoutError ,
338
341
Errors .InvalidGroupIdError ):
339
342
# log the error and re-throw the exception
340
343
error = error_type (response )
341
- log .error ("Attempt to join group %s failed due to: %s" ,
344
+ log .error ("Attempt to join group %s failed due to fatal error : %s" ,
342
345
self .group_id , error )
343
346
future .failure (error )
344
347
elif error_type is Errors .GroupAuthorizationFailedError :
@@ -356,8 +359,8 @@ def _on_join_follower(self):
356
359
self .generation ,
357
360
self .member_id ,
358
361
{})
359
- log .debug ("Issuing follower SyncGroup (%s) to coordinator %s" ,
360
- request , self .coordinator_id )
362
+ log .debug ("Sending follower SyncGroup for group %s to coordinator %s: %s" ,
363
+ self . group_id , self .coordinator_id , request )
361
364
return self ._send_sync_group_request (request )
362
365
363
366
def _on_join_leader (self , response ):
@@ -386,8 +389,8 @@ def _on_join_leader(self, response):
386
389
assignment if isinstance (assignment , bytes ) else assignment .encode ())
387
390
for member_id , assignment in six .iteritems (group_assignment )])
388
391
389
- log .debug ("Issuing leader SyncGroup (%s) to coordinator %s" ,
390
- request , self .coordinator_id )
392
+ log .debug ("Sending leader SyncGroup for group %s to coordinator %s: %s" ,
393
+ self . group_id , self .coordinator_id , request )
391
394
return self ._send_sync_group_request (request )
392
395
393
396
def _send_sync_group_request (self , request ):
@@ -404,8 +407,8 @@ def _send_sync_group_request(self, request):
404
407
def _handle_sync_group_response (self , future , response ):
405
408
error_type = Errors .for_code (response .error_code )
406
409
if error_type is Errors .NoError :
407
- log .debug ( "Received successful sync group response for group %s: %s" ,
408
- self .group_id , response )
410
+ log .info ( "Successfully joined group %s with generation %s" ,
411
+ self .group_id , self . generation )
409
412
#self.sensors.syncLatency.record(response.requestLatencyMs())
410
413
future .success (response .member_assignment )
411
414
return
@@ -415,29 +418,27 @@ def _handle_sync_group_response(self, future, response):
415
418
if error_type is Errors .GroupAuthorizationFailedError :
416
419
future .failure (error_type (self .group_id ))
417
420
elif error_type is Errors .RebalanceInProgressError :
418
- log .info ("SyncGroup for group %s failed due to coordinator"
419
- " rebalance, rejoining the group " , self .group_id )
421
+ log .debug ("SyncGroup for group %s failed due to coordinator"
422
+ " rebalance" , self .group_id )
420
423
future .failure (error_type (self .group_id ))
421
424
elif error_type in (Errors .UnknownMemberIdError ,
422
425
Errors .IllegalGenerationError ):
423
426
error = error_type ()
424
- log .info ("SyncGroup for group %s failed due to %s,"
425
- " rejoining the group" , self .group_id , error )
427
+ log .debug ("SyncGroup for group %s failed due to %s" , self .group_id , error )
426
428
self .member_id = JoinGroupRequest .UNKNOWN_MEMBER_ID
427
429
future .failure (error )
428
430
elif error_type in (Errors .GroupCoordinatorNotAvailableError ,
429
431
Errors .NotCoordinatorForGroupError ):
430
432
error = error_type ()
431
- log .info ("SyncGroup for group %s failed due to %s, will find new"
432
- " coordinator and rejoin" , self .group_id , error )
433
+ log .debug ("SyncGroup for group %s failed due to %s" , self .group_id , error )
433
434
self .coordinator_dead ()
434
435
future .failure (error )
435
436
else :
436
437
error = error_type ()
437
438
log .error ("Unexpected error from SyncGroup: %s" , error )
438
439
future .failure (error )
439
440
440
- def _send_group_metadata_request (self ):
441
+ def _send_group_coordinator_request (self ):
441
442
"""Discover the current coordinator for the group.
442
443
443
444
Returns:
@@ -447,7 +448,8 @@ def _send_group_metadata_request(self):
447
448
if node_id is None :
448
449
return Future ().failure (Errors .NoBrokersAvailable ())
449
450
450
- log .debug ("Issuing group metadata request to broker %s" , node_id )
451
+ log .debug ("Sending group coordinator request for group %s to broker %s" ,
452
+ self .group_id , node_id )
451
453
request = GroupCoordinatorRequest (self .group_id )
452
454
future = Future ()
453
455
_f = self ._client .send (node_id , request )
@@ -456,7 +458,7 @@ def _send_group_metadata_request(self):
456
458
return future
457
459
458
460
def _handle_group_coordinator_response (self , future , response ):
459
- log .debug ("Group metadata response %s" , response )
461
+ log .debug ("Received group coordinator response %s" , response )
460
462
if not self .coordinator_unknown ():
461
463
# We already found the coordinator, so ignore the request
462
464
log .debug ("Coordinator already known -- ignoring metadata response" )
@@ -473,6 +475,8 @@ def _handle_group_coordinator_response(self, future, response):
473
475
return
474
476
475
477
self .coordinator_id = response .coordinator_id
478
+ log .info ("Discovered coordinator %s for group %s" ,
479
+ self .coordinator_id , self .group_id )
476
480
self ._client .ready (self .coordinator_id )
477
481
478
482
# start sending heartbeats only if we have a valid generation
@@ -495,8 +499,8 @@ def _handle_group_coordinator_response(self, future, response):
495
499
def coordinator_dead (self , error = None ):
496
500
"""Mark the current coordinator as dead."""
497
501
if self .coordinator_id is not None :
498
- log .warning ("Marking the coordinator dead (node %s): %s." ,
499
- self .coordinator_id , error )
502
+ log .warning ("Marking the coordinator dead (node %s) for group %s : %s." ,
503
+ self .coordinator_id , self . group_id , error )
500
504
self .coordinator_id = None
501
505
502
506
def close (self ):
@@ -542,22 +546,24 @@ def _handle_heartbeat_response(self, future, response):
542
546
#self.sensors.heartbeat_latency.record(response.requestLatencyMs())
543
547
error_type = Errors .for_code (response .error_code )
544
548
if error_type is Errors .NoError :
545
- log .info ("Heartbeat successful" )
549
+ log .debug ("Received successful heartbeat response for group %s" ,
550
+ self .group_id )
546
551
future .success (None )
547
552
elif error_type in (Errors .GroupCoordinatorNotAvailableError ,
548
553
Errors .NotCoordinatorForGroupError ):
549
- log .warning ("Heartbeat failed: coordinator is either not started or"
550
- " not valid; will refresh metadata and retry" )
554
+ log .warning ("Heartbeat failed for group %s: coordinator (node %s)"
555
+ " is either not started or not valid" , self .group_id ,
556
+ self .coordinator_id )
551
557
self .coordinator_dead ()
552
558
future .failure (error_type ())
553
559
elif error_type is Errors .RebalanceInProgressError :
554
- log .warning ("Heartbeat: group is rebalancing; this consumer needs to "
555
- " re-join" )
560
+ log .warning ("Heartbeat failed for group %s because it is "
561
+ " rebalancing" , self . group_id )
556
562
self .rejoin_needed = True
557
563
future .failure (error_type ())
558
564
elif error_type is Errors .IllegalGenerationError :
559
- log .warning ("Heartbeat: generation id is not current; this consumer "
560
- " needs to re-join" )
565
+ log .warning ("Heartbeat failed for group %s : generation id is not "
566
+ " current." , self . group_id )
561
567
self .rejoin_needed = True
562
568
future .failure (error_type ())
563
569
elif error_type is Errors .UnknownMemberIdError :
0 commit comments