From e7a42e8beae646f0fefcf8a14df4aeb1fba805c2 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Tue, 8 Nov 2016 17:57:48 +0200 Subject: [PATCH 01/47] hello trial --- kafka/coordinator/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 5f60aa321..12b097c71 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -516,7 +516,7 @@ def _handle_group_coordinator_response(self, future, response): def coordinator_dead(self, error=None): """Mark the current coordinator as dead.""" if self.coordinator_id is not None: - log.warning("Marking the coordinator dead (node %s) for group %s: %s.", + log.warning("ZZZZMarking the coordinator dead (node %s) for group %s: %s.", self.coordinator_id, self.group_id, error) self.coordinator_id = None From d51f8db0fe8777a888beceb970047a42552cd4c7 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Tue, 8 Nov 2016 18:03:46 +0200 Subject: [PATCH 02/47] why coordinator_dead --- kafka/coordinator/base.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 12b097c71..63892129c 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -190,6 +190,7 @@ def coordinator_unknown(self): return True if self._client.is_disconnected(self.coordinator_id): + log.warning("DISCONNECTED") self.coordinator_dead() return True @@ -311,6 +312,7 @@ def _failed_request(self, node_id, request, future, error): # unless the error is caused by internal client pipelining if not isinstance(error, (Errors.NodeNotReadyError, Errors.TooManyInFlightRequests)): + log.warning("NOT INSTANCE") self.coordinator_dead() future.failure(error) @@ -348,6 +350,7 @@ def _handle_join_group_response(self, future, send_time, response): elif error_type in (Errors.GroupCoordinatorNotAvailableError, Errors.NotCoordinatorForGroupError): # re-discover the coordinator and retry with backoff + log.warning("JOIN GROUP") self.coordinator_dead() log.debug("Attempt to join group %s failed due to obsolete " "coordinator information: %s", self.group_id, @@ -516,7 +519,7 @@ def _handle_group_coordinator_response(self, future, response): def coordinator_dead(self, error=None): """Mark the current coordinator as dead.""" if self.coordinator_id is not None: - log.warning("ZZZZMarking the coordinator dead (node %s) for group %s: %s.", + log.warning("Marking the coordinator dead (node %s) for group %s: %s.", self.coordinator_id, self.group_id, error) self.coordinator_id = None From 089ee856366e5acbc045164d313528aa2f133624 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Tue, 8 Nov 2016 18:20:52 +0200 Subject: [PATCH 03/47] why coordinator_dead --- kafka/coordinator/base.py | 1 + kafka/coordinator/consumer.py | 1 + 2 files changed, 2 insertions(+) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 63892129c..162435e6e 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -451,6 +451,7 @@ def _handle_sync_group_response(self, future, send_time, response): Errors.NotCoordinatorForGroupError): error = error_type() log.debug("SyncGroup for group %s failed due to %s", self.group_id, error) + log.warning("SYNCGROUP") self.coordinator_dead() future.failure(error) else: diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index a600cb471..c1a9c0ea1 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -630,6 +630,7 @@ def 
_handle_offset_fetch_response(self, future, response): future.failure(error) elif error_type is Errors.NotCoordinatorForGroupError: # re-discover the coordinator and retry + log.warning("OFFSET FETCH") self.coordinator_dead() future.failure(error) elif error_type in (Errors.UnknownMemberIdError, From dada08dbed3cefa31ccac87f544c808a6b3b2137 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Wed, 9 Nov 2016 14:34:20 +0200 Subject: [PATCH 04/47] where coordinator_dead --- kafka/consumer/group.py | 7 +++++++ kafka/coordinator/base.py | 4 ---- kafka/coordinator/consumer.py | 1 - 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 3ab68a7d3..7c1a5b84d 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -290,9 +290,12 @@ def __init__(self, *topics, **configs): if self.config['api_version'] is None: self.config['api_version'] = self._client.config['api_version'] + log.warning("BEFORE SUBSCRIPTION STATE") self._subscription = SubscriptionState(self.config['auto_offset_reset']) + log.warning("BEFORE FETCHER") self._fetcher = Fetcher( self._client, self._subscription, self._metrics, **self.config) + log.warning("BEFORE CONSUMER COORDINATOR") self._coordinator = ConsumerCoordinator( self._client, self._subscription, self._metrics, assignors=self.config['partition_assignment_strategy'], @@ -301,9 +304,13 @@ def __init__(self, *topics, **configs): self._iterator = None self._consumer_timeout = float('inf') + log.warning("BEFORE IF TOPICS") if topics: + log.warning("BEFORE SUBSCRIPTION SUBSCRIBE") self._subscription.subscribe(topics=topics) + log.warning("BEFORE CLIENT SET TOPICS") self._client.set_topics(topics) + log.warning("AFTER IF TOPICS") def assign(self, partitions): """Manually assign a list of TopicPartitions to this consumer. 
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 162435e6e..5f60aa321 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -190,7 +190,6 @@ def coordinator_unknown(self): return True if self._client.is_disconnected(self.coordinator_id): - log.warning("DISCONNECTED") self.coordinator_dead() return True @@ -312,7 +311,6 @@ def _failed_request(self, node_id, request, future, error): # unless the error is caused by internal client pipelining if not isinstance(error, (Errors.NodeNotReadyError, Errors.TooManyInFlightRequests)): - log.warning("NOT INSTANCE") self.coordinator_dead() future.failure(error) @@ -350,7 +348,6 @@ def _handle_join_group_response(self, future, send_time, response): elif error_type in (Errors.GroupCoordinatorNotAvailableError, Errors.NotCoordinatorForGroupError): # re-discover the coordinator and retry with backoff - log.warning("JOIN GROUP") self.coordinator_dead() log.debug("Attempt to join group %s failed due to obsolete " "coordinator information: %s", self.group_id, @@ -451,7 +448,6 @@ def _handle_sync_group_response(self, future, send_time, response): Errors.NotCoordinatorForGroupError): error = error_type() log.debug("SyncGroup for group %s failed due to %s", self.group_id, error) - log.warning("SYNCGROUP") self.coordinator_dead() future.failure(error) else: diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index c1a9c0ea1..a600cb471 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -630,7 +630,6 @@ def _handle_offset_fetch_response(self, future, response): future.failure(error) elif error_type is Errors.NotCoordinatorForGroupError: # re-discover the coordinator and retry - log.warning("OFFSET FETCH") self.coordinator_dead() future.failure(error) elif error_type in (Errors.UnknownMemberIdError, From 8355743d02e4b7b225e9a63d2604bbb3170d9960 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Wed, 9 Nov 2016 17:13:48 +0200 Subject: [PATCH 05/47] poll check --- kafka/consumer/group.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 7c1a5b84d..2c4b9c1e1 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -290,12 +290,9 @@ def __init__(self, *topics, **configs): if self.config['api_version'] is None: self.config['api_version'] = self._client.config['api_version'] - log.warning("BEFORE SUBSCRIPTION STATE") self._subscription = SubscriptionState(self.config['auto_offset_reset']) - log.warning("BEFORE FETCHER") self._fetcher = Fetcher( self._client, self._subscription, self._metrics, **self.config) - log.warning("BEFORE CONSUMER COORDINATOR") self._coordinator = ConsumerCoordinator( self._client, self._subscription, self._metrics, assignors=self.config['partition_assignment_strategy'], @@ -304,13 +301,9 @@ def __init__(self, *topics, **configs): self._iterator = None self._consumer_timeout = float('inf') - log.warning("BEFORE IF TOPICS") if topics: - log.warning("BEFORE SUBSCRIPTION SUBSCRIBE") self._subscription.subscribe(topics=topics) - log.warning("BEFORE CLIENT SET TOPICS") self._client.set_topics(topics) - log.warning("AFTER IF TOPICS") def assign(self, partitions): """Manually assign a list of TopicPartitions to this consumer. 
@@ -522,6 +515,7 @@ def poll(self, timeout_ms=0, max_records=None):
         start = time.time()
         remaining = timeout_ms
         while True:
+            log.warning("PPPOOOOOOOOOOOOOOOOLLL ONCE")
             records = self._poll_once(remaining, max_records)
             if records:
                 return records

From 9f69b93c23b91bd3e8118afefd8474315e7b1809 Mon Sep 17 00:00:00 2001
From: Zeynep Arikoglu
Date: Wed, 9 Nov 2016 17:32:06 +0200
Subject: [PATCH 06/47] response details

---
 kafka/consumer/group.py       | 1 -
 kafka/coordinator/consumer.py | 1 +
 2 files changed, 1 insertion(+), 1 deletion(-)

diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 2c4b9c1e1..3ab68a7d3 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -515,7 +515,6 @@ def poll(self, timeout_ms=0, max_records=None):
         start = time.time()
         remaining = timeout_ms
         while True:
-            log.warning("PPPOOOOOOOOOOOOOOOOLLL ONCE")
             records = self._poll_once(remaining, max_records)
             if records:
                 return records

diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index a600cb471..2b55c28b4 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -617,6 +617,7 @@ def _send_offset_fetch_request(self, partitions):

     def _handle_offset_fetch_response(self, future, response):
         offsets = {}
+        log.warning("DDDDDDDDDDDDD %s", response)
         for topic, partitions in response.topics:
             for partition, offset, metadata, error_code in partitions:
                 tp = TopicPartition(topic, partition)

From 366f6353a7e23c5d71253545bbafdf702de422ce Mon Sep 17 00:00:00 2001
From: Zeynep Arikoglu
Date: Thu, 10 Nov 2016 09:55:01 +0200
Subject: [PATCH 07/47] coordinator_id details

---
 kafka/coordinator/base.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 5f60aa321..00446aff5 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -208,8 +208,9 @@ def ensure_coordinator_known(self):
self.coordinator_id = self._client.least_loaded_node() self._client.ready(self.coordinator_id) continue - log.warning("AAAAAAAAPI VERSION OK %s", self.config['api_version']) future = self._send_group_coordinator_request() - log.warning("FUTURE %s", future) self._client.poll(future=future) if future.failed(): From 7bcdcebb0337ad32633ca7ea4cf0526268fceb8c Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 10 Nov 2016 11:06:13 +0200 Subject: [PATCH 09/47] fetch offset timing --- kafka/client_async.py | 2 -- kafka/coordinator/consumer.py | 3 ++- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index f545b63a4..fb332cb2a 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -498,7 +498,6 @@ def poll(self, timeout_ms=None, future=None, sleep=True): task_future.success(result) # If we got a future that is already done, dont block in _poll if future and future.is_done: - log.warning("FUTURE IS done") timeout = 0 else: timeout = min( @@ -509,7 +508,6 @@ def poll(self, timeout_ms=None, future=None, sleep=True): timeout = max(0, timeout / 1000.0) # avoid negative timeouts responses.extend(self._poll(timeout, sleep=sleep)) - log.warning("RESPONSES %s", responses) # If all we had was a timeout (future is None) - only do one poll # If we do have a future, we keep looping until it is done diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 2b55c28b4..85cbbf708 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -315,7 +315,8 @@ def fetch_committed_offsets(self, partitions): # contact coordinator to fetch committed offsets future = self._send_offset_fetch_request(partitions) - self._client.poll(future=future) + time.sleep(0.1) + log.warning("FETCH COMMITTED OFFSET ", self._client.poll(future=future)) if future.succeeded(): return future.value From bfb520e7791b8677d25ae2f6fe5becd8c88ac910 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 10 Nov 2016 11:07:43 +0200 Subject: [PATCH 10/47] fetch offset timing --- kafka/coordinator/consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 85cbbf708..c309a78c4 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -316,7 +316,7 @@ def fetch_committed_offsets(self, partitions): # contact coordinator to fetch committed offsets future = self._send_offset_fetch_request(partitions) time.sleep(0.1) - log.warning("FETCH COMMITTED OFFSET ", self._client.poll(future=future)) + log.warning("FETCH COMMITTED OFFSET %s", self._client.poll(future=future)) if future.succeeded(): return future.value From 6efb8e8928d19f86ccd81e106fcdbadf0b608295 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 10 Nov 2016 11:18:40 +0200 Subject: [PATCH 11/47] fetch offset timing --- kafka/client_async.py | 5 +++-- kafka/coordinator/base.py | 1 + kafka/coordinator/consumer.py | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index fb332cb2a..410d1f8df 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -508,7 +508,7 @@ def poll(self, timeout_ms=None, future=None, sleep=True): timeout = max(0, timeout / 1000.0) # avoid negative timeouts responses.extend(self._poll(timeout, sleep=sleep)) - + log.warning("RESPONSES %s", responses) # If all we had was a timeout (future is None) - only do one poll # If we do have a future, we keep looping until it is done if 
not future or future.is_done: @@ -692,6 +692,7 @@ def _maybe_refresh_metadata(self): timeout = max(ttl, next_reconnect_ms, wait_for_in_progress_ms) if timeout == 0: + log.warning("MAY REFRESH METADATA") node_id = self.least_loaded_node() if node_id is None: log.debug("Give up sending metadata request since no node is available") @@ -784,7 +785,7 @@ def check_version(self, node_id=None, timeout=2, strict=False): """ end = time.time() + timeout while time.time() < end: - + log.warning("CHECK VERSION") # It is possible that least_loaded_node falls back to bootstrap, # which can block for an increasing backoff period try_node = node_id or self.least_loaded_node() diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index eda40fad7..70f0e86b9 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -460,6 +460,7 @@ def _send_group_coordinator_request(self): Returns: Future: resolves to the node id of the coordinator """ + log.warning("SEND GROUP COORDINATOR REQ") node_id = self._client.least_loaded_node() if node_id is None: return Future().failure(Errors.NoBrokersAvailable()) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index c309a78c4..acc4abc76 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -310,12 +310,13 @@ def fetch_committed_offsets(self, partitions): if not partitions: return {} + time.sleep(10) + while True: self.ensure_coordinator_known() # contact coordinator to fetch committed offsets future = self._send_offset_fetch_request(partitions) - time.sleep(0.1) log.warning("FETCH COMMITTED OFFSET %s", self._client.poll(future=future)) if future.succeeded(): From 52b90d79f5f8ec65a82d9d702173ffb75337a328 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 10 Nov 2016 11:28:42 +0200 Subject: [PATCH 12/47] fetch offset timing --- kafka/client_async.py | 2 -- kafka/coordinator/base.py | 2 +- kafka/coordinator/consumer.py | 4 +--- 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 410d1f8df..401966321 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -692,7 +692,6 @@ def _maybe_refresh_metadata(self): timeout = max(ttl, next_reconnect_ms, wait_for_in_progress_ms) if timeout == 0: - log.warning("MAY REFRESH METADATA") node_id = self.least_loaded_node() if node_id is None: log.debug("Give up sending metadata request since no node is available") @@ -785,7 +784,6 @@ def check_version(self, node_id=None, timeout=2, strict=False): """ end = time.time() + timeout while time.time() < end: - log.warning("CHECK VERSION") # It is possible that least_loaded_node falls back to bootstrap, # which can block for an increasing backoff period try_node = node_id or self.least_loaded_node() diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 70f0e86b9..b48b70677 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -460,7 +460,7 @@ def _send_group_coordinator_request(self): Returns: Future: resolves to the node id of the coordinator """ - log.warning("SEND GROUP COORDINATOR REQ") + node_id = self._client.least_loaded_node() if node_id is None: return Future().failure(Errors.NoBrokersAvailable()) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index acc4abc76..62b9201ca 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -310,11 +310,9 @@ def fetch_committed_offsets(self, partitions): if not partitions: return {} - time.sleep(10) - while 
True: self.ensure_coordinator_known() - + time.sleep(5) # contact coordinator to fetch committed offsets future = self._send_offset_fetch_request(partitions) log.warning("FETCH COMMITTED OFFSET %s", self._client.poll(future=future)) From 286c0507c404db68e65d876f5eeddbf4911bbf89 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 10 Nov 2016 11:37:19 +0200 Subject: [PATCH 13/47] fetch offset timing --- kafka/client_async.py | 1 - kafka/coordinator/consumer.py | 3 --- 2 files changed, 4 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 401966321..6d378d4e5 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -508,7 +508,6 @@ def poll(self, timeout_ms=None, future=None, sleep=True): timeout = max(0, timeout / 1000.0) # avoid negative timeouts responses.extend(self._poll(timeout, sleep=sleep)) - log.warning("RESPONSES %s", responses) # If all we had was a timeout (future is None) - only do one poll # If we do have a future, we keep looping until it is done if not future or future.is_done: diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 62b9201ca..8e6a8206c 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -312,10 +312,8 @@ def fetch_committed_offsets(self, partitions): while True: self.ensure_coordinator_known() - time.sleep(5) # contact coordinator to fetch committed offsets future = self._send_offset_fetch_request(partitions) - log.warning("FETCH COMMITTED OFFSET %s", self._client.poll(future=future)) if future.succeeded(): return future.value @@ -617,7 +615,6 @@ def _send_offset_fetch_request(self, partitions): def _handle_offset_fetch_response(self, future, response): offsets = {} - log.warning("DDDDDDDDDDDDD %s", response) for topic, partitions in response.topics: for partition, offset, metadata, error_code in partitions: tp = TopicPartition(topic, partition) From 8a7eb82243357d8d134ec6593ff78240bede258c Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 10 Nov 2016 11:46:19 +0200 Subject: [PATCH 14/47] fetch offset timing --- kafka/coordinator/base.py | 1 - kafka/coordinator/consumer.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index b48b70677..eda40fad7 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -460,7 +460,6 @@ def _send_group_coordinator_request(self): Returns: Future: resolves to the node id of the coordinator """ - node_id = self._client.least_loaded_node() if node_id is None: return Future().failure(Errors.NoBrokersAvailable()) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 8e6a8206c..5fc59af2a 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -314,6 +314,7 @@ def fetch_committed_offsets(self, partitions): self.ensure_coordinator_known() # contact coordinator to fetch committed offsets future = self._send_offset_fetch_request(partitions) + self._client.poll(future=future) if future.succeeded(): return future.value From d185a4213459cd98ad5a3b4de1cbcb0be7fabb47 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Wed, 22 Feb 2017 18:30:45 +0200 Subject: [PATCH 15/47] retry count --- kafka/coordinator/base.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 29568085f..d8028bb7d 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -199,6 +199,8 @@ def ensure_coordinator_known(self): """Block 
until the coordinator for this group is known (and we have an active connection -- java client uses unsent queue). """ + num_retries = 3 + retry_count = 0 while self.coordinator_unknown(): # Prior to 0.8.2 there was no group coordinator @@ -215,7 +217,11 @@ def ensure_coordinator_known(self): if isinstance(future.exception, Errors.GroupCoordinatorNotAvailableError): continue - elif future.retriable(): + elif isinstance(future.exception, Errors.NoBrokersAvailable): + if num_retries == retry_count: + break + retry_count += 1 + if future.retriable(): metadata_update = self._client.cluster.request_update() self._client.poll(future=metadata_update) else: From 653b1d88439a1acbac700e17820d45ac6b4e78e0 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Wed, 22 Feb 2017 19:15:13 +0200 Subject: [PATCH 16/47] logging of retry count --- kafka/coordinator/base.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index d8028bb7d..1a93d584e 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -217,7 +217,8 @@ def ensure_coordinator_known(self): if isinstance(future.exception, Errors.GroupCoordinatorNotAvailableError): continue - elif isinstance(future.exception, Errors.NoBrokersAvailable): + if isinstance(future.exception, Errors.NoBrokersAvailable): + log.error('NO BROKERS AVAILABLE retry count %d', retry_count) if num_retries == retry_count: break retry_count += 1 From 4a3ba3ee91f5811c39423590851c2d8c6397fe63 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Wed, 22 Feb 2017 19:25:28 +0200 Subject: [PATCH 17/47] verify logging is true --- kafka/coordinator/base.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 1a93d584e..c511dc688 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -217,6 +217,7 @@ def ensure_coordinator_known(self): if isinstance(future.exception, Errors.GroupCoordinatorNotAvailableError): continue + log.error('FUTURE EXCEPTION %s', future.exception) if isinstance(future.exception, Errors.NoBrokersAvailable): log.error('NO BROKERS AVAILABLE retry count %d', retry_count) if num_retries == retry_count: From 6cbe2992a85e598133db4d933c833fd79734828d Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 23 Feb 2017 12:43:20 +0200 Subject: [PATCH 18/47] more log --- kafka/coordinator/base.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index c511dc688..03e622a52 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -224,9 +224,11 @@ def ensure_coordinator_known(self): break retry_count += 1 if future.retriable(): + log.error('FUTURE RETRIABLE %s', future.exception) metadata_update = self._client.cluster.request_update() self._client.poll(future=metadata_update) else: + log.error('FUTURE RAISE %s', future.exception) raise future.exception # pylint: disable-msg=raising-bad-type def need_rejoin(self): From e94e4b2897a44bfcea337170c750d1660d4c4517 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 23 Feb 2017 12:50:23 +0200 Subject: [PATCH 19/47] more logs --- kafka/coordinator/base.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 03e622a52..de538fdbd 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -186,13 +186,16 @@ def coordinator_unknown(self): Returns: bool: True if the coordinator is unknown """ + log.error('COORDINATOR UNKNOWN 
1 %s', self.coordinator_id) if self.coordinator_id is None: return True + log.error('COORDINATOR UNKNOWN 2 %s', self.coordinator_id) if self._client.is_disconnected(self.coordinator_id): self.coordinator_dead('Node Disconnected') return True + log.error('COORDINATOR UNKNOWN 3 %s', self.coordinator_id) return False def ensure_coordinator_known(self): From d931068b328e67f9e567519b48d3fdec374b2ffb Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 23 Feb 2017 13:01:23 +0200 Subject: [PATCH 20/47] cleanup --- kafka/coordinator/base.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index de538fdbd..2d4c926c6 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -186,24 +186,19 @@ def coordinator_unknown(self): Returns: bool: True if the coordinator is unknown """ - log.error('COORDINATOR UNKNOWN 1 %s', self.coordinator_id) if self.coordinator_id is None: return True - log.error('COORDINATOR UNKNOWN 2 %s', self.coordinator_id) if self._client.is_disconnected(self.coordinator_id): self.coordinator_dead('Node Disconnected') return True - log.error('COORDINATOR UNKNOWN 3 %s', self.coordinator_id) return False def ensure_coordinator_known(self): """Block until the coordinator for this group is known (and we have an active connection -- java client uses unsent queue). """ - num_retries = 3 - retry_count = 0 while self.coordinator_unknown(): # Prior to 0.8.2 there was no group coordinator @@ -220,18 +215,11 @@ def ensure_coordinator_known(self): if isinstance(future.exception, Errors.GroupCoordinatorNotAvailableError): continue - log.error('FUTURE EXCEPTION %s', future.exception) - if isinstance(future.exception, Errors.NoBrokersAvailable): - log.error('NO BROKERS AVAILABLE retry count %d', retry_count) - if num_retries == retry_count: - break - retry_count += 1 if future.retriable(): log.error('FUTURE RETRIABLE %s', future.exception) metadata_update = self._client.cluster.request_update() self._client.poll(future=metadata_update) else: - log.error('FUTURE RAISE %s', future.exception) raise future.exception # pylint: disable-msg=raising-bad-type def need_rejoin(self): From a0fbdb4ba7dcaa11c2f2e5effbd7f0c39e87b18c Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 23 Feb 2017 13:05:16 +0200 Subject: [PATCH 21/47] Node not ready --- kafka/coordinator/base.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 2d4c926c6..fe21bc7c6 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -216,9 +216,11 @@ def ensure_coordinator_known(self): Errors.GroupCoordinatorNotAvailableError): continue if future.retriable(): - log.error('FUTURE RETRIABLE %s', future.exception) - metadata_update = self._client.cluster.request_update() - self._client.poll(future=metadata_update) + if isinstance(future.exception, Errors.NodeNotReadyError): + self._client.poll() + else: + metadata_update = self._client.cluster.request_update() + self._client.poll(future=metadata_update) else: raise future.exception # pylint: disable-msg=raising-bad-type From eb1e978f05d5823eb07e106116a9f19bd9fdbefe Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 23 Feb 2017 13:11:14 +0200 Subject: [PATCH 22/47] timeout --- kafka/client_async.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/client_async.py b/kafka/client_async.py index f09f12653..055cf5672 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -509,6 +509,7 
@@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True): self.config['request_timeout_ms']) timeout = max(0, timeout / 1000.0) # avoid negative timeouts + log.error("TIMEOUT %d", timeout) responses.extend(self._poll(timeout, sleep=sleep)) # If all we had was a timeout (future is None) - only do one poll # If we do have a future, we keep looping until it is done From 3287bb0a7ef8af0bfcfdb22194ac12d521c06aff Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 23 Feb 2017 13:14:06 +0200 Subject: [PATCH 23/47] future --- kafka/client_async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 055cf5672..d4803edb7 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -509,7 +509,7 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True): self.config['request_timeout_ms']) timeout = max(0, timeout / 1000.0) # avoid negative timeouts - log.error("TIMEOUT %d", timeout) + log.error("TIMEOUT %d FUTURE %s", timeout, future) responses.extend(self._poll(timeout, sleep=sleep)) # If all we had was a timeout (future is None) - only do one poll # If we do have a future, we keep looping until it is done From dfa70ee135b79b000554cabadf11a551f74e448a Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 23 Feb 2017 13:18:11 +0200 Subject: [PATCH 24/47] responses --- kafka/client_async.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/client_async.py b/kafka/client_async.py index d4803edb7..ac0438b02 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -514,6 +514,7 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True): # If all we had was a timeout (future is None) - only do one poll # If we do have a future, we keep looping until it is done if not future or future.is_done: + log.error("BREAK TIMEOUT %d FUTURE %s RESPONSES %s", timeout, future, responses) break return responses From 3945c44eab536c3b6769b39e32075e25d73efc70 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 23 Feb 2017 13:23:57 +0200 Subject: [PATCH 25/47] retry --- kafka/coordinator/base.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index fe21bc7c6..cfc84375b 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -199,6 +199,8 @@ def ensure_coordinator_known(self): """Block until the coordinator for this group is known (and we have an active connection -- java client uses unsent queue). 
""" + num_retries = 3 + retry_count = 0 while self.coordinator_unknown(): # Prior to 0.8.2 there was no group coordinator @@ -218,6 +220,11 @@ def ensure_coordinator_known(self): if future.retriable(): if isinstance(future.exception, Errors.NodeNotReadyError): self._client.poll() + log.error("RETURNED FROM POLL %d", retry_count) + if num_retries == retry_count: + log.error("BREAK ENSURE_COORDINATOR_KNOWN") + break + retry_count += 1 else: metadata_update = self._client.cluster.request_update() self._client.poll(future=metadata_update) From 46e3347e1ecd864b3f3f704d6b9daa211e05e9f5 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 23 Feb 2017 13:29:55 +0200 Subject: [PATCH 26/47] retry and future None --- kafka/client_async.py | 2 -- kafka/coordinator/base.py | 2 -- 2 files changed, 4 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index ac0438b02..f09f12653 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -509,12 +509,10 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True): self.config['request_timeout_ms']) timeout = max(0, timeout / 1000.0) # avoid negative timeouts - log.error("TIMEOUT %d FUTURE %s", timeout, future) responses.extend(self._poll(timeout, sleep=sleep)) # If all we had was a timeout (future is None) - only do one poll # If we do have a future, we keep looping until it is done if not future or future.is_done: - log.error("BREAK TIMEOUT %d FUTURE %s RESPONSES %s", timeout, future, responses) break return responses diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index cfc84375b..106750246 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -220,9 +220,7 @@ def ensure_coordinator_known(self): if future.retriable(): if isinstance(future.exception, Errors.NodeNotReadyError): self._client.poll() - log.error("RETURNED FROM POLL %d", retry_count) if num_retries == retry_count: - log.error("BREAK ENSURE_COORDINATOR_KNOWN") break retry_count += 1 else: From 384b1494e0f51d8db8a98ad1dec7ac427cc3c57f Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 23 Feb 2017 13:46:12 +0200 Subject: [PATCH 27/47] raise in case of retries finished --- kafka/coordinator/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 106750246..2ee2bd991 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -221,7 +221,7 @@ def ensure_coordinator_known(self): if isinstance(future.exception, Errors.NodeNotReadyError): self._client.poll() if num_retries == retry_count: - break + raise future.exception # pylint: disable-msg=raising-bad-type retry_count += 1 else: metadata_update = self._client.cluster.request_update() From e21404041326fd9f026f3c293f882904c821a1e6 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 23 Feb 2017 15:19:19 +0200 Subject: [PATCH 28/47] producer send logs fatal --- kafka/producer/kafka.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 98d442699..ecf1cf3c0 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -505,13 +505,13 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None): tp = TopicPartition(topic, partition) if timestamp_ms is None: timestamp_ms = int(time.time() * 1000) - log.debug("Sending (key=%s value=%s) to %s", key, value, tp) + log.fatal("Sending (key=%s value=%s) to %s", key, value, tp) result = self._accumulator.append(tp, 
timestamp_ms, key_bytes, value_bytes, self.config['max_block_ms']) future, batch_is_full, new_batch_created = result if batch_is_full or new_batch_created: - log.debug("Waking up the sender since %s is either full or" + log.fatal("Waking up the sender since %s is either full or" " getting a new batch", tp) self._sender.wakeup() @@ -524,7 +524,7 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None): except AssertionError: raise except Exception as e: - log.debug("Exception occurred during message send: %s", e) + log.fatal("Exception occurred during message send: %s", e) return FutureRecordMetadata( FutureProduceResult(TopicPartition(topic, partition)), -1, None, None, From 046cf5c335cf31b581b2d187843e1ea8114d3f1e Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 23 Feb 2017 17:01:38 +0200 Subject: [PATCH 29/47] logging for producer --- kafka/client_async.py | 1 + kafka/coordinator/base.py | 1 + 2 files changed, 2 insertions(+) diff --git a/kafka/client_async.py b/kafka/client_async.py index f09f12653..077b83e73 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -485,6 +485,7 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True): self._maybe_connect(node_id) # Send a metadata request if needed + log.error("POLLLING") metadata_timeout_ms = self._maybe_refresh_metadata() # Send scheduled tasks diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 2ee2bd991..989072ad6 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -218,6 +218,7 @@ def ensure_coordinator_known(self): Errors.GroupCoordinatorNotAvailableError): continue if future.retriable(): + log.error("FUTURE RETRIABLE %s", future.exception) if isinstance(future.exception, Errors.NodeNotReadyError): self._client.poll() if num_retries == retry_count: From e6ed4b0fad4087063f3fc0a42566f430795b85d9 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 23 Feb 2017 17:12:39 +0200 Subject: [PATCH 30/47] logging in sender --- kafka/producer/sender.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 2974faf98..85407654b 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -156,6 +156,7 @@ def run_once(self): # difference between now and its linger expiry time; otherwise the # select time will be the time difference between now and the # metadata expiry time + log.error("SENDER.RUN_ONCE") self._client.poll(poll_timeout_ms, sleep=True) def initiate_close(self): From 13a568371fbaa03d256837696665573300e70e13 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 23 Feb 2017 17:28:24 +0200 Subject: [PATCH 31/47] more logging --- kafka/client_async.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 077b83e73..afce2ba29 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -485,7 +485,6 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True): self._maybe_connect(node_id) # Send a metadata request if needed - log.error("POLLLING") metadata_timeout_ms = self._maybe_refresh_metadata() # Send scheduled tasks @@ -511,6 +510,7 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True): timeout = max(0, timeout / 1000.0) # avoid negative timeouts responses.extend(self._poll(timeout, sleep=sleep)) + log.error("POLLLING %s", responses) # If all we had was a timeout (future is None) - only do one poll # If we do have a future, we keep 
looping until it is done if not future or future.is_done: @@ -704,12 +704,14 @@ def _maybe_refresh_metadata(self): if timeout == 0: node_id = self.least_loaded_node() if node_id is None: + log.error("GNNNNNNNNNNNNNNNNNNODE IS NONE") log.debug("Give up sending metadata request since no node is available") # mark the timestamp for no node available to connect self._last_no_node_available_ms = time.time() * 1000 return timeout if self._can_send_request(node_id): + log.error("XXXXXXXXXXX_can_send_requeste") topics = list(self._topics) if self.cluster.need_all_topic_metadata or not topics: topics = [] if self.config['api_version'] < (0, 10) else None @@ -727,6 +729,7 @@ def refresh_done(val_or_error): future.add_errback(refresh_done) elif self._can_connect(node_id): + log.error("GNNNNNNNNNNNNNNNNNNO_can_connect") log.debug("Initializing connection to node %s for metadata request", node_id) self._maybe_connect(node_id) # If initiateConnect failed immediately, this node will be put into blackout and we @@ -737,6 +740,7 @@ def refresh_done(val_or_error): # connected, but can't send more OR connecting # In either case, we just need to wait for a network event to let us know the selected # connection might be usable again. + log.error("######################################") self._last_no_node_available_ms = time.time() * 1000 return timeout From 46f321ca2e28830ed0eb36b94e1d4b9586a77fa8 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 23 Feb 2017 17:37:01 +0200 Subject: [PATCH 32/47] logging in _poll --- kafka/client_async.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index afce2ba29..2399e0ccd 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -510,7 +510,6 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True): timeout = max(0, timeout / 1000.0) # avoid negative timeouts responses.extend(self._poll(timeout, sleep=sleep)) - log.error("POLLLING %s", responses) # If all we had was a timeout (future is None) - only do one poll # If we do have a future, we keep looping until it is done if not future or future.is_done: @@ -526,7 +525,9 @@ def _poll(self, timeout, sleep=True): processed = set() start_select = time.time() + log.error("POLLING 1") ready = self._selector.select(timeout) + log.error("POLLING 2 %s", ready) end_select = time.time() if self._sensors: self._sensors.select_time.record((end_select - start_select) * 1000000000) @@ -704,14 +705,12 @@ def _maybe_refresh_metadata(self): if timeout == 0: node_id = self.least_loaded_node() if node_id is None: - log.error("GNNNNNNNNNNNNNNNNNNODE IS NONE") log.debug("Give up sending metadata request since no node is available") # mark the timestamp for no node available to connect self._last_no_node_available_ms = time.time() * 1000 return timeout if self._can_send_request(node_id): - log.error("XXXXXXXXXXX_can_send_requeste") topics = list(self._topics) if self.cluster.need_all_topic_metadata or not topics: topics = [] if self.config['api_version'] < (0, 10) else None @@ -729,7 +728,6 @@ def refresh_done(val_or_error): future.add_errback(refresh_done) elif self._can_connect(node_id): - log.error("GNNNNNNNNNNNNNNNNNNO_can_connect") log.debug("Initializing connection to node %s for metadata request", node_id) self._maybe_connect(node_id) # If initiateConnect failed immediately, this node will be put into blackout and we @@ -740,7 +738,6 @@ def refresh_done(val_or_error): # connected, but can't send more OR connecting # In either 
case, we just need to wait for a network event to let us know the selected # connection might be usable again. - log.error("######################################") self._last_no_node_available_ms = time.time() * 1000 return timeout From 2884ac46150f856d612d8cc9553409bc96476209 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 23 Feb 2017 17:47:21 +0200 Subject: [PATCH 33/47] log for maybe_connect --- kafka/client_async.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 2399e0ccd..1bc757c03 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -482,7 +482,9 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True): # Attempt to complete pending connections for node_id in list(self._connecting): + log.error("MAYBE CON 1 %s", node_id) self._maybe_connect(node_id) + log.error("MAYBE CON 2 %s", node_id) # Send a metadata request if needed metadata_timeout_ms = self._maybe_refresh_metadata() @@ -525,9 +527,7 @@ def _poll(self, timeout, sleep=True): processed = set() start_select = time.time() - log.error("POLLING 1") ready = self._selector.select(timeout) - log.error("POLLING 2 %s", ready) end_select = time.time() if self._sensors: self._sensors.select_time.record((end_select - start_select) * 1000000000) From e76e57831c7c8094db3bbcb671821af8eb45c7ef Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 23 Feb 2017 19:28:22 +0200 Subject: [PATCH 34/47] revert --- kafka/client_async.py | 2 -- kafka/coordinator/base.py | 1 - kafka/producer/sender.py | 1 - 3 files changed, 4 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 1bc757c03..f09f12653 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -482,9 +482,7 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True): # Attempt to complete pending connections for node_id in list(self._connecting): - log.error("MAYBE CON 1 %s", node_id) self._maybe_connect(node_id) - log.error("MAYBE CON 2 %s", node_id) # Send a metadata request if needed metadata_timeout_ms = self._maybe_refresh_metadata() diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 989072ad6..2ee2bd991 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -218,7 +218,6 @@ def ensure_coordinator_known(self): Errors.GroupCoordinatorNotAvailableError): continue if future.retriable(): - log.error("FUTURE RETRIABLE %s", future.exception) if isinstance(future.exception, Errors.NodeNotReadyError): self._client.poll() if num_retries == retry_count: diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 85407654b..2974faf98 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -156,7 +156,6 @@ def run_once(self): # difference between now and its linger expiry time; otherwise the # select time will be the time difference between now and the # metadata expiry time - log.error("SENDER.RUN_ONCE") self._client.poll(poll_timeout_ms, sleep=True) def initiate_close(self): From 1373ce46fa448e6d0030048b5d9ca53f69690932 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Thu, 23 Feb 2017 19:31:41 +0200 Subject: [PATCH 35/47] revert --- kafka/producer/kafka.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index ecf1cf3c0..98d442699 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -505,13 +505,13 @@ def send(self, topic, value=None, key=None, partition=None, 
timestamp_ms=None): tp = TopicPartition(topic, partition) if timestamp_ms is None: timestamp_ms = int(time.time() * 1000) - log.fatal("Sending (key=%s value=%s) to %s", key, value, tp) + log.debug("Sending (key=%s value=%s) to %s", key, value, tp) result = self._accumulator.append(tp, timestamp_ms, key_bytes, value_bytes, self.config['max_block_ms']) future, batch_is_full, new_batch_created = result if batch_is_full or new_batch_created: - log.fatal("Waking up the sender since %s is either full or" + log.debug("Waking up the sender since %s is either full or" " getting a new batch", tp) self._sender.wakeup() @@ -524,7 +524,7 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None): except AssertionError: raise except Exception as e: - log.fatal("Exception occurred during message send: %s", e) + log.debug("Exception occurred during message send: %s", e) return FutureRecordMetadata( FutureProduceResult(TopicPartition(topic, partition)), -1, None, None, From cbf1cff86bb7ac253f5244c8cbf5432c94d8ed9f Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Wed, 1 Mar 2017 10:57:21 +0200 Subject: [PATCH 36/47] fatal log --- kafka/coordinator/base.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 2ee2bd991..c97c71a24 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -199,8 +199,7 @@ def ensure_coordinator_known(self): """Block until the coordinator for this group is known (and we have an active connection -- java client uses unsent queue). """ - num_retries = 3 - retry_count = 0 + node_not_ready_retry_timeout_in_secs = self.config['request_timeout_ms'] while self.coordinator_unknown(): # Prior to 0.8.2 there was no group coordinator @@ -219,10 +218,14 @@ def ensure_coordinator_known(self): continue if future.retriable(): if isinstance(future.exception, Errors.NodeNotReadyError): - self._client.poll() - if num_retries == retry_count: + node_not_ready_retry_start_time = time.time() + self._client.poll(timeout_ms=node_not_ready_retry_timeout_in_secs) + node_not_ready_retry_end_time = time.time() + log.fatal("ST %d ET %d\n" % (node_not_ready_retry_start_time, node_not_ready_retry_end_time)) + node_not_ready_retry_timeout_in_secs -=\ + node_not_ready_retry_end_time - node_not_ready_retry_start_time + if node_not_ready_retry_timeout_in_secs <= 0: raise future.exception # pylint: disable-msg=raising-bad-type - retry_count += 1 else: metadata_update = self._client.cluster.request_update() self._client.poll(future=metadata_update) From 32034132fc9832578b11dbfae7e1efa62b9bcdff Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Wed, 1 Mar 2017 11:04:28 +0200 Subject: [PATCH 37/47] config --- kafka/coordinator/base.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index c97c71a24..e152fdb41 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -199,7 +199,9 @@ def ensure_coordinator_known(self): """Block until the coordinator for this group is known (and we have an active connection -- java client uses unsent queue). 
""" - node_not_ready_retry_timeout_in_secs = self.config['request_timeout_ms'] + for key in self.config: + log.fatal("CONFIG %s: %s\n" % (key, str(self.config[key]))) + node_not_ready_retry_timeout_in_secs = 40 while self.coordinator_unknown(): # Prior to 0.8.2 there was no group coordinator From 55b5e9f1adf63f4794e9b2beef21cdb71c44a6ca Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Wed, 1 Mar 2017 11:09:46 +0200 Subject: [PATCH 38/47] ms --- kafka/coordinator/base.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index e152fdb41..1bdb993c4 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -199,9 +199,7 @@ def ensure_coordinator_known(self): """Block until the coordinator for this group is known (and we have an active connection -- java client uses unsent queue). """ - for key in self.config: - log.fatal("CONFIG %s: %s\n" % (key, str(self.config[key]))) - node_not_ready_retry_timeout_in_secs = 40 + node_not_ready_retry_timeout_ms = 40000 while self.coordinator_unknown(): # Prior to 0.8.2 there was no group coordinator @@ -221,12 +219,12 @@ def ensure_coordinator_known(self): if future.retriable(): if isinstance(future.exception, Errors.NodeNotReadyError): node_not_ready_retry_start_time = time.time() - self._client.poll(timeout_ms=node_not_ready_retry_timeout_in_secs) + self._client.poll(timeout_ms=node_not_ready_retry_timeout_in_ms) node_not_ready_retry_end_time = time.time() log.fatal("ST %d ET %d\n" % (node_not_ready_retry_start_time, node_not_ready_retry_end_time)) - node_not_ready_retry_timeout_in_secs -=\ - node_not_ready_retry_end_time - node_not_ready_retry_start_time - if node_not_ready_retry_timeout_in_secs <= 0: + node_not_ready_retry_timeout_in_ms -=\ + (node_not_ready_retry_end_time - node_not_ready_retry_start_time) * 1000 + if node_not_ready_retry_timeout_in_ms <= 0: raise future.exception # pylint: disable-msg=raising-bad-type else: metadata_update = self._client.cluster.request_update() From b89a00ff7a68ab4bc6ce6ef7f390343487591f8f Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Wed, 1 Mar 2017 11:15:34 +0200 Subject: [PATCH 39/47] correction --- kafka/coordinator/base.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 1bdb993c4..561af7b7a 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -219,12 +219,12 @@ def ensure_coordinator_known(self): if future.retriable(): if isinstance(future.exception, Errors.NodeNotReadyError): node_not_ready_retry_start_time = time.time() - self._client.poll(timeout_ms=node_not_ready_retry_timeout_in_ms) + self._client.poll(timeout_ms=node_not_ready_retry_timeout_ms) node_not_ready_retry_end_time = time.time() log.fatal("ST %d ET %d\n" % (node_not_ready_retry_start_time, node_not_ready_retry_end_time)) - node_not_ready_retry_timeout_in_ms -=\ + node_not_ready_retry_timeout_ms -=\ (node_not_ready_retry_end_time - node_not_ready_retry_start_time) * 1000 - if node_not_ready_retry_timeout_in_ms <= 0: + if node_not_ready_retry_timeout_ms <= 0: raise future.exception # pylint: disable-msg=raising-bad-type else: metadata_update = self._client.cluster.request_update() From 359f1e6e4a03a2348adf65a230f3932adadbf94f Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Wed, 1 Mar 2017 11:21:02 +0200 Subject: [PATCH 40/47] better log --- kafka/coordinator/base.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git 
a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 561af7b7a..84b16b4ce 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -199,7 +199,7 @@ def ensure_coordinator_known(self): """Block until the coordinator for this group is known (and we have an active connection -- java client uses unsent queue). """ - node_not_ready_retry_timeout_ms = 40000 + node_not_ready_retry_timeout_ms = 4000 while self.coordinator_unknown(): # Prior to 0.8.2 there was no group coordinator @@ -221,9 +221,10 @@ def ensure_coordinator_known(self): node_not_ready_retry_start_time = time.time() self._client.poll(timeout_ms=node_not_ready_retry_timeout_ms) node_not_ready_retry_end_time = time.time() - log.fatal("ST %d ET %d\n" % (node_not_ready_retry_start_time, node_not_ready_retry_end_time)) node_not_ready_retry_timeout_ms -=\ (node_not_ready_retry_end_time - node_not_ready_retry_start_time) * 1000 + log.fatal("RT %d ST %.2f ET %.2f\n" % (node_not_ready_retry_timeout_ms, + node_not_ready_retry_start_time, node_not_ready_retry_end_time)) if node_not_ready_retry_timeout_ms <= 0: raise future.exception # pylint: disable-msg=raising-bad-type else: From 5a04201b43e9c7d43e1522184b439e62c099ba73 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Wed, 1 Mar 2017 11:26:00 +0200 Subject: [PATCH 41/47] reduced timeout --- kafka/coordinator/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 84b16b4ce..1a587583b 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -199,7 +199,7 @@ def ensure_coordinator_known(self): """Block until the coordinator for this group is known (and we have an active connection -- java client uses unsent queue). """ - node_not_ready_retry_timeout_ms = 4000 + node_not_ready_retry_timeout_ms = 500 while self.coordinator_unknown(): # Prior to 0.8.2 there was no group coordinator From 9e74ae8fe3a4bf57901bcee91b92be8d0cad71f5 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Wed, 1 Mar 2017 13:30:25 +0200 Subject: [PATCH 42/47] config parameter --- kafka/coordinator/base.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 1a587583b..0a1cf6eba 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -53,6 +53,7 @@ class BaseCoordinator(object): 'group_id': 'kafka-python-default-group', 'session_timeout_ms': 30000, 'heartbeat_interval_ms': 3000, + 'node_not_ready_retry_timeout_ms': None, 'retry_backoff_ms': 100, 'api_version': (0, 9), 'metric_group_prefix': '', @@ -75,6 +76,9 @@ def __init__(self, client, metrics, **configs): should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. Default: 3000 + node_not_ready_retry_timeout_ms (int): The timeout used to detect + the broker no being available so that NodeNotReadyError is raised. + Default: None retry_backoff_ms (int): Milliseconds to backoff when retrying on errors. Default: 100. """ @@ -199,7 +203,9 @@ def ensure_coordinator_known(self): """Block until the coordinator for this group is known (and we have an active connection -- java client uses unsent queue). 
""" - node_not_ready_retry_timeout_ms = 500 + node_not_ready_retry_timeout_ms = self.config['node_not_ready_retry_timeout_ms'] + log.fatal("TIMEOUT %s", node_not_ready_retry_timeout_ms) + node_not_ready_retry_start_time = time.time() while self.coordinator_unknown(): # Prior to 0.8.2 there was no group coordinator @@ -217,14 +223,10 @@ def ensure_coordinator_known(self): Errors.GroupCoordinatorNotAvailableError): continue if future.retriable(): - if isinstance(future.exception, Errors.NodeNotReadyError): - node_not_ready_retry_start_time = time.time() + if node_not_ready_retry_timeout_ms is not None and isinstance(future.exception, Errors.NodeNotReadyError): self._client.poll(timeout_ms=node_not_ready_retry_timeout_ms) - node_not_ready_retry_end_time = time.time() - node_not_ready_retry_timeout_ms -=\ - (node_not_ready_retry_end_time - node_not_ready_retry_start_time) * 1000 - log.fatal("RT %d ST %.2f ET %.2f\n" % (node_not_ready_retry_timeout_ms, - node_not_ready_retry_start_time, node_not_ready_retry_end_time)) + node_not_ready_retry_timeout_ms -= (time.time() - node_not_ready_retry_start_time) * 1000 + log.fatal("TIMEOUT %s", node_not_ready_retry_timeout_ms) if node_not_ready_retry_timeout_ms <= 0: raise future.exception # pylint: disable-msg=raising-bad-type else: From c85fd45f49a7a9728bebdd0a82234680e65baa90 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Wed, 1 Mar 2017 13:35:55 +0200 Subject: [PATCH 43/47] consumer config --- kafka/consumer/group.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 47c721ff3..05ca56275 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -224,6 +224,7 @@ class KafkaConsumer(six.Iterator): 'heartbeat_interval_ms': 3000, 'session_timeout_ms': 30000, 'max_poll_records': 500, + 'node_not_ready_retry_timeout_ms': None, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)], From b03e1f1234cd34acce5eb2262edc962500e75883 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Wed, 1 Mar 2017 13:49:20 +0200 Subject: [PATCH 44/47] consumer coordinator config --- kafka/coordinator/consumer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 142eaee76..27fd1d7bc 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -37,7 +37,8 @@ class ConsumerCoordinator(BaseCoordinator): 'retry_backoff_ms': 100, 'api_version': (0, 9), 'exclude_internal_topics': True, - 'metric_group_prefix': 'consumer' + 'metric_group_prefix': 'consumer', + 'node_not_ready_retry_timeout_ms': None } def __init__(self, client, subscription, metrics, **configs): From 2453c2504d661d35e7e9d28fd90d1fa6f6c33192 Mon Sep 17 00:00:00 2001 From: Zeynep Arikoglu Date: Wed, 1 Mar 2017 14:35:43 +0200 Subject: [PATCH 45/47] final changes --- kafka/consumer/group.py | 5 ++++- kafka/coordinator/base.py | 2 -- kafka/coordinator/consumer.py | 3 +++ 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 05ca56275..728dcf36d 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -182,6 +182,9 @@ class KafkaConsumer(six.Iterator): metrics. Default: 2 metrics_sample_window_ms (int): The maximum age in milliseconds of samples used to compute metrics. 
From 2453c2504d661d35e7e9d28fd90d1fa6f6c33192 Mon Sep 17 00:00:00 2001
From: Zeynep Arikoglu
Date: Wed, 1 Mar 2017 14:35:43 +0200
Subject: [PATCH 45/47] final changes

---
 kafka/consumer/group.py       | 5 ++++-
 kafka/coordinator/base.py     | 2 --
 kafka/coordinator/consumer.py | 3 +++
 3 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 05ca56275..728dcf36d 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -182,6 +182,9 @@ class KafkaConsumer(six.Iterator):
             metrics. Default: 2
         metrics_sample_window_ms (int): The maximum age in milliseconds of
             samples used to compute metrics. Default: 30000
+        node_not_ready_retry_timeout_ms (int): The timeout used to detect
+            the broker no being available so that NodeNotReadyError is raised.
+            Default: None
         selector (selectors.BaseSelector): Provide a specific selector
             implementation to use for I/O multiplexing.
             Default: selectors.DefaultSelector
@@ -224,7 +227,6 @@ class KafkaConsumer(six.Iterator):
         'heartbeat_interval_ms': 3000,
         'session_timeout_ms': 30000,
         'max_poll_records': 500,
-        'node_not_ready_retry_timeout_ms': None,
         'receive_buffer_bytes': None,
         'send_buffer_bytes': None,
         'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
@@ -245,6 +247,7 @@ class KafkaConsumer(six.Iterator):
         'metrics_num_samples': 2,
         'metrics_sample_window_ms': 30000,
         'metric_group_prefix': 'consumer',
+        'node_not_ready_retry_timeout_ms': None,
         'selector': selectors.DefaultSelector,
         'exclude_internal_topics': True,
         'sasl_mechanism': None,
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 0a1cf6eba..7fb39d033 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -204,7 +204,6 @@ def ensure_coordinator_known(self):
         (and we have an active connection -- java client uses unsent queue).
         """
         node_not_ready_retry_timeout_ms = self.config['node_not_ready_retry_timeout_ms']
-        log.fatal("TIMEOUT %s", node_not_ready_retry_timeout_ms)
         node_not_ready_retry_start_time = time.time()
 
         while self.coordinator_unknown():
@@ -226,7 +225,6 @@ def ensure_coordinator_known(self):
                 if node_not_ready_retry_timeout_ms is not None and isinstance(future.exception, Errors.NodeNotReadyError):
                     self._client.poll(timeout_ms=node_not_ready_retry_timeout_ms)
                     node_not_ready_retry_timeout_ms -= (time.time() - node_not_ready_retry_start_time) * 1000
-                    log.fatal("TIMEOUT %s", node_not_ready_retry_timeout_ms)
                     if node_not_ready_retry_timeout_ms <= 0:
                         raise future.exception # pylint: disable-msg=raising-bad-type
                 else:
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 27fd1d7bc..40ac74209 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -76,6 +76,9 @@ def __init__(self, client, subscription, metrics, **configs):
                 (such as offsets) should be exposed to the consumer. If set to
                 True the only way to receive records from an internal topic
                 is subscribing to it. Requires 0.10+. Default: True
+            node_not_ready_retry_timeout_ms (int): The timeout used to detect
+                the broker no being available so that NodeNotReadyError is raised.
+                Default: None
         """
         super(ConsumerCoordinator, self).__init__(client, metrics, **configs)
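
One subtlety survives the cleanup in patch 45: node_not_ready_retry_start_time is captured once, before the loop, and every pass subtracts the total time elapsed since then. Time spent before and between polls therefore also drains the budget, so the option is an upper bound on the extra waiting, not a guaranteed amount of polling. An illustrative trace with a 500 ms budget (the timings are invented for the example):

    # t = 0.00 s  ensure_coordinator_known() enters, budget = 500 ms
    # t = 0.30 s  coordinator lookup fails with NodeNotReadyError;
    #             poll(timeout_ms=500) blocks for up to 0.5 s
    # t = 0.80 s  budget -= (0.80 - 0.00) * 1000  ->  500 - 800 = -300 ms
    #             budget <= 0, so future.exception is re-raised after one poll
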
From 6401520edda80dbf20ef680d3683eb1719e823ea Mon Sep 17 00:00:00 2001
From: Zeynep Arikoglu
Date: Wed, 1 Mar 2017 16:08:08 +0200
Subject: [PATCH 46/47] node_not_ready_retry_timeout_ms

---
 kafka/client_async.py         | 2 ++
 kafka/consumer/group.py       | 2 +-
 kafka/coordinator/base.py     | 4 ++--
 kafka/coordinator/consumer.py | 4 ++--
 4 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/kafka/client_async.py b/kafka/client_async.py
index f09f12653..1513f3928 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -510,6 +510,7 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):
             timeout = max(0, timeout / 1000.0)  # avoid negative timeouts
 
             responses.extend(self._poll(timeout, sleep=sleep))
+
             # If all we had was a timeout (future is None) - only do one poll
             # If we do have a future, we keep looping until it is done
             if not future or future.is_done:
@@ -786,6 +787,7 @@ def check_version(self, node_id=None, timeout=2, strict=False):
         """
         end = time.time() + timeout
         while time.time() < end:
+
             # It is possible that least_loaded_node falls back to bootstrap,
             # which can block for an increasing backoff period
             try_node = node_id or self.least_loaded_node()
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 728dcf36d..f7db38072 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -183,7 +183,7 @@ class KafkaConsumer(six.Iterator):
         metrics_sample_window_ms (int): The maximum age in milliseconds of
             samples used to compute metrics. Default: 30000
         node_not_ready_retry_timeout_ms (int): The timeout used to detect
-            the broker no being available so that NodeNotReadyError is raised.
+            the broker not being available so that NodeNotReadyError is raised.
             Default: None
         selector (selectors.BaseSelector): Provide a specific selector
             implementation to use for I/O multiplexing.
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 7fb39d033..57dda03e0 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -66,7 +66,7 @@ def __init__(self, client, metrics, **configs):
                 partition assignment (if enabled), and to use for fetching and
                 committing offsets. Default: 'kafka-python-default-group'
             session_timeout_ms (int): The timeout used to detect failures when
-                using Kafka's group managementment facilities. Default: 30000
+                using Kafka's group management facilities. Default: 30000
             heartbeat_interval_ms (int): The expected time in milliseconds
                 between heartbeats to the consumer coordinator when using
                 Kafka's group management feature. Heartbeats are used to ensure
@@ -77,7 +77,7 @@ def __init__(self, client, metrics, **configs):
                 adjusted even lower to control the expected time for normal
                 rebalances. Default: 3000
             node_not_ready_retry_timeout_ms (int): The timeout used to detect
-                the broker no being available so that NodeNotReadyError is raised.
+                the broker not being available so that NodeNotReadyError is raised.
                 Default: None
             retry_backoff_ms (int): Milliseconds to backoff when retrying on
                 errors. Default: 100.
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 40ac74209..7ec922b83 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -69,7 +69,7 @@ def __init__(self, client, subscription, metrics, **configs):
                 adjusted even lower to control the expected time for normal
                 rebalances. Default: 3000
             session_timeout_ms (int): The timeout used to detect failures when
-                using Kafka's group managementment facilities. Default: 30000
+                using Kafka's group management facilities. Default: 30000
             retry_backoff_ms (int): Milliseconds to backoff when retrying on
                 errors. Default: 100.
             exclude_internal_topics (bool): Whether records from internal topics
@@ -77,7 +77,7 @@ def __init__(self, client, subscription, metrics, **configs):
                 (such as offsets) should be exposed to the consumer. If set to
                 True the only way to receive records from an internal topic
                 is subscribing to it. Requires 0.10+. Default: True
             node_not_ready_retry_timeout_ms (int): The timeout used to detect
-                the broker no being available so that NodeNotReadyError is raised.
+                the broker not being available so that NodeNotReadyError is raised.
                 Default: None
         """
         super(ConsumerCoordinator, self).__init__(client, metrics, **configs)
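
With the docstrings fixed, the budget logic has reached its final shape. Condensed for orientation (a paraphrase of the patched ensure_coordinator_known(), not a verbatim excerpt):

    budget_ms = self.config['node_not_ready_retry_timeout_ms']
    start = time.time()
    while self.coordinator_unknown():
        ...
        if future.retriable():
            if budget_ms is not None and isinstance(future.exception,
                                                    Errors.NodeNotReadyError):
                # Bounded wait: poll against the remaining budget, then
                # re-raise once it is spent.
                self._client.poll(timeout_ms=budget_ms)
                budget_ms -= (time.time() - start) * 1000
                if budget_ms <= 0:
                    raise future.exception
            else:
                # Default (budget_ms is None): the pre-series behaviour of
                # polling retry_backoff_ms and looping indefinitely.
                ...
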
From 2b3211d1a6ad9ceacee611e87c5e32009fec8353 Mon Sep 17 00:00:00 2001
From: Zeynep Arikoglu
Date: Wed, 1 Mar 2017 16:11:50 +0200
Subject: [PATCH 47/47] reverted empty line deletes

---
 kafka/coordinator/base.py     | 1 +
 kafka/coordinator/consumer.py | 1 +
 2 files changed, 2 insertions(+)

diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 57dda03e0..c5c979921 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -214,6 +214,7 @@ def ensure_coordinator_known(self):
                 self.coordinator_id = self._client.least_loaded_node()
                 self._client.ready(self.coordinator_id)
                 continue
+
             future = self._send_group_coordinator_request()
             self._client.poll(future=future)
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 7ec922b83..c1c31c3a4 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -316,6 +316,7 @@ def fetch_committed_offsets(self, partitions):
         while True:
             self.ensure_coordinator_known()
+
             # contact coordinator to fetch committed offsets
             future = self._send_offset_fetch_request(partitions)
             self._client.poll(future=future)
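
End of the series. Net effect: with the option left at its default of None nothing changes and coordinator discovery retries indefinitely, while a configured value makes a persistently unavailable coordinator surface as NodeNotReadyError after roughly that many milliseconds. A sketch of handling this at the application level, assuming the series is applied; the broker address, topic and group are placeholders:

    from kafka import KafkaConsumer
    from kafka.errors import NodeNotReadyError

    consumer = KafkaConsumer(
        bootstrap_servers='localhost:9092',   # placeholder
        group_id='my-group',
        node_not_ready_retry_timeout_ms=2000,
    )
    consumer.subscribe(['my-topic'])

    try:
        records = consumer.poll(timeout_ms=1000)
    except NodeNotReadyError:
        # The coordinator stayed unavailable for the whole 2 s budget;
        # back off at the application level instead of blocking in the client.
        records = {}
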