Skip to content

Commit 98119e9

Browse files
author
Dana Powers
committed
Defer version check until after bootstrap succeeds
1 parent 4abdb1b commit 98119e9

File tree

1 file changed

+15
-12
lines changed

1 file changed

+15
-12
lines changed

kafka/client_async.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,9 @@ def __init__(self, **configs):
196196
assert self.config['api_version'] in self.API_VERSIONS, (
197197
'api_version [{0}] must be one of: {1}'.format(
198198
self.config['api_version'], str(self.API_VERSIONS)))
199+
else:
200+
# This should get updated after a successful bootstrap
201+
self.config['api_version'] = (0, 0)
199202

200203
self.cluster = ClusterMetadata(**self.config)
201204
self._topics = set() # empty set will fetch all topic metadata
@@ -228,11 +231,6 @@ def __init__(self, **configs):
228231

229232
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
230233

231-
# Check Broker Version if not set explicitly
232-
if self.config['api_version'] is None:
233-
check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
234-
self.config['api_version'] = self.check_version(timeout=check_timeout)
235-
236234
def _bootstrap(self, hosts):
237235
log.info('Bootstrapping cluster metadata from %s', hosts)
238236
# Exponential backoff if bootstrap fails
@@ -245,7 +243,7 @@ def _bootstrap(self, hosts):
245243
time.sleep(next_at - now)
246244
self._last_bootstrap = time.time()
247245

248-
if self.config['api_version'] is None or self.config['api_version'] < (0, 10):
246+
if self.config['api_version'] < (0, 10):
249247
metadata_request = MetadataRequest[0]([])
250248
else:
251249
metadata_request = MetadataRequest[1](None)
@@ -283,7 +281,13 @@ def _bootstrap(self, hosts):
283281
else:
284282
bootstrap.close()
285283
self._bootstrap_fails = 0
284+
285+
# Check Broker Version if not set explicitly
286+
if self.config['api_version'] == (0, 0):
287+
check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
288+
self.config['api_version'] = self.check_version(timeout=check_timeout)
286289
break
290+
287291
# No bootstrap found...
288292
else:
289293
log.error('Unable to bootstrap from %s', hosts)
@@ -821,10 +825,10 @@ def check_version(self, node_id=None, timeout=2, strict=False):
821825
This is only possible if node_id is None.
822826
823827
Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
828+
(0, 0) returned if the version cannot be determined,
829+
typically due to networking.
824830
825831
Raises:
826-
NodeNotReadyError (if node_id is provided)
827-
NoBrokersAvailable (if node_id is None)
828832
UnrecognizedBrokerVersion: please file bug if seen!
829833
AssertionError (if strict=True): please file bug if seen!
830834
"""
@@ -835,7 +839,7 @@ def check_version(self, node_id=None, timeout=2, strict=False):
835839
# which can block for an increasing backoff period
836840
try_node = node_id or self.least_loaded_node()
837841
if try_node is None:
838-
raise Errors.NoBrokersAvailable()
842+
return (0, 0)
839843
self._maybe_connect(try_node)
840844
conn = self._conns[try_node]
841845

@@ -847,15 +851,14 @@ def check_version(self, node_id=None, timeout=2, strict=False):
847851
version = conn.check_version(timeout=remaining, strict=strict)
848852
return version
849853
except Errors.NodeNotReadyError:
850-
# Only raise to user if this is a node-specific request
851854
if node_id is not None:
852-
raise
855+
return (0, 0)
853856
finally:
854857
self._refresh_on_disconnects = True
855858

856859
# Timeout
857860
else:
858-
raise Errors.NoBrokersAvailable()
861+
return (0, 0)
859862

860863
def wakeup(self):
861864
with self._wake_lock:

0 commit comments

Comments
 (0)