From ef4721c1ba463f1ed0cd951fc304fccb77a648dd Mon Sep 17 00:00:00 2001 From: shimon-armis Date: Mon, 26 Jun 2023 14:18:39 +0300 Subject: [PATCH] client_async: Allow throwing an exception upon socket error during wakeup When wakeup() is called, we sometime notice that we get an endless prints: "Unable to send to wakeup socket!". Those prints are spamming the logs. This commit aims to address it by allowing restating the application via an intentional exception raise. This behavior is configurable and its default is backward compatible. Signed-off-by: shimon-armis --- kafka/client_async.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 58f22d4ec..581dbbf02 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -154,6 +154,8 @@ class KafkaClient(object): sasl mechanism handshake. Default: one of bootstrap servers sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None + raise_upon_socket_err_during_wakeup (bool): If set to True, raise an exception + upon socket error during wakeup(). Default: False """ DEFAULT_CONFIG = { @@ -192,7 +194,8 @@ class KafkaClient(object): 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, - 'sasl_oauth_token_provider': None + 'sasl_oauth_token_provider': None, + 'raise_upon_socket_err_during_wakeup': False } def __init__(self, **configs): @@ -243,6 +246,8 @@ def __init__(self, **configs): check_timeout = self.config['api_version_auto_timeout_ms'] / 1000 self.config['api_version'] = self.check_version(timeout=check_timeout) + self._raise_upon_socket_err_during_wakeup = self.config['raise_upon_socket_err_during_wakeup'] + def _can_bootstrap(self): effective_failures = self._bootstrap_fails // self._num_bootstrap_hosts backoff_factor = 2 ** effective_failures @@ -933,8 +938,10 @@ def wakeup(self): except socket.timeout: log.warning('Timeout to send to wakeup socket!') raise Errors.KafkaTimeoutError() - except socket.error: + except socket.error as e: log.warning('Unable to send to wakeup socket!') + if self._raise_upon_socket_err_during_wakeup: + raise e def _clear_wake_fd(self): # reading from wake socket should only happen in a single thread