From 4a875b649f40a50a2ff17287ec5726089ade6552 Mon Sep 17 00:00:00 2001
From: Mahendra M
Date: Tue, 2 Jul 2013 14:05:44 +0530
Subject: [PATCH 01/11] Zookeeper support including partition rebalancing

---
 README.md          |  33 +++++
 kafka/client.py    |  12 +-
 kafka/common.py    |   5 +
 kafka/conn.py      |   8 +-
 kafka/util.py      |   4 +
 kafka/zookeeper.py | 302 +++++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 360 insertions(+), 4 deletions(-)
 create mode 100644 kafka/zookeeper.py

diff --git a/README.md b/README.md
index 62befbd04..1c231a627 100644
--- a/README.md
+++ b/README.md
@@ -96,6 +96,39 @@ consumer = MultiConsumer(kafka, "my-group", "my-topic",
                          driver_type=KAFKA_THREAD_DRIVER)
 ```
 
+## Zookeeper support
+Zookeeper support covers creating a producer and a SimpleConsumer.
+The Zookeeper consumer takes care of rebalancing the partitions of a topic
+among a consumer group.
+
+NOTE: This will work only with other kafka-python clients and will not
+work with Java/Scala clients (this is a TODO).
+```python
+from kafka.zookeeper import ZSimpleProducer, ZKeyedProducer
+from kafka.zookeeper import ZSimpleConsumer
+from kafka.partitioner import HashedPartitioner
+
+# Zookeeper SimpleProducer
+# Takes all arguments similar to SimpleProducer
+producer = ZSimpleProducer("127.0.0.1:2181", "my-topic")
+producer.send_messages("msg1", "msg2")
+
+# Zookeeper KeyedProducer
+# Takes all arguments similar to KeyedProducer
+producer = ZKeyedProducer("127.0.0.1:2181,127.0.0.1:2182", "my-topic",
+                          partitioner=HashedPartitioner)
+producer.send("key1", "msg1")
+
+# Zookeeper consumer.
+# Takes all arguments similar to SimpleConsumer
+consumer = ZSimpleConsumer("127.0.0.1:2181", "my-group", "my-topic")
+
+for msg in consumer:
+    print msg
+
+consumer.get_messages(block=True, timeout=10)
+```
+
 ## Low level
 
 ```python
diff --git a/kafka/client.py b/kafka/client.py
index 6eff6e101..8fae64cec 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -7,6 +7,7 @@
 import os
 import socket
 import struct
+import sys
 import time
 import zlib
@@ -167,8 +168,15 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
                                      correlation_id=requestId, payloads=payloads)
 
         # Send the request, recv the response
-        conn.send(requestId, request)
-        response = conn.recv(requestId)
+        try:
+            conn.send(requestId, request)
+            response = conn.recv(requestId)
+        except (socket.error, KafkaConnectionError) as exp:
+            log.error("Error in broker", exc_info=sys.exc_info())
+            # Remove this broker information from the connection pool
+            self.conns.pop(broker, None)
+            raise
+
         for response in decoder_fn(response):
             acc[(response.topic, response.partition)] = response
 
diff --git a/kafka/common.py b/kafka/common.py
index 33f757328..afdba0061 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -1,6 +1,8 @@
 from collections import namedtuple
 import Queue
+from kazoo.handlers.threading import SequentialThreadingHandler
+from kazoo.handlers.gevent import SequentialGeventHandler
 import multiprocessing
 import threading
 import socket
@@ -74,11 +76,13 @@ def __init__(self, driver_type):
             self.Queue = Queue.Queue
             self.Event = threading.Event
             self.Proc = threading.Thread
+            self.kazoo_handler = SequentialThreadingHandler
 
         elif driver_type == KAFKA_PROCESS_DRIVER:
             self.Queue = multiprocessing.Queue
             self.Event = multiprocessing.Event
             self.Proc = multiprocessing.Process
+            self.kazoo_handler = SequentialThreadingHandler
 
         elif driver_type == KAFKA_GEVENT_DRIVER:
             self.Queue = gevent.queue.Queue
@@ -86,6 +90,7 @@ def __init__(self, driver_type):
             self.socket = gevent.socket
             self.Proc = 
self.gevent_proc self.sleep = gevent.sleep + self.kazoo_handler = SequentialGeventHandler def gevent_proc(self, target=None, args=(), kwargs=None): kwargs = {} if kwargs is None else kwargs diff --git a/kafka/conn.py b/kafka/conn.py index 9f6fd8f76..c1c62676a 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -2,8 +2,11 @@ import socket import struct +from kafka.util import KafkaConnectionError, BufferUnderflowError + log = logging.getLogger("kafka") + class KafkaConnection(object): """ A socket connection to a single Kafka broker @@ -47,7 +50,7 @@ def _consume_response_iter(self): # Read the size off of the header resp = self._sock.recv(4) if resp == "": - raise Exception("Got no response from Kafka") + raise KafkaConnectionError("Got no response from Kafka") (size,) = struct.unpack('>i', resp) messageSize = size - 4 @@ -75,8 +78,9 @@ def send(self, requestId, payload): log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), requestId)) sent = self._sock.sendall(payload) + if sent != None: - raise RuntimeError("Kafka went away") + raise KafkaConnectionError("Kafka went away") self.data = self._consume_response() def recv(self, requestId): diff --git a/kafka/util.py b/kafka/util.py index 8318ac5c3..18195c350 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -71,3 +71,7 @@ class BufferUnderflowError(Exception): class ChecksumError(Exception): pass + + +class KafkaConnectionError(Exception): + pass diff --git a/kafka/zookeeper.py b/kafka/zookeeper.py new file mode 100644 index 000000000..e0019541b --- /dev/null +++ b/kafka/zookeeper.py @@ -0,0 +1,302 @@ +# Zookeeper support for kafka clients + +import logging +import os +import random +import sys +import time + +from functools import partial +from Queue import Empty + +from kafka.client import KafkaClient +from kafka.common import KafkaDriver +from kafka.producer import Producer, SimpleProducer, KeyedProducer +from kafka.consumer import SimpleConsumer +from kazoo.client import KazooClient + + +BROKER_IDS_PATH = '/brokers/ids/' +PARTITIONER_PATH = '/python/kafka/' +DEFAULT_TIME_BOUNDARY = 5 +CHECK_INTERVAL = 30 + +log = logging.getLogger("kafka") + + +def _get_brokers(zkclient): + """ + Get the list of available brokers registered in zookeeper + """ + brokers = [] + + for broker_id in zkclient.get_children(BROKER_IDS_PATH): + path = os.path.join(BROKER_IDS_PATH, broker_id) + info, _ = zkclient.get(path) + info = simplejson.loads(info) + brokers.append((info['host'], info['port'])) + + log.debug("List of brokers fetched" + str(brokers)) + + random.shuffle(brokers) + return brokers + + +def _get_client(zkclient): + """ + Given a zookeeper client, return a KafkaClient instance for use + """ + brokers = _get_brokers(zkclient) + client = None + + for host, port in brokers: + try: + return KafkaClient(host, port) + except Exception as exp: + log.error("Error while connecting to %s:%d" % (host, port), + exc_info=sys.exc_info()) + + raise exp + + +class ZProducer(Producer): + """ + A base Zookeeper producer to be used by other producer classes + """ + producer_kls = None + + def __init__(self, hosts, topic, **kwargs): + + if self.producer_kls is None: + raise NotImplemented("Producer class needs to be mentioned") + + self.zkclient = KazooClient(hosts=hosts) + self.zkclient.start() + + # Start the producer instance + client = _get_client(self.zkclient) + self.producer = self.producer_kls(client, topic, **kwargs) + + # Stop Zookeeper + self.zkclient.stop() + self.zkclient.close() + self.zkclient = None + + @staticmethod + def retry(fnc, 
retries=2, retry_after=2):
+        """
+        A decorator for attempting retries in sending messages
+
+        retries - Number of times we must attempt a retry
+        retry_after - Delay in between retries
+        """
+        def retry_send(self, *args, **kwargs):
+            count = retries
+            while count > 0:
+                try:
+                    return fnc(self, *args, **kwargs)
+                except Exception as exp:
+                    log.error("Error in callback", exc_info=sys.exc_info())
+                    self.producer.driver.sleep(retry_after)
+                    count -= 1
+
+        return retry_send
+
+
+class ZSimpleProducer(ZProducer):
+    """
+    A simple, round-robin producer. Each message goes to exactly one partition
+
+    Args:
+    hosts: Comma-separated list of hosts to connect to
+    (e.g. 127.0.0.1:2181,127.0.0.1:2182)
+    topic - The kafka topic to send messages to
+    """
+    producer_kls = SimpleProducer
+
+    @ZProducer.retry
+    def send_messages(self, *msg):
+        self.producer.send_messages(*msg)
+
+
+class ZKeyedProducer(Producer):
+    """
+    A producer which distributes messages to partitions based on a
+    partitioner function (class) and the key
+
+    hosts: Comma-separated list of hosts to connect to
+    (e.g. 127.0.0.1:2181,127.0.0.1:2182)
+    topic - The kafka topic to send messages to
+    partitioner - A partitioner class that will be used to get the partition
+    to send the message to. Must be derived from Partitioner
+    """
+    producer_kls = KeyedProducer
+
+    @ZProducer.retry
+    def send(self, key, msg):
+        self.producer.send(key, msg)
+
+
+class ZSimpleConsumer(object):
+    """
+    A consumer that uses Zookeeper to co-ordinate and share the partitions
+    of a queue with other consumers
+
+    hosts: Comma-separated list of hosts to connect to
+    (e.g. 127.0.0.1:2181,127.0.0.1:2182)
+    group: a name for this consumer, used for offset storage and must be unique
+    topic: the topic to consume
+    driver_type: The driver type to use for the consumer
+    time_boundary: The time interval to wait out before deciding on consumer
+    changes in zookeeper
+    ignore_non_allocation: If set to True, the consumer will ignore the
+    case where no partitions were allocated to it
+
+    auto_commit: default True. Whether or not to auto commit the offsets
+    auto_commit_every_n: default 100. How many messages to consume
+    before a commit
+    auto_commit_every_t: default 5000. How much time (in milliseconds) to
+    wait before commit
+
+    Auto commit details:
+    If both auto_commit_every_n and auto_commit_every_t are set, they will
+    reset one another when one is triggered. These triggers simply call the
+    commit method on this class. 
A manual call to commit will also reset + these triggers + """ + def __init__(self, hosts, group, topic, + driver_type=KAFKA_PROCESS_DRIVER, + time_boundary=DEFAULT_TIME_BOUNDARY, + ignore_non_allocation=False, + **kwargs): + + if 'partitions' in kwargs: + raise ValueError("Partitions cannot be specified") + + self.driver = KafkaDriver(driver_type) + self.ignore_non_allocation = ignore_non_allocation + + self.zkclient = KazooClient(hosts, handler=self.driver.kazoo_handler()) + self.zkclient.start() + + self.client = _get_client(self.zkclient) + self.client._load_metadata_for_topics(topic) + self.partitions = set(self.client.topic_partitions[topic]) + + # create consumer id + hostname = self.driver.socket.gethostname() + consumer_id = "%s-%s-%s-%d" % (topic, group, hostname, os.getpid()) + path = os.path.join(PARTITIONER_PATH, topic, group) + + self.partitioner = self.zkclient.SetPartitioner( + path=path, + set=self.partitions, + identifier=consumer_id, + time_boundary=time_boundary) + + self.consumer = None + self.consumer_fact = partial(SimpleConsumer, + client, group, topic, + driver_type=driver_type, **kwargs) + + # Keep monitoring for changes + self.exit = self.driver.Event() + self.changed = self.driver.Event() + + self.proc = self.driver.Proc(target=_check_and_allocate, + args=(CHECK_INTERVAL, self.exit, self.changed)) + self.proc.daemon = True + self.proc.start() + + # Do the setup once + self._set_consumer(block=True, timeout=None) + + def _set_consumer(self, block=False, timeout=0): + """ + Check if a new consumer has to be created because of a re-balance + """ + try: + partitions = self.changed.get(block=block, timeout=timeout) + except Empty: + return + + # Reset the consumer + self.consumer = None + + if partitions == -1: + raise RuntimeError("Error in partition allocation") + + # Check if we must change the consumer + if not partitions: + if not self.ignore_non_allocation: + raise RuntimeError("Did not get any partition allocation") + else: + log.info("No partitions allocated. Ignoring") + else: + self.consumer = self.consumer_fact(partitions=partitions) + + def _check_and_allocate(self, sleep_time, exit, changed): + """ + Checks if a new allocation is needed for the partitions. + If so, co-ordinates with Zookeeper to get a set of partitions + allocated for the consumer + """ + old = [] + + while not exit.is_set(): + if self.partitioner.acquired: + new = list(self.partitioner) + + if new != old: + old = new + changed.put(new) + + log.info("Acquired partitions: %s", new) + exit.wait(sleep_time) + + elif self.partitioner.release: + log.info("Releasing partitions for reallocation") + self.partitioner.release_set() + old = [] + + elif self.partitioner.failed: + raise RuntimeError("Error in partition allocation") + change.put(-1) + + elif self.partitioner.allocating: + log.info("Waiting for partition allocation") + self.partitioner.wait_for_acquire() + + def __iter__(self): + """ + Iterate through data available in partitions allocated to this + instance + """ + self._set_consumer() + + for msg in self.consumer: + yield msg + + if not self.change.empty(): + self._set_consumer() + break + + def get_messages(self, count=1, block=True, timeout=0.1): + """ + Fetch the specified number of messages + + count: Indicates the maximum number of messages to be fetched + block: If True, the API will block till some messages are fetched. + timeout: If None, and block=True, the API will block infinitely. 
+ If >0, API will block for specified time (in seconds) + """ + self._set_consumer(block, timeout=0.1) + return self.consumer.get_messages(count, block, timeout) + + def stop(): + self.exit.set() + self.proc.join() + self.consumer.stop() + self.client.stop() + self.partitioner.finish() + self.zkclient.close() From ac9bad877ae2cc0dac883fc0f52b186fba20f5b0 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Tue, 2 Jul 2013 14:08:24 +0530 Subject: [PATCH 02/11] Update setup.py --- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index e383e81f2..c3d225ee9 100644 --- a/setup.py +++ b/setup.py @@ -21,8 +21,8 @@ def run(self): name="kafka-python", version="0.8.1-1", - install_requires=["distribute", "tox", "gevent"], - tests_require=["tox", "gevent"], + install_requires=["distribute", "tox", "gevent", "kazoo"], + tests_require=["tox", "gevent", "kazoo"], cmdclass={"test": Tox}, packages=["kafka"], From 3b5c52c0d38ee000a0116045392f6189de0f0de7 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Wed, 3 Jul 2013 09:07:54 +0530 Subject: [PATCH 03/11] Improvements in Zookeeper Consumer * Correct behaviour during consumer rebalancing * Correct behaviour for minor APIs like close(), pending(), seek() etc. * Improved documentation --- kafka/zookeeper.py | 119 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 94 insertions(+), 25 deletions(-) diff --git a/kafka/zookeeper.py b/kafka/zookeeper.py index e0019541b..69605879d 100644 --- a/kafka/zookeeper.py +++ b/kafka/zookeeper.py @@ -16,10 +16,13 @@ from kazoo.client import KazooClient -BROKER_IDS_PATH = '/brokers/ids/' -PARTITIONER_PATH = '/python/kafka/' +BROKER_IDS_PATH = '/brokers/ids/' # Path where kafka stores broker info +PARTITIONER_PATH = '/python/kafka/' # Path to use for consumer co-ordination DEFAULT_TIME_BOUNDARY = 5 CHECK_INTERVAL = 30 +ALLOCATION_CHANGING = -1 +ALLOCATION_FAILED = -2 + log = logging.getLogger("kafka") @@ -42,7 +45,7 @@ def _get_brokers(zkclient): return brokers -def _get_client(zkclient): +def get_client(zkclient): """ Given a zookeeper client, return a KafkaClient instance for use """ @@ -74,14 +77,18 @@ def __init__(self, hosts, topic, **kwargs): self.zkclient.start() # Start the producer instance - client = _get_client(self.zkclient) - self.producer = self.producer_kls(client, topic, **kwargs) + self.client = get_client(self.zkclient) + self.producer = self.producer_kls(self.client, topic, **kwargs) # Stop Zookeeper self.zkclient.stop() self.zkclient.close() self.zkclient = None + def stop(): + self.producer.stop() + self.client.close() + @staticmethod def retry(fnc, retries=2, retry_after=2): """ @@ -150,7 +157,9 @@ class ZSimpleConsumer(object): time_boundary: The time interval to wait out before deciding on consumer changes in zookeeper ignore_non_allocation: If set to True, the consumer will ignore the - case where no partitions were allocated to it + case where no partitions were allocated to it. + This can be used to keep consumers in stand-by. They will take over + when another consumer fails auto_commit: default True. Whether or not to auto commit the offsets auto_commit_every_n: default 100. How many messages to consume @@ -163,6 +172,15 @@ class ZSimpleConsumer(object): reset one another when one is triggered. These triggers simply call the commit method on this class. 
A manual call to commit will also reset these triggers + + Partition allocation details + * When the consumer is initialized, it blocks till it gets an allocation + * If ignore_non_allocation is False, the consumer will throw an error + in init or during other operations + * During re-balancing of partitions, the consumer will not return any + messages (iteration or get_messages) + * After re-balancing, if the consumer does not get any partitions, + ignore_non_allocation will control it's behaviour """ def __init__(self, hosts, group, topic, driver_type=KAFKA_PROCESS_DRIVER, @@ -170,6 +188,7 @@ def __init__(self, hosts, group, topic, ignore_non_allocation=False, **kwargs): + # User is not allowed to specify partitions if 'partitions' in kwargs: raise ValueError("Partitions cannot be specified") @@ -179,7 +198,7 @@ def __init__(self, hosts, group, topic, self.zkclient = KazooClient(hosts, handler=self.driver.kazoo_handler()) self.zkclient.start() - self.client = _get_client(self.zkclient) + self.client = get_client(self.zkclient) self.client._load_metadata_for_topics(topic) self.partitions = set(self.client.topic_partitions[topic]) @@ -194,21 +213,21 @@ def __init__(self, hosts, group, topic, identifier=consumer_id, time_boundary=time_boundary) - self.consumer = None + # Create a function which can be used for creating consumers self.consumer_fact = partial(SimpleConsumer, client, group, topic, driver_type=driver_type, **kwargs) # Keep monitoring for changes - self.exit = self.driver.Event() - self.changed = self.driver.Event() + self.exit = self.driver.Event() # Notify worker to exit + self.changed = self.driver.Event() # Notify of partition changes self.proc = self.driver.Proc(target=_check_and_allocate, - args=(CHECK_INTERVAL, self.exit, self.changed)) + args=(CHECK_INTERVAL,)) self.proc.daemon = True self.proc.start() - # Do the setup once + # Do the setup once and block till we get an allocation self._set_consumer(block=True, timeout=None) def _set_consumer(self, block=False, timeout=0): @@ -220,48 +239,67 @@ def _set_consumer(self, block=False, timeout=0): except Empty: return - # Reset the consumer + # If we have a consumer clean it up + if self.consumer: + self.consumer.close() + self.consumer = None - if partitions == -1: + if partitions == ALLOCATION_FAILED: + # Allocation has failed. Nothing we can do about it raise RuntimeError("Error in partition allocation") - # Check if we must change the consumer - if not partitions: + elif partitions == ALLOCATION_CHANGING: + # Allocation is changing because of consumer changes + self.consumer = [] + log.info("Partitions are being reassigned") + return + + elif not partitions: + # Check if we must change the consumer if not self.ignore_non_allocation: raise RuntimeError("Did not get any partition allocation") else: log.info("No partitions allocated. Ignoring") + self.consumer = [] else: + # Create a new consumer self.consumer = self.consumer_fact(partitions=partitions) - def _check_and_allocate(self, sleep_time, exit, changed): + def _check_and_allocate(self, sleep_time): """ Checks if a new allocation is needed for the partitions. 
If so, co-ordinates with Zookeeper to get a set of partitions allocated for the consumer + + sleep_time: Time interval between each check after getting allocation """ old = [] - while not exit.is_set(): + while not self.exit.is_set(): if self.partitioner.acquired: + # A new set of partitions has been acquired new = list(self.partitioner) + # If there is a change, notify for a consumer change if new != old: + log.info("Acquired partitions: %s", new) old = new - changed.put(new) + self.changed.put(new) - log.info("Acquired partitions: %s", new) - exit.wait(sleep_time) + # Wait for a while before checking again. In the meantime + # wake up if the user calls for exit + self.exit.wait(sleep_time) elif self.partitioner.release: log.info("Releasing partitions for reallocation") + self.change.put(ALLOCATION_CHANGING) self.partitioner.release_set() old = [] elif self.partitioner.failed: + self.change.put(ALLOCATION_FAILED) raise RuntimeError("Error in partition allocation") - change.put(-1) elif self.partitioner.allocating: log.info("Waiting for partition allocation") @@ -272,13 +310,16 @@ def __iter__(self): Iterate through data available in partitions allocated to this instance """ - self._set_consumer() + self._set_consumer(block=False) + + if self.consumer is None: + raise RuntimeError("Error in partition allocation") for msg in self.consumer: yield msg if not self.change.empty(): - self._set_consumer() + self._set_consumer(block=False) break def get_messages(self, count=1, block=True, timeout=0.1): @@ -291,12 +332,40 @@ def get_messages(self, count=1, block=True, timeout=0.1): If >0, API will block for specified time (in seconds) """ self._set_consumer(block, timeout=0.1) + + if not self.consumer: + raise RuntimeError("Error in partition allocation") + return self.consumer.get_messages(count, block, timeout) def stop(): self.exit.set() self.proc.join() - self.consumer.stop() + + if self.consumer: + self.consumer.stop() + self.client.stop() self.partitioner.finish() self.zkclient.close() + + def commit(self): + if self.consumer: + self.consumer.commit() + self._set_consumer(block=False) + + def seek(self, *args, **kwargs): + self._set_consumer() + + if not self.consumer: + raise RuntimeError("Error in partition allocation") + + return self.consumer.seek(*args, **kwargs) + + def pending(self): + if self.consumer is None: + raise RuntimeError("Error in partition allocation") + elif not self.consumer: + return 0 + + return self.consumer.pending() From 172d8387d7536876c2ce2add8a26daf104a3decd Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Wed, 3 Jul 2013 10:46:12 +0530 Subject: [PATCH 04/11] Remove retry in zookeeper producer This can be made common for all producers later --- kafka/zookeeper.py | 48 +++++++++++++++++++--------------------------- 1 file changed, 20 insertions(+), 28 deletions(-) diff --git a/kafka/zookeeper.py b/kafka/zookeeper.py index 69605879d..462144376 100644 --- a/kafka/zookeeper.py +++ b/kafka/zookeeper.py @@ -25,6 +25,7 @@ log = logging.getLogger("kafka") +random.seed() def _get_brokers(zkclient): @@ -65,6 +66,11 @@ def get_client(zkclient): class ZProducer(Producer): """ A base Zookeeper producer to be used by other producer classes + + Args + hosts: Comma-separated list of hosts to connect to + (e.g. 
127.0.0.1:2181,127.0.0.1:2182)
+    topic - The kafka topic to send messages to
     """
     producer_kls = None

@@ -89,26 +95,6 @@ def stop():
         self.producer.stop()
         self.client.close()
 
-    @staticmethod
-    def retry(fnc, retries=2, retry_after=2):
-        """
-        A decorator for attempting retries in sending messages
-
-        retries - Number of times we must attempt a retry
-        retry_after - Delay in between retries
-        """
-        def retry_send(self, *args, **kwargs):
-            count = retries
-            while count > 0:
-                try:
-                    return fnc(self, *args, **kwargs)
-                except Exception as exp:
-                    log.error("Error in callback", exc_info=sys.exc_info())
-                    self.producer.driver.sleep(retry_after)
-                    count -= 1
-
-        return retry_send
-
 class ZSimpleProducer(ZProducer):
     """
@@ -121,7 +107,6 @@ class ZSimpleProducer(ZProducer):
     """
     producer_kls = SimpleProducer
 
-    @ZProducer.retry
     def send_messages(self, *msg):
         self.producer.send_messages(*msg)
 
@@ -139,7 +124,6 @@
     producer_kls = KeyedProducer
 
-    @ZProducer.retry
     def send(self, key, msg):
         self.producer.send(key, msg)
 
 class ZSimpleConsumer(object):
     """
     A consumer that uses Zookeeper to co-ordinate and share the partitions
-    of a queue with other consumers
+    of a topic with other consumers
 
     hosts: Comma-separated list of hosts to connect to
     (e.g. 127.0.0.1:2181,127.0.0.1:2182)
     group: a name for this consumer, used for offset storage and must be unique
     topic: the topic to consume
     driver_type: The driver type to use for the consumer
-    time_boundary: The time interval to wait out before deciding on consumer
-    changes in zookeeper
+    time_boundary: The time interval, in seconds, to wait out before deciding
+    on consumer changes in zookeeper. A higher value will ensure that a
+    consumer restart will not cause two re-balances.
+    (Default 5s)
     ignore_non_allocation: If set to True, the consumer will ignore the
     case where no partitions were allocated to it.
     This can be used to keep consumers in stand-by. They will take over
-    when another consumer fails
+    when another consumer fails. (Default False)
 
     auto_commit: default True. Whether or not to auto commit the offsets
     auto_commit_every_n: default 100. 
How many messages to consume @@ -207,6 +193,9 @@ def __init__(self, hosts, group, topic, consumer_id = "%s-%s-%s-%d" % (topic, group, hostname, os.getpid()) path = os.path.join(PARTITIONER_PATH, topic, group) + log.debug("Consumer id set to: %s" % consumer_id) + log.debug("Using path %s for co-ordination" % path) + self.partitioner = self.zkclient.SetPartitioner( path=path, set=self.partitions, @@ -357,8 +346,10 @@ def commit(self): def seek(self, *args, **kwargs): self._set_consumer() - if not self.consumer: - raise RuntimeError("Error in partition allocation") + if self.consumer is None: + raise RuntimeError("Partition allocation failed") + elif not self.consumer: + raise RuntimeError("Waiting for partition allocation") return self.consumer.seek(*args, **kwargs) @@ -366,6 +357,7 @@ def pending(self): if self.consumer is None: raise RuntimeError("Error in partition allocation") elif not self.consumer: + # We are in a transition/suspended state return 0 return self.consumer.pending() From b2f232ebec21612e11689b103e24059d30cb9bea Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Wed, 3 Jul 2013 11:02:08 +0530 Subject: [PATCH 05/11] Got Zookeeper producers to work --- kafka/__init__.py | 2 ++ kafka/zookeeper.py | 13 +++++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/kafka/__init__.py b/kafka/__init__.py index 49ea8d5f6..ce41a3183 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -15,11 +15,13 @@ ) from kafka.producer import SimpleProducer from kafka.consumer import SimpleConsumer, MultiConsumer +from kafka.zookeeper import ZSimpleProducer, ZKeyedProducer, ZSimpleConsumer __all__ = [ 'KAFKA_THREAD_DRIVER', 'KAFKA_GEVENT_DRIVER', 'KAFKA_PROCESS_DRIVER', 'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'SimpleConsumer', 'MultiConsumer', + 'ZSimpleProducer', 'ZKeyedProducer', 'ZSimpleConsumer', 'create_message', 'create_gzip_message', 'create_snappy_message' ] diff --git a/kafka/zookeeper.py b/kafka/zookeeper.py index 462144376..de5b47889 100644 --- a/kafka/zookeeper.py +++ b/kafka/zookeeper.py @@ -5,15 +5,19 @@ import random import sys import time +import simplejson from functools import partial from Queue import Empty from kafka.client import KafkaClient -from kafka.common import KafkaDriver -from kafka.producer import Producer, SimpleProducer, KeyedProducer +from kafka.producer import SimpleProducer, KeyedProducer from kafka.consumer import SimpleConsumer from kazoo.client import KazooClient +from kafka.common import ( + KAFKA_PROCESS_DRIVER, KAFKA_THREAD_DRIVER, KAFKA_GEVENT_DRIVER, + KafkaDriver +) BROKER_IDS_PATH = '/brokers/ids/' # Path where kafka stores broker info @@ -63,7 +67,8 @@ def get_client(zkclient): raise exp -class ZProducer(Producer): +# TODO: Make this a subclass of Producer later +class ZProducer(object): """ A base Zookeeper producer to be used by other producer classes @@ -111,7 +116,7 @@ def send_messages(self, *msg): self.producer.send_messages(*msg) -class ZKeyedProducer(Producer): +class ZKeyedProducer(ZProducer): """ A producer which distributes messages to partitions based on a partitioner function (class) and the key From b5b7f3974556a7766fe47ef5c760715e0771d7f0 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Wed, 3 Jul 2013 14:10:25 +0530 Subject: [PATCH 06/11] Moved logic from queues to locked shared memory The queues approach had a problem. Say the consumer is not used to fetch messages for a while. In this while the allocation has changed multiple times. 
When the next invocation of the consumer happens, the oldest message is fetched from the queue. It will take a while before the consumer catches up The approach has been simplified with a shared memory protected by a lock and notified using an event --- kafka/common.py | 4 ++ kafka/zookeeper.py | 171 +++++++++++++++++++++++++++++++-------------- 2 files changed, 122 insertions(+), 53 deletions(-) diff --git a/kafka/common.py b/kafka/common.py index afdba0061..1c4cdf09a 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -11,6 +11,7 @@ import gevent.queue import gevent.pool import gevent.socket +import gevent.coros import time ############### @@ -77,12 +78,14 @@ def __init__(self, driver_type): self.Event = threading.Event self.Proc = threading.Thread self.kazoo_handler = SequentialThreadingHandler + self.Lock = threading.Lock elif driver_type == KAFKA_PROCESS_DRIVER: self.Queue = multiprocessing.Queue self.Event = multiprocessing.Event self.Proc = multiprocessing.Process self.kazoo_handler = SequentialThreadingHandler + self.Lock = multiprocessing.Lock elif driver_type == KAFKA_GEVENT_DRIVER: self.Queue = gevent.queue.Queue @@ -91,6 +94,7 @@ def __init__(self, driver_type): self.Proc = self.gevent_proc self.sleep = gevent.sleep self.kazoo_handler = SequentialGeventHandler + self.Lock = gevent.coros.Semaphore def gevent_proc(self, target=None, args=(), kwargs=None): kwargs = {} if kwargs is None else kwargs diff --git a/kafka/zookeeper.py b/kafka/zookeeper.py index de5b47889..f7d1bc985 100644 --- a/kafka/zookeeper.py +++ b/kafka/zookeeper.py @@ -1,6 +1,7 @@ # Zookeeper support for kafka clients import logging +import multiprocessing import os import random import sys @@ -185,119 +186,188 @@ def __init__(self, hosts, group, topic, self.driver = KafkaDriver(driver_type) self.ignore_non_allocation = ignore_non_allocation + self.time_boundary = time_boundary - self.zkclient = KazooClient(hosts, handler=self.driver.kazoo_handler()) - self.zkclient.start() + zkclient = KazooClient(hosts, handler=self.driver.kazoo_handler()) + zkclient.start() - self.client = get_client(self.zkclient) + self.client = get_client(zkclient) self.client._load_metadata_for_topics(topic) - self.partitions = set(self.client.topic_partitions[topic]) + partitions = set(self.client.topic_partitions[topic]) # create consumer id hostname = self.driver.socket.gethostname() - consumer_id = "%s-%s-%s-%d" % (topic, group, hostname, os.getpid()) + self.identifier = "%s-%s-%s-%d" % (topic, group, hostname, os.getpid()) + path = os.path.join(PARTITIONER_PATH, topic, group) - log.debug("Consumer id set to: %s" % consumer_id) + log.debug("Consumer id set to: %s" % self.identifier) log.debug("Using path %s for co-ordination" % path) - self.partitioner = self.zkclient.SetPartitioner( - path=path, - set=self.partitions, - identifier=consumer_id, - time_boundary=time_boundary) - # Create a function which can be used for creating consumers + self.consumer = None self.consumer_fact = partial(SimpleConsumer, - client, group, topic, + self.client, group, topic, driver_type=driver_type, **kwargs) # Keep monitoring for changes - self.exit = self.driver.Event() # Notify worker to exit - self.changed = self.driver.Event() # Notify of partition changes - self.proc = self.driver.Proc(target=_check_and_allocate, - args=(CHECK_INTERVAL,)) + # Design: + # * We will have a worker which will keep monitoring for rebalance + # * The worker and main consumer will share data via shared memory + # protected by a lock + # * If the worker gets new 
allocations, it will SET an Event() + # * The main consumer will check this event to change itself + # * Main consumer will SET another Event() to indicate worker to exit + + # This event will notify the worker to exit + self.exit = self.driver.Event() + + # Used by the worker to indicate that allocation has changed + self.changed = self.driver.Event() + + # The shared memory and lock used for sharing allocation info + self.lock = self.driver.Lock() + self.allocated = multiprocessing.Array('i', len(partitions)) + + # Initialize the array + self._set_partitions(self.allocated, [], filler=ALLOCATION_CHANGING) + + # Start the worker + self.proc = self.driver.Proc(target=self._check_and_allocate, + args=(hosts, path, partitions, + self.allocated, CHECK_INTERVAL)) self.proc.daemon = True self.proc.start() + # Stop the Zookeeper client (worker will create one itself) + zkclient.stop() + zkclient.close() + # Do the setup once and block till we get an allocation self._set_consumer(block=True, timeout=None) + def _set_partitions(self, array, partitions, filler=ALLOCATION_CHANGING): + """ + Update partition info in the shared memory array + """ + i = 0 + for partition in partitions: + array[i] = partition + i += 1 + + while i < len(array): + array[i] = filler + i += 1 + def _set_consumer(self, block=False, timeout=0): """ Check if a new consumer has to be created because of a re-balance """ - try: - partitions = self.changed.get(block=block, timeout=timeout) - except Empty: + if not block: + timeout = 0 + + if not self.changed.wait(timeout=timeout): return + # There is a change. Get our new partitions + with self.lock: + partitions = [p for p in self.allocated] + self.changed.clear() + # If we have a consumer clean it up if self.consumer: - self.consumer.close() + self.consumer.stop() self.consumer = None - if partitions == ALLOCATION_FAILED: + if not partitions: + # Check if we must change the consumer + if not self.ignore_non_allocation: + raise RuntimeError("Did not get any partition allocation") + else: + log.info("No partitions allocated. Ignoring") + self.consumer = [] + + elif partitions[0] == ALLOCATION_FAILED: # Allocation has failed. Nothing we can do about it raise RuntimeError("Error in partition allocation") - elif partitions == ALLOCATION_CHANGING: + elif partitions[0] == ALLOCATION_CHANGING: # Allocation is changing because of consumer changes self.consumer = [] log.info("Partitions are being reassigned") return - elif not partitions: - # Check if we must change the consumer - if not self.ignore_non_allocation: - raise RuntimeError("Did not get any partition allocation") - else: - log.info("No partitions allocated. Ignoring") - self.consumer = [] else: # Create a new consumer + partitions = filter(lambda x: x >= 0, partitions) self.consumer = self.consumer_fact(partitions=partitions) + log.info("Reinitialized consumer with %s" % partitions) - def _check_and_allocate(self, sleep_time): + def _check_and_allocate(self, hosts, path, partitions, array, sleep_time): """ Checks if a new allocation is needed for the partitions. 
If so, co-ordinates with Zookeeper to get a set of partitions allocated for the consumer - - sleep_time: Time interval between each check after getting allocation """ + old = [] + # Start zookeeper connection again + zkclient = KazooClient(hosts, handler=self.driver.kazoo_handler()) + zkclient.start() + + # Set up the partitioner + partitioner = zkclient.SetPartitioner(path=path, set=partitions, + identifier=self.identifier, + time_boundary=self.time_boundary) + + # Keep running the allocation logic till we are asked to exit while not self.exit.is_set(): - if self.partitioner.acquired: + if partitioner.acquired: # A new set of partitions has been acquired - new = list(self.partitioner) + new = list(partitioner) # If there is a change, notify for a consumer change if new != old: log.info("Acquired partitions: %s", new) old = new - self.changed.put(new) + with self.lock: + self._set_partitions(array, new) + self.changed.set() # Wait for a while before checking again. In the meantime # wake up if the user calls for exit self.exit.wait(sleep_time) - elif self.partitioner.release: + elif partitioner.release: + # We have been asked to release the partitions log.info("Releasing partitions for reallocation") - self.change.put(ALLOCATION_CHANGING) - self.partitioner.release_set() old = [] + with self.lock: + self._set_partitions(array, old) + self.changed.set() + + partitioner.release_set() - elif self.partitioner.failed: - self.change.put(ALLOCATION_FAILED) - raise RuntimeError("Error in partition allocation") + elif partitioner.failed: + # Partition allocation failed + old = [] + with self.lock: + self._set_partitions(array, old, filler=ALLOCATION_FAILED) + self.changed.set() + break - elif self.partitioner.allocating: + elif partitioner.allocating: + # We have to wait till the partition is allocated log.info("Waiting for partition allocation") - self.partitioner.wait_for_acquire() + partitioner.wait_for_acquire() + + # Clean up + partitioner.finish() + zkclient.stop() + zkclient.close() def __iter__(self): """ @@ -311,10 +381,7 @@ def __iter__(self): for msg in self.consumer: yield msg - - if not self.change.empty(): - self._set_consumer(block=False) - break + self._set_consumer(block=False) def get_messages(self, count=1, block=True, timeout=0.1): """ @@ -325,23 +392,21 @@ def get_messages(self, count=1, block=True, timeout=0.1): timeout: If None, and block=True, the API will block infinitely. 
If >0, API will block for specified time (in seconds) """ - self._set_consumer(block, timeout=0.1) + self._set_consumer(block, timeout) if not self.consumer: raise RuntimeError("Error in partition allocation") return self.consumer.get_messages(count, block, timeout) - def stop(): + def stop(self): self.exit.set() self.proc.join() if self.consumer: self.consumer.stop() - self.client.stop() - self.partitioner.finish() - self.zkclient.close() + self.client.close() def commit(self): if self.consumer: From 010f43210f13b29fa11ccf01a64ffc82e58ad522 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Wed, 3 Jul 2013 14:58:37 +0530 Subject: [PATCH 07/11] Minor bug fixes Ensure that the standby consumer logic works Also add a __repr__ method for Zookeeper consumer --- kafka/zookeeper.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/kafka/zookeeper.py b/kafka/zookeeper.py index f7d1bc985..d074f8ee7 100644 --- a/kafka/zookeeper.py +++ b/kafka/zookeeper.py @@ -247,6 +247,11 @@ def __init__(self, hosts, group, topic, # Do the setup once and block till we get an allocation self._set_consumer(block=True, timeout=None) + def __repr__(self): + partitions = filter(lambda x: x>=0, self.allocated) + partitions = ','.join([str(i) for i in partitions]) + return u'ZSimpleConsumer<%s>' % partitions + def _set_partitions(self, array, partitions, filler=ALLOCATION_CHANGING): """ Update partition info in the shared memory array @@ -312,7 +317,7 @@ def _check_and_allocate(self, hosts, path, partitions, array, sleep_time): allocated for the consumer """ - old = [] + old = None # Start zookeeper connection again zkclient = KazooClient(hosts, handler=self.driver.kazoo_handler()) @@ -366,6 +371,11 @@ def _check_and_allocate(self, hosts, path, partitions, array, sleep_time): # Clean up partitioner.finish() + + with self.lock: + self._set_partitions(array, []) + self.changed.set() + zkclient.stop() zkclient.close() @@ -394,8 +404,10 @@ def get_messages(self, count=1, block=True, timeout=0.1): """ self._set_consumer(block, timeout) - if not self.consumer: + if self.consumer is None: raise RuntimeError("Error in partition allocation") + elif not self.consumer: + return [] return self.consumer.get_messages(count, block, timeout) @@ -417,7 +429,7 @@ def seek(self, *args, **kwargs): self._set_consumer() if self.consumer is None: - raise RuntimeError("Partition allocation failed") + raise RuntimeError("Error in partition allocation") elif not self.consumer: raise RuntimeError("Waiting for partition allocation") From 04a200af4c931e4edbcc67005de0533c2d760a76 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Thu, 4 Jul 2013 08:39:10 +0530 Subject: [PATCH 08/11] Add support for non-blocking init and status() --- kafka/zookeeper.py | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/kafka/zookeeper.py b/kafka/zookeeper.py index d074f8ee7..b17ec8a9f 100644 --- a/kafka/zookeeper.py +++ b/kafka/zookeeper.py @@ -144,6 +144,9 @@ class ZSimpleConsumer(object): group: a name for this consumer, used for offset storage and must be unique topic: the topic to consume driver_type: The driver type to use for the consumer + block_init: If True, the init method will block till the allocation is + completed. If not, it will return immediately and user can invoke + consumer.status() to check the status. Default True. time_boundary: The time interval, in seconds, to wait out before deciding on consumer changes in zookeeper. 
A higher value will ensure that a consumer restart will not cause two re-balances. @@ -176,6 +179,7 @@ class ZSimpleConsumer(object): """ def __init__(self, hosts, group, topic, driver_type=KAFKA_PROCESS_DRIVER, + block_init=True, time_boundary=DEFAULT_TIME_BOUNDARY, ignore_non_allocation=False, **kwargs): @@ -205,7 +209,7 @@ def __init__(self, hosts, group, topic, log.debug("Using path %s for co-ordination" % path) # Create a function which can be used for creating consumers - self.consumer = None + self.consumer = [] self.consumer_fact = partial(SimpleConsumer, self.client, group, topic, driver_type=driver_type, **kwargs) @@ -245,12 +249,30 @@ def __init__(self, hosts, group, topic, zkclient.close() # Do the setup once and block till we get an allocation - self._set_consumer(block=True, timeout=None) + self._set_consumer(block=block_init, timeout=None) def __repr__(self): + """ + Give a string representation of the consumer + """ partitions = filter(lambda x: x>=0, self.allocated) - partitions = ','.join([str(i) for i in partitions]) - return u'ZSimpleConsumer<%s>' % partitions + message = ','.join([str(i) for i in partitions]) + + if not message: + message = str(self.consumer_status) + + return u'ZSimpleConsumer<%s>' % message + + def status(self): + """ + Returns the status of the consumer + """ + if self.consumer is None: + return 'FAILED' + elif self.consumer is []: + return 'ALLOCATING' + else: + return 'ALLOCATED' def _set_partitions(self, array, partitions, filler=ALLOCATION_CHANGING): """ From 179b5d0aaffec031600175ad9934e00d445d3ce8 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Thu, 4 Jul 2013 10:05:49 +0530 Subject: [PATCH 09/11] Ensure that state transitions are handled fine --- kafka/zookeeper.py | 55 ++++++++++++++++++++++++++++------------------ 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/kafka/zookeeper.py b/kafka/zookeeper.py index b17ec8a9f..74f4fffcc 100644 --- a/kafka/zookeeper.py +++ b/kafka/zookeeper.py @@ -25,8 +25,13 @@ PARTITIONER_PATH = '/python/kafka/' # Path to use for consumer co-ordination DEFAULT_TIME_BOUNDARY = 5 CHECK_INTERVAL = 30 -ALLOCATION_CHANGING = -1 -ALLOCATION_FAILED = -2 + +# Allocation states +ALLOCATION_COMPLETED = -1 +ALLOCATION_CHANGING = -2 +ALLOCATION_FAILED = -3 +ALLOCATION_MISSED = -4 +ALLOCATION_INACTIVE = -5 log = logging.getLogger("kafka") @@ -235,7 +240,8 @@ def __init__(self, hosts, group, topic, self.allocated = multiprocessing.Array('i', len(partitions)) # Initialize the array - self._set_partitions(self.allocated, [], filler=ALLOCATION_CHANGING) + self._set_partitions(self.allocated, [], ALLOCATION_CHANGING) + self.consumer_state = ALLOCATION_CHANGING # Start the worker self.proc = self.driver.Proc(target=self._check_and_allocate, @@ -259,7 +265,7 @@ def __repr__(self): message = ','.join([str(i) for i in partitions]) if not message: - message = str(self.consumer_status) + message = self.status() return u'ZSimpleConsumer<%s>' % message @@ -267,14 +273,20 @@ def status(self): """ Returns the status of the consumer """ - if self.consumer is None: - return 'FAILED' - elif self.consumer is []: - return 'ALLOCATING' - else: + self._set_consumer(block=False) + + if self.consumer_state == ALLOCATION_COMPLETED: return 'ALLOCATED' + elif self.consumer_state == ALLOCATION_CHANGING: + return 'ALLOCATING' + elif self.consumer_state == ALLOCATION_FAILED: + return 'FAILED' + elif self.consumer_state == ALLOCATION_MISSED: + return 'MISSED' + elif self.consumer_state == ALLOCATION_INACTIVE: + return 'INACTIVE' - def 
_set_partitions(self, array, partitions, filler=ALLOCATION_CHANGING): + def _set_partitions(self, array, partitions, filler): """ Update partition info in the shared memory array """ @@ -307,8 +319,9 @@ def _set_consumer(self, block=False, timeout=0): self.consumer.stop() self.consumer = None + self.consumer_state = partitions[0] - if not partitions: + if self.consumer_state == ALLOCATION_MISSED: # Check if we must change the consumer if not self.ignore_non_allocation: raise RuntimeError("Did not get any partition allocation") @@ -316,20 +329,23 @@ def _set_consumer(self, block=False, timeout=0): log.info("No partitions allocated. Ignoring") self.consumer = [] - elif partitions[0] == ALLOCATION_FAILED: + elif self.consumer_state == ALLOCATION_FAILED: # Allocation has failed. Nothing we can do about it raise RuntimeError("Error in partition allocation") - elif partitions[0] == ALLOCATION_CHANGING: + elif self.consumer_state == ALLOCATION_CHANGING: # Allocation is changing because of consumer changes self.consumer = [] log.info("Partitions are being reassigned") return + elif self.consumer_state == ALLOCATION_INACTIVE: + log.info("Consumer is inactive") else: # Create a new consumer partitions = filter(lambda x: x >= 0, partitions) self.consumer = self.consumer_fact(partitions=partitions) + self.consumer_state = ALLOCATION_COMPLETED log.info("Reinitialized consumer with %s" % partitions) def _check_and_allocate(self, hosts, path, partitions, array, sleep_time): @@ -361,7 +377,7 @@ def _check_and_allocate(self, hosts, path, partitions, array, sleep_time): log.info("Acquired partitions: %s", new) old = new with self.lock: - self._set_partitions(array, new) + self._set_partitions(array, new, ALLOCATION_MISSED) self.changed.set() # Wait for a while before checking again. In the meantime @@ -373,7 +389,7 @@ def _check_and_allocate(self, hosts, path, partitions, array, sleep_time): log.info("Releasing partitions for reallocation") old = [] with self.lock: - self._set_partitions(array, old) + self._set_partitions(array, old, ALLOCATION_CHANGING) self.changed.set() partitioner.release_set() @@ -382,7 +398,7 @@ def _check_and_allocate(self, hosts, path, partitions, array, sleep_time): # Partition allocation failed old = [] with self.lock: - self._set_partitions(array, old, filler=ALLOCATION_FAILED) + self._set_partitions(array, old, ALLOCATION_FAILED) self.changed.set() break @@ -395,7 +411,7 @@ def _check_and_allocate(self, hosts, path, partitions, array, sleep_time): partitioner.finish() with self.lock: - self._set_partitions(array, []) + self._set_partitions(array, [], ALLOCATION_INACTIVE) self.changed.set() zkclient.stop() @@ -436,10 +452,7 @@ def get_messages(self, count=1, block=True, timeout=0.1): def stop(self): self.exit.set() self.proc.join() - - if self.consumer: - self.consumer.stop() - + self._set_consumer(block=True) self.client.close() def commit(self): From 94b9e1c3b3f3ec8f7dc2a97e12e61ea87e51df37 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Mon, 8 Jul 2013 13:52:32 +0530 Subject: [PATCH 10/11] Test cases for zookeeper based consumers Other changes * If zookeeper consumer was run from the same process, kazoo Partitioner identifier would have been the same - resulting in errors. 
Fixed * Add support for chroot to a Zookeeper path * Remove dependency on simplejson * Reduce the sleep time for the partitioner (in acquired state) --- kafka/zookeeper.py | 65 ++++++----- test/test_integration.py | 242 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 281 insertions(+), 26 deletions(-) diff --git a/kafka/zookeeper.py b/kafka/zookeeper.py index 74f4fffcc..08a10ad23 100644 --- a/kafka/zookeeper.py +++ b/kafka/zookeeper.py @@ -6,7 +6,11 @@ import random import sys import time -import simplejson +import uuid +try: + import simplejson as json +except ImportError: + import json from functools import partial from Queue import Empty @@ -21,10 +25,9 @@ ) -BROKER_IDS_PATH = '/brokers/ids/' # Path where kafka stores broker info -PARTITIONER_PATH = '/python/kafka/' # Path to use for consumer co-ordination +BROKER_IDS_PATH = 'brokers/ids/' # Path where kafka stores broker info +PARTITIONER_PATH = 'python/kafka/' # Path to use for consumer co-ordination DEFAULT_TIME_BOUNDARY = 5 -CHECK_INTERVAL = 30 # Allocation states ALLOCATION_COMPLETED = -1 @@ -38,16 +41,17 @@ random.seed() -def _get_brokers(zkclient): +def _get_brokers(zkclient, chroot='/'): """ Get the list of available brokers registered in zookeeper """ brokers = [] + root = os.path.join(chroot, BROKER_IDS_PATH) - for broker_id in zkclient.get_children(BROKER_IDS_PATH): - path = os.path.join(BROKER_IDS_PATH, broker_id) + for broker_id in zkclient.get_children(root): + path = os.path.join(root, broker_id) info, _ = zkclient.get(path) - info = simplejson.loads(info) + info = json.loads(info) brokers.append((info['host'], info['port'])) log.debug("List of brokers fetched" + str(brokers)) @@ -56,11 +60,11 @@ def _get_brokers(zkclient): return brokers -def get_client(zkclient): +def get_client(zkclient, chroot='/'): """ Given a zookeeper client, return a KafkaClient instance for use """ - brokers = _get_brokers(zkclient) + brokers = _get_brokers(zkclient, chroot=chroot) client = None for host, port in brokers: @@ -70,7 +74,7 @@ def get_client(zkclient): log.error("Error while connecting to %s:%d" % (host, port), exc_info=sys.exc_info()) - raise exp + raise RuntimeError("Unable to find any running broker") # TODO: Make this a subclass of Producer later @@ -82,10 +86,11 @@ class ZProducer(object): hosts: Comma-separated list of hosts to connect to (e.g. 127.0.0.1:2181,127.0.0.1:2182) topic - The kafka topic to send messages to + chroot - The kafka subdirectory to search for brokers """ producer_kls = None - def __init__(self, hosts, topic, **kwargs): + def __init__(self, hosts, topic, chroot='/', **kwargs): if self.producer_kls is None: raise NotImplemented("Producer class needs to be mentioned") @@ -94,7 +99,7 @@ def __init__(self, hosts, topic, **kwargs): self.zkclient.start() # Start the producer instance - self.client = get_client(self.zkclient) + self.client = get_client(self.zkclient, chroot=chroot) self.producer = self.producer_kls(self.client, topic, **kwargs) # Stop Zookeeper @@ -148,6 +153,7 @@ class ZSimpleConsumer(object): (e.g. 127.0.0.1:2181,127.0.0.1:2182) group: a name for this consumer, used for offset storage and must be unique topic: the topic to consume + chroot - The kafka subdirectory to search for brokers driver_type: The driver type to use for the consumer block_init: If True, the init method will block till the allocation is completed. 
If not, it will return immediately and user can invoke @@ -182,7 +188,7 @@ class ZSimpleConsumer(object): * After re-balancing, if the consumer does not get any partitions, ignore_non_allocation will control it's behaviour """ - def __init__(self, hosts, group, topic, + def __init__(self, hosts, group, topic, chroot='/', driver_type=KAFKA_PROCESS_DRIVER, block_init=True, time_boundary=DEFAULT_TIME_BOUNDARY, @@ -200,17 +206,16 @@ def __init__(self, hosts, group, topic, zkclient = KazooClient(hosts, handler=self.driver.kazoo_handler()) zkclient.start() - self.client = get_client(zkclient) + self.client = get_client(zkclient, chroot=chroot) self.client._load_metadata_for_topics(topic) partitions = set(self.client.topic_partitions[topic]) # create consumer id hostname = self.driver.socket.gethostname() - self.identifier = "%s-%s-%s-%d" % (topic, group, hostname, os.getpid()) - - path = os.path.join(PARTITIONER_PATH, topic, group) - + self.identifier = "%s-%s-%s" % (topic, group, hostname) log.debug("Consumer id set to: %s" % self.identifier) + + path = os.path.join(chroot, PARTITIONER_PATH, topic, group) log.debug("Using path %s for co-ordination" % path) # Create a function which can be used for creating consumers @@ -246,7 +251,7 @@ def __init__(self, hosts, group, topic, # Start the worker self.proc = self.driver.Proc(target=self._check_and_allocate, args=(hosts, path, partitions, - self.allocated, CHECK_INTERVAL)) + self.allocated)) self.proc.daemon = True self.proc.start() @@ -299,7 +304,7 @@ def _set_partitions(self, array, partitions, filler): array[i] = filler i += 1 - def _set_consumer(self, block=False, timeout=0): + def _set_consumer(self, block=False, timeout=None): """ Check if a new consumer has to be created because of a re-balance """ @@ -348,7 +353,8 @@ def _set_consumer(self, block=False, timeout=0): self.consumer_state = ALLOCATION_COMPLETED log.info("Reinitialized consumer with %s" % partitions) - def _check_and_allocate(self, hosts, path, partitions, array, sleep_time): + + def _check_and_allocate(self, hosts, path, partitions, array): """ Checks if a new allocation is needed for the partitions. 
If so, co-ordinates with Zookeeper to get a set of partitions @@ -361,11 +367,18 @@ def _check_and_allocate(self, hosts, path, partitions, array, sleep_time): zkclient = KazooClient(hosts, handler=self.driver.kazoo_handler()) zkclient.start() + identifier = '%s-%d-%s' % (self.identifier, + os.getpid(), + uuid.uuid4().hex) + # Set up the partitioner partitioner = zkclient.SetPartitioner(path=path, set=partitions, - identifier=self.identifier, + identifier=identifier, time_boundary=self.time_boundary) + # Once allocation is done, sleep for some time between each checks + sleep_time = self.time_boundary / 2.0 + # Keep running the allocation logic till we are asked to exit while not self.exit.is_set(): if partitioner.acquired: @@ -374,7 +387,7 @@ def _check_and_allocate(self, hosts, path, partitions, array, sleep_time): # If there is a change, notify for a consumer change if new != old: - log.info("Acquired partitions: %s", new) + log.info("Acquired partitions: %s" % str(new)) old = new with self.lock: self._set_partitions(array, new, ALLOCATION_MISSED) @@ -387,9 +400,9 @@ def _check_and_allocate(self, hosts, path, partitions, array, sleep_time): elif partitioner.release: # We have been asked to release the partitions log.info("Releasing partitions for reallocation") - old = [] + old = None with self.lock: - self._set_partitions(array, old, ALLOCATION_CHANGING) + self._set_partitions(array, [], ALLOCATION_CHANGING) self.changed.set() partitioner.release_set() diff --git a/test/test_integration.py b/test/test_integration.py index 1fe7cd60f..32173fa26 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -506,6 +506,248 @@ def test_multi_proc_pending_gevent(self): def test_multi_proc_pending_thread(self): return self.test_multi_proc_pending(driver_type=KAFKA_THREAD_DRIVER) + def test_zconsumer(self, driver_type=KAFKA_PROCESS_DRIVER): + hosts = '%s:%d' % (self.server2.zk_host, self.server2.zk_port) + queue = "test_zconsumer_%s" % (driver_type) + + # Produce 100 messages to partition 0 + produce1 = ProduceRequest(queue, 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce1]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + # Produce 100 messages to partition 1 + produce2 = ProduceRequest(queue, 1, messages=[ + create_message("Test message 1 %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + # Start a consumer + consumer = ZSimpleConsumer(hosts, "group1", queue, + chroot=self.server2.zk_chroot, + driver_type=driver_type) + all_messages = [] + for message in consumer: + all_messages.append(message) + + self.assertEquals(len(all_messages), 200) + # Make sure there are no duplicates + self.assertEquals(len(all_messages), len(set(all_messages))) + + consumer.seek(-10, 2) + all_messages = [] + for message in consumer: + all_messages.append(message) + + self.assertEquals(len(all_messages), 10) + + consumer.seek(-13, 2) + all_messages = [] + for message in consumer: + all_messages.append(message) + + self.assertEquals(len(all_messages), 13) + + # Blocking API + start = datetime.now() + messages = consumer.get_messages(block=True, timeout=5) + diff = (datetime.now() - start).total_seconds() + self.assertGreaterEqual(diff, 5) + self.assertEqual(len(messages), 0) + + # Send 10 messages + produce = ProduceRequest(queue, 0, messages=[ + create_message("Test 
message 0 %d" % i) for i in range(10) + ]) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 100) + + # Fetch 5 messages + messages = consumer.get_messages(count=5, block=True, timeout=5) + self.assertEqual(len(messages), 5) + + # Fetch 10 messages + start = datetime.now() + messages = consumer.get_messages(count=10, block=True, timeout=5) + self.assertEqual(len(messages), 5) + diff = (datetime.now() - start).total_seconds() + self.assertGreaterEqual(diff, 5) + + consumer.stop() + + def test_zconsumer_gevent(self): + return self.test_zconsumer(driver_type=KAFKA_GEVENT_DRIVER) + + def test_zconsumer_thread(self): + return self.test_zconsumer(driver_type=KAFKA_THREAD_DRIVER) + + def test_zconsumer_pending(self, driver_type=KAFKA_PROCESS_DRIVER): + hosts = '%s:%d' % (self.server2.zk_host, self.server2.zk_port) + queue = "test_zookeeper_pending_%s" % (driver_type) + + # Produce 10 messages to partition 0 and 1 + produce1 = ProduceRequest(queue, 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(10) + ]) + for resp in self.client.send_produce_request([produce1]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + produce2 = ProduceRequest(queue, 1, messages=[ + create_message("Test message 1 %d" % i) for i in range(10) + ]) + for resp in self.client.send_produce_request([produce2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + consumer = ZSimpleConsumer(hosts, "group1", queue, + chroot=self.server2.zk_chroot, + driver_type=driver_type) + + self.assertEquals(consumer.pending(), 20) + consumer.stop() + + def test_zconsumer_pending_gevent(self): + return self.test_zconsumer_pending(driver_type=KAFKA_GEVENT_DRIVER) + + def test_zconsumer_pending_thread(self): + return self.test_zconsumer_pending(driver_type=KAFKA_THREAD_DRIVER) + + + def test_zconsumer_rebalance(self, driver_type=KAFKA_PROCESS_DRIVER): + hosts = '%s:%d' % (self.server2.zk_host, self.server2.zk_port) + queue = "test_zconsumer_rebalance_%s" % (driver_type) + + # Produce 100 messages to partition 0 + produce1 = ProduceRequest(queue, 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce1]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + # Produce 100 messages to partition 1 + produce2 = ProduceRequest(queue, 1, messages=[ + create_message("Test message 1 %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + consumers = [] + + # Start first consumer (willing to have missed allocations) + consumer = ZSimpleConsumer(hosts, "group1", queue, + chroot=self.server2.zk_chroot, + driver_type=driver_type, + ignore_non_allocation=True, + time_boundary=2) + + self.assertEquals(consumer.status(), 'ALLOCATED') + self.assertEquals(len(consumer.consumer.offsets), 2) + + consumers.append(consumer) + + # Start second consumer (willing to have missed allocations) + consumer = ZSimpleConsumer(hosts, "group1", queue, + chroot=self.server2.zk_chroot, + driver_type=driver_type, + ignore_non_allocation=True, + time_boundary=2) + + consumers.append(consumer) + consumer.driver.sleep(15) + + for consumer in consumers: + self.assertEquals(consumer.status(), 'ALLOCATED') + self.assertEquals(len(consumer.consumer.offsets), 1) + + # Start a third consumer which is willing to have missed 
allocations + consumer = ZSimpleConsumer(hosts, "group1", queue, + chroot=self.server2.zk_chroot, + driver_type=driver_type, + ignore_non_allocation=True, + time_boundary=2) + + consumers.append(consumer) + consumer.driver.sleep(15) + + allocated = [] + missed = [] + + for consumer in consumers: + if consumer.status() == 'ALLOCATED': + allocated.append(consumer) + self.assertEquals(len(consumer.consumer.offsets), 1) + elif consumer.status() == 'MISSED': + missed.append(consumer) + + self.assertEquals(len(allocated), 2) + self.assertEquals(len(missed), 1) + + all_messages = [] + + for consumer in allocated: + for message in consumer: + all_messages.append(message) + + self.assertEquals(len(all_messages), 200) + # Make sure there are no duplicates + self.assertEquals(len(all_messages), len(set(all_messages))) + + # Iterating through third consumer should be possible + missed_msgs = [] + + for consumer in missed: + for message in consumer: + missed_msgs.append(message) + + self.assertEquals(len(missed_msgs), 0) + + consumer1, consumer2 = allocated + consumer3, = missed + + # Stop the first consumer. This third one should pick it up + consumer1.stop() + self.assertEquals(consumer1.status(), 'INACTIVE') + + # Wait for a while for rebalancing + consumer2.driver.sleep(15) + + self.assertEquals(consumer2.status(), 'ALLOCATED') + self.assertEquals(consumer3.status(), 'ALLOCATED') + self.assertEquals(len(consumer2.consumer.offsets), 1) + self.assertEquals(len(consumer3.consumer.offsets), 1) + + # Stop the third consumer. This second one should pick up everything + consumer3.stop() + self.assertEquals(consumer3.status(), 'INACTIVE') + + # Wait for a while for rebalancing + consumer2.driver.sleep(15) + + self.assertEquals(consumer2.status(), 'ALLOCATED') + self.assertEquals(len(consumer2.consumer.offsets), 2) + + consumer2.stop() + self.assertEquals(consumer2.status(), 'INACTIVE') + + def test_zconsumer_rebalance_gevent(self): + return self.test_zconsumer_rebalance(driver_type=KAFKA_GEVENT_DRIVER) + + def test_zconsumer_rebalance_thread(self): + return self.test_zconsumer_rebalance(driver_type=KAFKA_THREAD_DRIVER) + if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG) From f311cf41d728c4da914921052e53f21e44b82e88 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Tue, 13 Aug 2013 16:28:14 +0530 Subject: [PATCH 11/11] Fix minor bugs in zookeeper code --- kafka/client.py | 1 + kafka/zookeeper.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka/client.py b/kafka/client.py index 8fae64cec..f66f1568f 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -14,6 +14,7 @@ from kafka.common import * from kafka.conn import KafkaConnection from kafka.protocol import KafkaProtocol +from kafka.util import KafkaConnectionError log = logging.getLogger("kafka") diff --git a/kafka/zookeeper.py b/kafka/zookeeper.py index 08a10ad23..5ddf7eb23 100644 --- a/kafka/zookeeper.py +++ b/kafka/zookeeper.py @@ -107,7 +107,7 @@ def __init__(self, hosts, topic, chroot='/', **kwargs): self.zkclient.close() self.zkclient = None - def stop(): + def stop(self): self.producer.stop() self.client.close()
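
Patches 06 and 09 replace the original queue-based handoff with a fixed-size shared array whose negative filler values double as allocation-state markers. The following is a minimal standalone sketch of that encoding, written against the constants and `_set_partitions` logic introduced in this series; the three-partition topic and the plain `multiprocessing.Lock` are illustrative assumptions (the real code obtains its lock and primitives from `KafkaDriver`).

```python
# Sketch of the shared-array protocol from patches 06/09: the worker
# writes the acquired partition ids into a fixed-size array and pads
# the tail with a negative "state" filler; the consumer side recovers
# both the partition list and the allocation state from one snapshot.
import multiprocessing

ALLOCATION_COMPLETED = -1
ALLOCATION_CHANGING = -2
ALLOCATION_FAILED = -3
ALLOCATION_MISSED = -4

def set_partitions(array, partitions, filler):
    # Partition ids first, then the state filler in every unused slot
    for i in range(len(array)):
        array[i] = partitions[i] if i < len(partitions) else filler

lock = multiprocessing.Lock()
allocated = multiprocessing.Array('i', 3)   # topic with 3 partitions

# Worker side: acquired partitions 0 and 2; MISSED padding means an
# entirely empty acquisition reads back as a missed allocation
with lock:
    set_partitions(allocated, [0, 2], ALLOCATION_MISSED)

# Consumer side: non-negative entries are partitions, negatives are state
with lock:
    snapshot = list(allocated)

partitions = [p for p in snapshot if p >= 0]
state = [p for p in snapshot if p < 0]
print partitions   # [0, 2]
print state        # [-4], i.e. ALLOCATION_MISSED padding
```

Unlike the earlier queue, a late reader always sees the latest allocation rather than a backlog of stale ones, which is exactly the problem the patch 06 commit message describes.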
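The series never shows the standby failover flow end to end, so here is a hedged usage sketch assembled from the README snippet and test_zconsumer_rebalance above. The Zookeeper address, topic name, and sleep interval are illustrative assumptions; it needs a running Kafka broker registered in Zookeeper, an existing two-partition topic, and (for a true standby that reports MISSED) more consumers than partitions.

```python
# Hedged end-to-end sketch of the rebalancing behaviour added in this
# series. Assumes a local Zookeeper on 127.0.0.1:2181, the default
# chroot of '/', and an existing two-partition topic "my-topic".
from kafka import ZSimpleConsumer, KAFKA_THREAD_DRIVER

hosts = "127.0.0.1:2181"

# With two consumers and two partitions, each gets one partition
primary = ZSimpleConsumer(hosts, "my-group", "my-topic",
                          driver_type=KAFKA_THREAD_DRIVER,
                          time_boundary=2)

# ignore_non_allocation=True lets this consumer survive an empty
# allocation instead of raising, so it can wait in stand-by when
# consumers outnumber partitions
standby = ZSimpleConsumer(hosts, "my-group", "my-topic",
                          driver_type=KAFKA_THREAD_DRIVER,
                          ignore_non_allocation=True,
                          time_boundary=2)

print primary.status()          # 'ALLOCATED'

for msg in primary.get_messages(count=5, block=True, timeout=5):
    print msg

# Stop the primary; once time_boundary settles, the partitioner
# rebalances and the surviving consumer absorbs its partitions
primary.stop()
standby.driver.sleep(15)        # as in test_zconsumer_rebalance
print standby.status()          # 'ALLOCATED' after the rebalance

standby.stop()
```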