Skip to content
This repository was archived by the owner on Jul 11, 2025. It is now read-only.

Commit 127eb80

Browse files
committed
Avoid FD spike after retrying KafkaAdminClient
A caller might call kafka.KafkaAdminClient repeatedly and handle kafka.errors.NoBrokersAvailable if the broker is not available. However, each retry will cause 3 extra FD being used. Depends on how long the caller wait before retry, the FD usage can reach 300~700 before Python garbage collector collecting those FD. This commit close those FD early.
1 parent 31a6b92 commit 127eb80

File tree

1 file changed

+14
-8
lines changed

1 file changed

+14
-8
lines changed

kafka/admin/client.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -209,14 +209,20 @@ def __init__(self, **configs):
209209
metric_group_prefix='admin',
210210
**self.config
211211
)
212-
self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
213-
214-
# Get auto-discovered version from client if necessary
215-
if self.config['api_version'] is None:
216-
self.config['api_version'] = self._client.config['api_version']
217-
218-
self._closed = False
219-
self._refresh_controller_id()
212+
try:
213+
self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
214+
215+
# Get auto-discovered version from client if necessary
216+
if self.config['api_version'] is None:
217+
self.config['api_version'] = self._client.config['api_version']
218+
219+
self._closed = False
220+
self._refresh_controller_id()
221+
except Exception:
222+
self._metrics.close()
223+
self._client.close() # prevent FD leak
224+
self._closed = True
225+
raise
220226
log.debug("KafkaAdminClient started.")
221227

222228
def close(self):

0 commit comments

Comments
 (0)