diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index b7fbd8395..e8d42b536 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -205,6 +205,9 @@ class KafkaConsumer(six.Iterator):
             metrics. Default: 2
         metrics_sample_window_ms (int): The maximum age in milliseconds of
             samples used to compute metrics. Default: 30000
+        node_not_ready_retry_timeout_ms (int): The timeout used to detect
+            the broker not being available so that NodeNotReadyError is raised.
+            Default: None
         selector (selectors.BaseSelector): Provide a specific selector
             implementation to use for I/O multiplexing.
             Default: selectors.DefaultSelector
@@ -269,6 +272,7 @@ class KafkaConsumer(six.Iterator):
         'metrics_num_samples': 2,
         'metrics_sample_window_ms': 30000,
         'metric_group_prefix': 'consumer',
+        'node_not_ready_retry_timeout_ms': None,
         'selector': selectors.DefaultSelector,
         'exclude_internal_topics': True,
         'sasl_mechanism': None,
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index af0936c9d..e9f3986fb 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -53,6 +53,7 @@ class BaseCoordinator(object):
         'group_id': 'kafka-python-default-group',
         'session_timeout_ms': 30000,
         'heartbeat_interval_ms': 3000,
+        'node_not_ready_retry_timeout_ms': None,
         'retry_backoff_ms': 100,
         'api_version': (0, 9),
         'metric_group_prefix': '',
@@ -65,7 +66,7 @@ def __init__(self, client, metrics, **configs):
                 partition assignment (if enabled), and to use for fetching and
                 committing offsets. Default: 'kafka-python-default-group'
             session_timeout_ms (int): The timeout used to detect failures when
-                using Kafka's group managementment facilities. Default: 30000
+                using Kafka's group management facilities. Default: 30000
             heartbeat_interval_ms (int): The expected time in milliseconds
                 between heartbeats to the consumer coordinator when using
                 Kafka's group management feature. Heartbeats are used to ensure
@@ -75,6 +76,9 @@ def __init__(self, client, metrics, **configs):
                 should be set no higher than 1/3 of that value. It can be
                 adjusted even lower to control the expected time for normal
                 rebalances. Default: 3000
+            node_not_ready_retry_timeout_ms (int): The timeout used to detect
+                the broker not being available so that NodeNotReadyError is raised.
+                Default: None
             retry_backoff_ms (int): Milliseconds to backoff when retrying on
                 errors. Default: 100.
         """
@@ -199,6 +203,8 @@ def ensure_coordinator_known(self):
         """Block until the coordinator for this group is known
         (and we have an active connection -- java client uses unsent queue).
         """
+        node_not_ready_retry_timeout_ms = self.config['node_not_ready_retry_timeout_ms']
+        node_not_ready_retry_start_time = time.time()
         while self.coordinator_unknown():
 
             # Prior to 0.8.2 there was no group coordinator
@@ -215,7 +221,12 @@ def ensure_coordinator_known(self):
 
             if future.failed():
                 if future.retriable():
-                    if getattr(future.exception, 'invalid_metadata', False):
+                    if node_not_ready_retry_timeout_ms is not None and isinstance(future.exception, Errors.NodeNotReadyError):
+                        self._client.poll(timeout_ms=node_not_ready_retry_timeout_ms)
+                        node_not_ready_retry_timeout_ms -= (time.time() - node_not_ready_retry_start_time) * 1000
+                        if node_not_ready_retry_timeout_ms <= 0:
+                            raise future.exception # pylint: disable-msg=raising-bad-type
+                    elif getattr(future.exception, 'invalid_metadata', False):
                         log.debug('Requesting metadata for group coordinator request: %s', future.exception)
                         metadata_update = self._client.cluster.request_update()
                         self._client.poll(future=metadata_update)
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 123699f24..addae0928 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -37,7 +37,8 @@ class ConsumerCoordinator(BaseCoordinator):
         'retry_backoff_ms': 100,
         'api_version': (0, 9),
         'exclude_internal_topics': True,
-        'metric_group_prefix': 'consumer'
+        'metric_group_prefix': 'consumer',
+        'node_not_ready_retry_timeout_ms': None
     }
 
     def __init__(self, client, subscription, metrics, **configs):
@@ -68,13 +69,16 @@ def __init__(self, client, subscription, metrics, **configs):
                 adjusted even lower to control the expected time for normal
                 rebalances. Default: 3000
             session_timeout_ms (int): The timeout used to detect failures when
-                using Kafka's group managementment facilities. Default: 30000
+                using Kafka's group management facilities. Default: 30000
             retry_backoff_ms (int): Milliseconds to backoff when retrying on
                 errors. Default: 100.
             exclude_internal_topics (bool): Whether records from internal topics
                 (such as offsets) should be exposed to the consumer. If set to
                 True the only way to receive records from an internal topic is
                 subscribing to it. Requires 0.10+. Default: True
+            node_not_ready_retry_timeout_ms (int): The timeout used to detect
+                the broker not being available so that NodeNotReadyError is raised.
+                Default: None
         """
         super(ConsumerCoordinator, self).__init__(client, metrics, **configs)