@@ -65,6 +65,7 @@ def __init__(self, **configs):
65
65
self .config [key ] = configs [key ]
66
66
67
67
self ._bootstrap_brokers = self ._generate_bootstrap_brokers ()
68
+ self ._coordinator_brokers = {}
68
69
69
70
def _generate_bootstrap_brokers (self ):
70
71
# collect_hosts does not perform DNS, so we should be fine to re-use
@@ -96,7 +97,11 @@ def broker_metadata(self, broker_id):
96
97
Returns:
97
98
BrokerMetadata or None if not found
98
99
"""
99
- return self ._brokers .get (broker_id ) or self ._bootstrap_brokers .get (broker_id )
100
+ return (
101
+ self ._brokers .get (broker_id ) or
102
+ self ._bootstrap_brokers .get (broker_id ) or
103
+ self ._coordinator_brokers .get (broker_id )
104
+ )
100
105
101
106
def partitions_for_topic (self , topic ):
102
107
"""Return set of all partitions for topic (whether available or not)
@@ -341,41 +346,28 @@ def add_group_coordinator(self, group, response):
341
346
response (GroupCoordinatorResponse): broker response
342
347
343
348
Returns:
344
- bool: True if metadata is updated, False on error
349
+ string: coordinator node_id if metadata is updated, None on error
345
350
"""
346
351
log .debug ("Updating coordinator for %s: %s" , group , response )
347
352
error_type = Errors .for_code (response .error_code )
348
353
if error_type is not Errors .NoError :
349
354
log .error ("GroupCoordinatorResponse error: %s" , error_type )
350
355
self ._groups [group ] = - 1
351
- return False
356
+ return
352
357
353
- node_id = response .coordinator_id
358
+ # Use a coordinator-specific node id so that group requests
359
+ # get a dedicated connection
360
+ node_id = 'coordinator-{}' .format (response .coordinator_id )
354
361
coordinator = BrokerMetadata (
355
- response . coordinator_id ,
362
+ node_id ,
356
363
response .host ,
357
364
response .port ,
358
365
None )
359
366
360
- # Assume that group coordinators are just brokers
361
- # (this is true now, but could diverge in future)
362
- if node_id not in self ._brokers :
363
- self ._brokers [node_id ] = coordinator
364
-
365
- # If this happens, either brokers have moved without
366
- # changing IDs, or our assumption above is wrong
367
- else :
368
- node = self ._brokers [node_id ]
369
- if coordinator .host != node .host or coordinator .port != node .port :
370
- log .error ("GroupCoordinator metadata conflicts with existing"
371
- " broker metadata. Coordinator: %s, Broker: %s" ,
372
- coordinator , node )
373
- self ._groups [group ] = node_id
374
- return False
375
-
376
367
log .info ("Group coordinator for %s is %s" , group , coordinator )
368
+ self ._coordinator_brokers [node_id ] = coordinator
377
369
self ._groups [group ] = node_id
378
- return True
370
+ return node_id
379
371
380
372
def with_partitions (self , partitions_to_add ):
381
373
"""Returns a copy of cluster metadata with partitions added"""
0 commit comments