From 3f3a257189bbb7c2db948ffe897b915dc8b5d19a Mon Sep 17 00:00:00 2001 From: Wayde Date: Tue, 12 Feb 2019 14:03:16 +0800 Subject: [PATCH 1/2] KafkaProducer change client to self._client --- kafka/producer/kafka.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index ccdd91ad4..36f036c67 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -23,7 +23,6 @@ from kafka.serializer import Serializer from kafka.structs import TopicPartition - log = logging.getLogger(__name__) PRODUCER_CLIENT_ID_SEQUENCE = AtomicInteger() @@ -367,13 +366,13 @@ def __init__(self, **configs): reporters = [reporter() for reporter in self.config['metric_reporters']] self._metrics = Metrics(metric_config, reporters) - client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer', - wakeup_timeout_ms=self.config['max_block_ms'], - **self.config) + self._client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer', + wakeup_timeout_ms=self.config['max_block_ms'], + **self.config) # Get auto-discovered version from client if necessary if self.config['api_version'] is None: - self.config['api_version'] = client.config['api_version'] + self.config['api_version'] = self._client.config['api_version'] if self.config['compression_type'] == 'lz4': assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers' @@ -389,9 +388,9 @@ def __init__(self, **configs): message_version = self._max_usable_produce_magic() self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config) - self._metadata = client.cluster + self._metadata = self._client.cluster guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1) - self._sender = Sender(client, self._metadata, + self._sender = Sender(self._client, self._metadata, self._accumulator, self._metrics, guarantee_message_order=guarantee_message_order, **self.config) @@ -406,11 +405,13 @@ def __init__(self, **configs): def _cleanup_factory(self): """Build a cleanup clojure that doesn't increase our ref count""" _self = weakref.proxy(self) + def wrapper(): try: _self.close(timeout=0) except (ReferenceError, AttributeError): pass + return wrapper def _unregister_cleanup(self): @@ -455,7 +456,7 @@ def close(self, timeout=None): assert timeout >= 0 log.info("Closing the Kafka producer with %s secs timeout.", timeout) - #first_exception = AtomicReference() # this will keep track of the first encountered exception + # first_exception = AtomicReference() # this will keep track of the first encountered exception invoked_from_callback = bool(threading.current_thread() is self._sender) if timeout > 0: if invoked_from_callback: From d902d7f9b4927ee8194f7ab1e3b323da73b529c7 Mon Sep 17 00:00:00 2001 From: Wayde Date: Tue, 12 Feb 2019 14:39:51 +0800 Subject: [PATCH 2/2] add function bootstrap_connected --- kafka/consumer/group.py | 6 ++++++ kafka/producer/kafka.py | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 8d2c65e80..a60b324a5 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -377,6 +377,12 @@ def __init__(self, *topics, **configs): self._subscription.subscribe(topics=topics) self._client.set_topics(topics) + def bootstrap_connected(self): + """Return True if the bootstrap is connected.""" + if self._client._bootstrap_fails > 0: + return False + return True + def assign(self, partitions): """Manually assign a list of TopicPartitions to this consumer. diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 36f036c67..3a1687c33 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -402,6 +402,12 @@ def __init__(self, **configs): atexit.register(self._cleanup) log.debug("Kafka producer started") + def bootstrap_connected(self): + """Return True if the bootstrap is connected.""" + if self._client._bootstrap_fails > 0: + return False + return True + def _cleanup_factory(self): """Build a cleanup clojure that doesn't increase our ref count""" _self = weakref.proxy(self)