
Raise NodeNotReadyError from KafkaConsumer.poll after a retry timeout if the KafkaBroker is not available #1000


Closed. Wants to merge 49 commits.
Commits (49):
e7a42e8
hello trial
Nov 8, 2016
d51f8db
why coordinator_dead
Nov 8, 2016
089ee85
why coordinator_dead
Nov 8, 2016
dada08d
where coordinator_dead
Nov 9, 2016
8355743
poll check
Nov 9, 2016
9f69b93
reponse details
Nov 9, 2016
366f635
coordinator_id details
Nov 10, 2016
5e1d185
coordinator_id details
Nov 10, 2016
7bcdceb
fetch offset timing
Nov 10, 2016
bfb520e
fetch offset timing
Nov 10, 2016
6efb8e8
fetch offset timing
Nov 10, 2016
52b90d7
fetch offset timing
Nov 10, 2016
286c050
fetch offset timing
Nov 10, 2016
8a7eb82
fetch offset timing
Nov 10, 2016
5151652
Merge branch 'master' of https://github.com/dpkp/kafka-python
Feb 22, 2017
d185a42
retry count
Feb 22, 2017
653b1d8
logging of retry count
Feb 22, 2017
4a3ba3e
verify logging is true
Feb 22, 2017
6cbe299
more log
Feb 23, 2017
e94e4b2
more logs
Feb 23, 2017
d931068
cleanup
Feb 23, 2017
a0fbdb4
Node not ready
Feb 23, 2017
eb1e978
timeout
Feb 23, 2017
3287bb0
future
Feb 23, 2017
dfa70ee
responses
Feb 23, 2017
3945c44
retry
Feb 23, 2017
46e3347
retry and future None
Feb 23, 2017
384b149
raise in case of retries finished
Feb 23, 2017
e214040
producer send logs fatal
Feb 23, 2017
046cf5c
logging for producer
Feb 23, 2017
e6ed4b0
logging in sender
Feb 23, 2017
13a5683
more logging
Feb 23, 2017
46f321c
logging in _poll
Feb 23, 2017
2884ac4
log for maybe_connect
Feb 23, 2017
e76e578
revert
Feb 23, 2017
1373ce4
revert
Feb 23, 2017
cbf1cff
fatal log
Mar 1, 2017
3203413
config
Mar 1, 2017
55b5e9f
ms
Mar 1, 2017
b89a00f
correction
Mar 1, 2017
359f1e6
better log
Mar 1, 2017
5a04201
reduced timeout
Mar 1, 2017
9e74ae8
config parameter
Mar 1, 2017
c85fd45
consumer config
Mar 1, 2017
b03e1f1
consumer coordinator config
Mar 1, 2017
2453c25
final changes
Mar 1, 2017
6401520
node_not_ready_retry_timeout_ms
Mar 1, 2017
2b3211d
reverted empty line deletes
Mar 1, 2017
66577b6
Conflict resolution
zembunia Sep 11, 2017
4 changes: 4 additions & 0 deletions kafka/consumer/group.py
@@ -205,6 +205,9 @@ class KafkaConsumer(six.Iterator):
metrics. Default: 2
metrics_sample_window_ms (int): The maximum age in milliseconds of
samples used to compute metrics. Default: 30000
node_not_ready_retry_timeout_ms (int): The timeout used to detect
the broker not being available so that NodeNotReadyError is raised.
Default: None
selector (selectors.BaseSelector): Provide a specific selector
implementation to use for I/O multiplexing.
Default: selectors.DefaultSelector
@@ -269,6 +272,7 @@ class KafkaConsumer(six.Iterator):
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
'metric_group_prefix': 'consumer',
'node_not_ready_retry_timeout_ms': None,
'selector': selectors.DefaultSelector,
'exclude_internal_topics': True,
'sasl_mechanism': None,
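The change above only adds the new setting to KafkaConsumer's documented keyword arguments and DEFAULT_CONFIG. A minimal usage sketch of how a caller might rely on it is shown below; the topic name, broker address, and 30-second budget are placeholder values, and the fail-fast behaviour only applies with this patch in place.

```python
# Hedged usage sketch for the option added in this PR. With the patch applied,
# poll() should surface NodeNotReadyError once the retry budget is exhausted
# instead of retrying indefinitely while the group coordinator is unavailable.
from kafka import KafkaConsumer
from kafka.errors import NodeNotReadyError

consumer = KafkaConsumer(
    'my-topic',                             # placeholder topic
    bootstrap_servers='broker-1:9092',      # placeholder broker address
    group_id='kafka-python-default-group',
    node_not_ready_retry_timeout_ms=30000,  # stop retrying after ~30 seconds
)

try:
    records = consumer.poll(timeout_ms=1000)
except NodeNotReadyError:
    # The coordinator lookup kept failing for the whole retry window;
    # fail fast instead of spinning inside ensure_coordinator_known().
    print('Kafka broker not available, giving up')
```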
15 changes: 13 additions & 2 deletions kafka/coordinator/base.py
@@ -53,6 +53,7 @@ class BaseCoordinator(object):
'group_id': 'kafka-python-default-group',
'session_timeout_ms': 30000,
'heartbeat_interval_ms': 3000,
'node_not_ready_retry_timeout_ms': None,
'retry_backoff_ms': 100,
'api_version': (0, 9),
'metric_group_prefix': '',
@@ -65,7 +66,7 @@ def __init__(self, client, metrics, **configs):
partition assignment (if enabled), and to use for fetching and
committing offsets. Default: 'kafka-python-default-group'
session_timeout_ms (int): The timeout used to detect failures when
using Kafka's group managementment facilities. Default: 30000
using Kafka's group management facilities. Default: 30000
heartbeat_interval_ms (int): The expected time in milliseconds
between heartbeats to the consumer coordinator when using
Kafka's group management feature. Heartbeats are used to ensure
@@ -75,6 +76,9 @@ def __init__(self, client, metrics, **configs):
should be set no higher than 1/3 of that value. It can be
adjusted even lower to control the expected time for normal
rebalances. Default: 3000
node_not_ready_retry_timeout_ms (int): The timeout used to detect
the broker not being available so that NodeNotReadyError is raised.
Default: None
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
"""
@@ -199,6 +203,8 @@ def ensure_coordinator_known(self):
"""Block until the coordinator for this group is known
(and we have an active connection -- java client uses unsent queue).
"""
node_not_ready_retry_timeout_ms = self.config['node_not_ready_retry_timeout_ms']
node_not_ready_retry_start_time = time.time()
while self.coordinator_unknown():

# Prior to 0.8.2 there was no group coordinator
@@ -215,7 +221,12 @@ def ensure_coordinator_known(self):

if future.failed():
if future.retriable():
if getattr(future.exception, 'invalid_metadata', False):
if node_not_ready_retry_timeout_ms is not None and isinstance(future.exception, Errors.NodeNotReadyError):
self._client.poll(timeout_ms=node_not_ready_retry_timeout_ms)
node_not_ready_retry_timeout_ms -= (time.time() - node_not_ready_retry_start_time) * 1000
if node_not_ready_retry_timeout_ms <= 0:
raise future.exception # pylint: disable-msg=raising-bad-type
elif getattr(future.exception, 'invalid_metadata', False):
log.debug('Requesting metadata for group coordinator request: %s', future.exception)
metadata_update = self._client.cluster.request_update()
self._client.poll(future=metadata_update)
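The core of the patch is the retry budget inside ensure_coordinator_known: when the coordinator lookup future fails with NodeNotReadyError and a timeout is configured, the client polls, charges the wall-clock time elapsed since the first failure against the remaining budget, and re-raises the error once the budget is spent. The standalone sketch below mirrors that accounting with invented names (wait_for_coordinator, lookup_coordinator) and a local stand-in exception; it illustrates the control flow and is not the actual method.

```python
import time


class NodeNotReadyError(Exception):
    """Local stand-in for kafka.errors.NodeNotReadyError in this sketch."""


def wait_for_coordinator(lookup_coordinator, retry_timeout_ms=None, poll_ms=100):
    """Retry a failing coordinator lookup, bounded by retry_timeout_ms.

    lookup_coordinator is a placeholder callable that either returns the
    coordinator or raises NodeNotReadyError, playing the role of the failed
    future in BaseCoordinator.ensure_coordinator_known().
    """
    remaining_ms = retry_timeout_ms
    start = time.time()
    while True:
        try:
            return lookup_coordinator()
        except NodeNotReadyError:
            if remaining_ms is None:
                # Default (None) keeps the pre-patch behaviour: retry forever.
                time.sleep(poll_ms / 1000.0)
                continue
            # Wait briefly, then charge the total time elapsed since the first
            # failure against the budget, as the patch does, and re-raise once
            # the budget is spent.
            time.sleep(min(poll_ms, remaining_ms) / 1000.0)
            remaining_ms -= (time.time() - start) * 1000
            if remaining_ms <= 0:
                raise
```

Calling this with a callable that always raises surfaces NodeNotReadyError after roughly the configured number of milliseconds, which is the behaviour the PR title describes for KafkaConsumer.poll.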
8 changes: 6 additions & 2 deletions kafka/coordinator/consumer.py
@@ -37,7 +37,8 @@ class ConsumerCoordinator(BaseCoordinator):
'retry_backoff_ms': 100,
'api_version': (0, 9),
'exclude_internal_topics': True,
'metric_group_prefix': 'consumer'
'metric_group_prefix': 'consumer',
'node_not_ready_retry_timeout_ms': None
}

def __init__(self, client, subscription, metrics, **configs):
@@ -68,13 +69,16 @@ def __init__(self, client, subscription, metrics, **configs):
adjusted even lower to control the expected time for normal
rebalances. Default: 3000
session_timeout_ms (int): The timeout used to detect failures when
using Kafka's group managementment facilities. Default: 30000
using Kafka's group management facilities. Default: 30000
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
exclude_internal_topics (bool): Whether records from internal topics
(such as offsets) should be exposed to the consumer. If set to
True the only way to receive records from an internal topic is
subscribing to it. Requires 0.10+. Default: True
node_not_ready_retry_timeout_ms (int): The timeout used to detect
the broker not being available so that NodeNotReadyError is raised.
Default: None
"""
super(ConsumerCoordinator, self).__init__(client, metrics, **configs)

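ConsumerCoordinator only needs the extra DEFAULT_CONFIG entry and docstring above because of how configuration is threaded through the class hierarchy: each layer copies its defaults and overrides them with matching keyword arguments, so a value set on KafkaConsumer reaches BaseCoordinator.ensure_coordinator_known without further plumbing. The toy class below illustrates that copy-and-override pattern; it is a simplified stand-in, not the real coordinator.

```python
import copy


class CoordinatorSketch(object):
    """Toy stand-in showing why adding a key to DEFAULT_CONFIG is enough."""

    DEFAULT_CONFIG = {
        'node_not_ready_retry_timeout_ms': None,  # new default from this PR
        'retry_backoff_ms': 100,
    }

    def __init__(self, **configs):
        # Copy the defaults, then override any key the caller supplied;
        # keys this class does not know about are simply left in configs.
        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
            if key in configs:
                self.config[key] = configs.pop(key)


sketch = CoordinatorSketch(node_not_ready_retry_timeout_ms=30000)
print(sketch.config['node_not_ready_retry_timeout_ms'])  # 30000
```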