Skip to content

Commit 91f4642

Browse files
dpkpjeffwidman
authored andcommitted
Use dedicated connection for group coordinator (dpkp#1822)
This changes the coordinator_id to be a unique string, e.g., `coordinator-1`, so that it will get a dedicated connection. This won't eliminate lock contention because the client lock applies to all connections, but it should improve in-flight-request contention.
1 parent f126e5b commit 91f4642

File tree

2 files changed

+17
-25
lines changed

2 files changed

+17
-25
lines changed

kafka/cluster.py

+14-22
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ def __init__(self, **configs):
6565
self.config[key] = configs[key]
6666

6767
self._bootstrap_brokers = self._generate_bootstrap_brokers()
68+
self._coordinator_brokers = {}
6869

6970
def _generate_bootstrap_brokers(self):
7071
# collect_hosts does not perform DNS, so we should be fine to re-use
@@ -96,7 +97,11 @@ def broker_metadata(self, broker_id):
9697
Returns:
9798
BrokerMetadata or None if not found
9899
"""
99-
return self._brokers.get(broker_id) or self._bootstrap_brokers.get(broker_id)
100+
return (
101+
self._brokers.get(broker_id) or
102+
self._bootstrap_brokers.get(broker_id) or
103+
self._coordinator_brokers.get(broker_id)
104+
)
100105

101106
def partitions_for_topic(self, topic):
102107
"""Return set of all partitions for topic (whether available or not)
@@ -341,41 +346,28 @@ def add_group_coordinator(self, group, response):
341346
response (GroupCoordinatorResponse): broker response
342347
343348
Returns:
344-
bool: True if metadata is updated, False on error
349+
string: coordinator node_id if metadata is updated, None on error
345350
"""
346351
log.debug("Updating coordinator for %s: %s", group, response)
347352
error_type = Errors.for_code(response.error_code)
348353
if error_type is not Errors.NoError:
349354
log.error("GroupCoordinatorResponse error: %s", error_type)
350355
self._groups[group] = -1
351-
return False
356+
return
352357

353-
node_id = response.coordinator_id
358+
# Use a coordinator-specific node id so that group requests
359+
# get a dedicated connection
360+
node_id = 'coordinator-{}'.format(response.coordinator_id)
354361
coordinator = BrokerMetadata(
355-
response.coordinator_id,
362+
node_id,
356363
response.host,
357364
response.port,
358365
None)
359366

360-
# Assume that group coordinators are just brokers
361-
# (this is true now, but could diverge in future)
362-
if node_id not in self._brokers:
363-
self._brokers[node_id] = coordinator
364-
365-
# If this happens, either brokers have moved without
366-
# changing IDs, or our assumption above is wrong
367-
else:
368-
node = self._brokers[node_id]
369-
if coordinator.host != node.host or coordinator.port != node.port:
370-
log.error("GroupCoordinator metadata conflicts with existing"
371-
" broker metadata. Coordinator: %s, Broker: %s",
372-
coordinator, node)
373-
self._groups[group] = node_id
374-
return False
375-
376367
log.info("Group coordinator for %s is %s", group, coordinator)
368+
self._coordinator_brokers[node_id] = coordinator
377369
self._groups[group] = node_id
378-
return True
370+
return node_id
379371

380372
def with_partitions(self, partitions_to_add):
381373
"""Returns a copy of cluster metadata with partitions added"""

kafka/coordinator/base.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -676,14 +676,14 @@ def _handle_group_coordinator_response(self, future, response):
676676
error_type = Errors.for_code(response.error_code)
677677
if error_type is Errors.NoError:
678678
with self._client._lock, self._lock:
679-
ok = self._client.cluster.add_group_coordinator(self.group_id, response)
680-
if not ok:
679+
coordinator_id = self._client.cluster.add_group_coordinator(self.group_id, response)
680+
if not coordinator_id:
681681
# This could happen if coordinator metadata is different
682682
# than broker metadata
683683
future.failure(Errors.IllegalStateError())
684684
return
685685

686-
self.coordinator_id = response.coordinator_id
686+
self.coordinator_id = coordinator_id
687687
log.info("Discovered coordinator %s for group %s",
688688
self.coordinator_id, self.group_id)
689689
self._client.maybe_connect(self.coordinator_id)

0 commit comments

Comments
 (0)