@@ -906,6 +906,7 @@ def check_version(self, timeout=2, strict=False):
906
906
907
907
Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
908
908
"""
909
+ log .info ('Probing node %s broker version' , self .node_id )
909
910
# Monkeypatch some connection configurations to avoid timeouts
910
911
override_config = {
911
912
'request_timeout_ms' : timeout * 1000 ,
@@ -924,17 +925,6 @@ def check_version(self, timeout=2, strict=False):
924
925
from kafka .protocol .admin import ApiVersionRequest , ListGroupsRequest
925
926
from kafka .protocol .commit import OffsetFetchRequest , GroupCoordinatorRequest
926
927
927
- # Socket errors are logged as exceptions and can alarm users. Mute them
928
- from logging import Filter
929
-
930
- class ConnFilter (Filter ):
931
- def filter (self , record ):
932
- if record .funcName == 'check_version' :
933
- return True
934
- return False
935
- log_filter = ConnFilter ()
936
- log .addFilter (log_filter )
937
-
938
928
test_cases = [
939
929
# All cases starting from 0.10 will be based on ApiVersionResponse
940
930
((0 , 10 ), ApiVersionRequest [0 ]()),
@@ -1004,7 +994,6 @@ def filter(self, record):
1004
994
else :
1005
995
raise Errors .UnrecognizedBrokerVersion ()
1006
996
1007
- log .removeFilter (log_filter )
1008
997
for key in stashed :
1009
998
self .config [key ] = stashed [key ]
1010
999
return version
0 commit comments