@@ -196,6 +196,9 @@ def __init__(self, **configs):
196
196
assert self .config ['api_version' ] in self .API_VERSIONS , (
197
197
'api_version [{0}] must be one of: {1}' .format (
198
198
self .config ['api_version' ], str (self .API_VERSIONS )))
199
+ else :
200
+ # This should get updated after a successful bootstrap
201
+ self .config ['api_version' ] = (0 , 0 )
199
202
200
203
self .cluster = ClusterMetadata (** self .config )
201
204
self ._topics = set () # empty set will fetch all topic metadata
@@ -228,11 +231,6 @@ def __init__(self, **configs):
228
231
229
232
self ._bootstrap (collect_hosts (self .config ['bootstrap_servers' ]))
230
233
231
- # Check Broker Version if not set explicitly
232
- if self .config ['api_version' ] is None :
233
- check_timeout = self .config ['api_version_auto_timeout_ms' ] / 1000
234
- self .config ['api_version' ] = self .check_version (timeout = check_timeout )
235
-
236
234
def _bootstrap (self , hosts ):
237
235
log .info ('Bootstrapping cluster metadata from %s' , hosts )
238
236
# Exponential backoff if bootstrap fails
@@ -245,7 +243,7 @@ def _bootstrap(self, hosts):
245
243
time .sleep (next_at - now )
246
244
self ._last_bootstrap = time .time ()
247
245
248
- if self .config ['api_version' ] is None or self . config [ 'api_version' ] < (0 , 10 ):
246
+ if self .config ['api_version' ] < (0 , 10 ):
249
247
metadata_request = MetadataRequest [0 ]([])
250
248
else :
251
249
metadata_request = MetadataRequest [1 ](None )
@@ -283,7 +281,13 @@ def _bootstrap(self, hosts):
283
281
else :
284
282
bootstrap .close ()
285
283
self ._bootstrap_fails = 0
284
+
285
+ # Check Broker Version if not set explicitly
286
+ if self .config ['api_version' ] == (0 , 0 ):
287
+ check_timeout = self .config ['api_version_auto_timeout_ms' ] / 1000
288
+ self .config ['api_version' ] = self .check_version (timeout = check_timeout )
286
289
break
290
+
287
291
# No bootstrap found...
288
292
else :
289
293
log .error ('Unable to bootstrap from %s' , hosts )
@@ -821,10 +825,10 @@ def check_version(self, node_id=None, timeout=2, strict=False):
821
825
This is only possible if node_id is None.
822
826
823
827
Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
828
+ (0, 0) returned if the version cannot be determined,
829
+ typically due to networking.
824
830
825
831
Raises:
826
- NodeNotReadyError (if node_id is provided)
827
- NoBrokersAvailable (if node_id is None)
828
832
UnrecognizedBrokerVersion: please file bug if seen!
829
833
AssertionError (if strict=True): please file bug if seen!
830
834
"""
@@ -835,7 +839,7 @@ def check_version(self, node_id=None, timeout=2, strict=False):
835
839
# which can block for an increasing backoff period
836
840
try_node = node_id or self .least_loaded_node ()
837
841
if try_node is None :
838
- raise Errors . NoBrokersAvailable ( )
842
+ return ( 0 , 0 )
839
843
self ._maybe_connect (try_node )
840
844
conn = self ._conns [try_node ]
841
845
@@ -847,15 +851,14 @@ def check_version(self, node_id=None, timeout=2, strict=False):
847
851
version = conn .check_version (timeout = remaining , strict = strict )
848
852
return version
849
853
except Errors .NodeNotReadyError :
850
- # Only raise to user if this is a node-specific request
851
854
if node_id is not None :
852
- raise
855
+ return ( 0 , 0 )
853
856
finally :
854
857
self ._refresh_on_disconnects = True
855
858
856
859
# Timeout
857
860
else :
858
- raise Errors . NoBrokersAvailable ( )
861
+ return ( 0 , 0 )
859
862
860
863
def wakeup (self ):
861
864
with self ._wake_lock :
0 commit comments