Skip to content

Commit 5c78489

Browse files
authored
Monkeypatch max_in_flight_requests_per_connection when checking broker version (dpkp#834)
1 parent 2a7aca1 commit 5c78489

File tree

1 file changed

+11
-6
lines changed

1 file changed

+11
-6
lines changed

kafka/conn.py

+11-6
Original file line numberDiff line numberDiff line change
@@ -738,11 +738,15 @@ def check_version(self, timeout=2, strict=False):
738738
739739
Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
740740
"""
741-
# Monkeypatch the connection request timeout
742-
# Generally this timeout should not get triggered
743-
# but in case it does, we want it to be reasonably short
744-
stashed_request_timeout_ms = self.config['request_timeout_ms']
745-
self.config['request_timeout_ms'] = timeout * 1000
741+
# Monkeypatch some connection configurations to avoid timeouts
742+
override_config = {
743+
'request_timeout_ms': timeout * 1000,
744+
'max_in_flight_requests_per_connection': 5
745+
}
746+
stashed = {}
747+
for key in override_config:
748+
stashed[key] = self.config[key]
749+
self.config[key] = override_config[key]
746750

747751
# kafka kills the connection when it doesnt recognize an API request
748752
# so we can send a test request and then follow immediately with a
@@ -837,7 +841,8 @@ def connect():
837841
raise Errors.UnrecognizedBrokerVersion()
838842

839843
log.removeFilter(log_filter)
840-
self.config['request_timeout_ms'] = stashed_request_timeout_ms
844+
for key in stashed:
845+
self.config[key] = stashed[key]
841846
return version
842847

843848
def __repr__(self):

0 commit comments

Comments
 (0)