diff --git a/README.md b/README.md index 587204a28..9f73cd5f0 100644 --- a/README.md +++ b/README.md @@ -128,6 +128,39 @@ consumer = MultiConsumer(kafka, "my-group", "my-topic", driver_type=KAFKA_THREAD_DRIVER) ``` +## Zookeeper support +The Zookeeper module supports creating producers and a SimpleConsumer. +The Zookeeper consumer takes care of rebalancing partitions for a topic +among a consumer-group. + +NOTE: This will work only with other kafka-python clients and will not +work with Java/Scala clients (this is a TODO) +```python +from kafka.zookeeper import ZSimpleProducer, ZKeyedProducer +from kafka.zookeeper import ZSimpleConsumer +from kafka.partitioner import HashedPartitioner + +# Zookeeper SimpleProducer +# Takes all arguments similar to SimpleProducer +producer = ZSimpleProducer("127.0.0.1:2181", "my-topic") +producer.send_messages("msg1", "msg2") + +# Zookeeper KeyedProducer +# Takes all arguments similar to KeyedProducer +producer = ZKeyedProducer("127.0.0.1:2181,127.0.0.1:2182", "my-topic", + partitioner=HashedPartitioner) +producer.send("key1", "msg1") + +# Zookeeper consumer. 
+# Takes all arguments similar to SimpleConsumer +consumer = ZSimpleConsumer("127.0.0.1:2181", "my-group", "my-topic") + +for msg in consumer: + print msg + +consumer.get_messages(block=True, timeout=10) +``` + ## Low level ```python diff --git a/kafka/__init__.py b/kafka/__init__.py index e43edebd1..9589dd3a5 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -17,6 +17,7 @@ from kafka.producer import SimpleProducer, KeyedProducer from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner from kafka.consumer import SimpleConsumer, MultiConsumer +from kafka.zookeeper import ZSimpleProducer, ZKeyedProducer, ZSimpleConsumer __all__ = [ 'KAFKA_THREAD_DRIVER', 'KAFKA_GEVENT_DRIVER', 'KAFKA_PROCESS_DRIVER', @@ -24,5 +25,6 @@ 'SimpleProducer', 'KeyedProducer', 'RoundRobinPartitioner', 'HashedPartitioner', 'SimpleConsumer', 'MultiConsumer', + 'ZSimpleProducer', 'ZKeyedProducer', 'ZSimpleConsumer', 'create_message', 'create_gzip_message', 'create_snappy_message' ] diff --git a/kafka/client.py b/kafka/client.py index 7e05e0f61..0cc17962a 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -7,12 +7,14 @@ import os import socket import struct +import sys import time import zlib from kafka.common import * from kafka.conn import KafkaConnection from kafka.protocol import KafkaProtocol +from kafka.util import KafkaConnectionError log = logging.getLogger("kafka") @@ -170,12 +172,18 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): correlation_id=requestId, payloads=payloads) # Send the request, recv the response - conn.send(requestId, request) + try: + conn.send(requestId, request) + response = conn.recv(requestId) if decoder_fn is not None else None + except (socket.error, KafkaConnectionError) as exp: + log.error("Error in broker", exc_info=sys.exc_info()) + # Remove this broker information from the connection pool + self.conns.pop(broker, None) + raise if decoder_fn is None: continue - response = conn.recv(requestId) for response in decoder_fn(response): 
acc[(response.topic, response.partition)] = response diff --git a/kafka/common.py b/kafka/common.py index 199db92fc..218fc1553 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -1,6 +1,8 @@ from collections import namedtuple import Queue +from kazoo.handlers.threading import SequentialThreadingHandler +from kazoo.handlers.gevent import SequentialGeventHandler import multiprocessing import threading import socket @@ -9,6 +11,7 @@ import gevent.queue import gevent.pool import gevent.socket +import gevent.coros import time ############### @@ -74,11 +77,15 @@ def __init__(self, driver_type): self.Queue = Queue.Queue self.Event = threading.Event self.Proc = threading.Thread + self.kazoo_handler = SequentialThreadingHandler + self.Lock = threading.Lock elif driver_type == KAFKA_PROCESS_DRIVER: self.Queue = multiprocessing.Queue self.Event = multiprocessing.Event self.Proc = multiprocessing.Process + self.kazoo_handler = SequentialThreadingHandler + self.Lock = multiprocessing.Lock elif driver_type == KAFKA_GEVENT_DRIVER: self.Queue = gevent.queue.Queue @@ -86,6 +93,8 @@ def __init__(self, driver_type): self.socket = gevent.socket self.Proc = self.gevent_proc self.sleep = gevent.sleep + self.kazoo_handler = SequentialGeventHandler + self.Lock = gevent.coros.Semaphore def gevent_proc(self, target=None, args=(), kwargs=None): kwargs = {} if kwargs is None else kwargs diff --git a/kafka/conn.py b/kafka/conn.py index fca914dcc..c8ad397f1 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -2,8 +2,11 @@ import socket import struct +from kafka.util import KafkaConnectionError, BufferUnderflowError + log = logging.getLogger("kafka") + class KafkaConnection(object): """ A socket connection to a single Kafka broker @@ -47,7 +50,7 @@ def _consume_response_iter(self): # Read the size off of the header resp = self._sock.recv(4) if resp == "": - raise Exception("Got no response from Kafka") + raise KafkaConnectionError("Got no response from Kafka") (size,) = struct.unpack('>i', 
resp) messageSize = size - 4 @@ -75,8 +78,9 @@ def send(self, requestId, payload): log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), requestId)) sent = self._sock.sendall(payload) + if sent != None: - raise RuntimeError("Kafka went away") + raise KafkaConnectionError("Kafka went away") def recv(self, requestId): "Get a response from Kafka" diff --git a/kafka/util.py b/kafka/util.py index 8318ac5c3..18195c350 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -71,3 +71,7 @@ class BufferUnderflowError(Exception): class ChecksumError(Exception): pass + + +class KafkaConnectionError(Exception): + pass diff --git a/kafka/zookeeper.py b/kafka/zookeeper.py new file mode 100644 index 000000000..5ddf7eb23 --- /dev/null +++ b/kafka/zookeeper.py @@ -0,0 +1,493 @@ +# Zookeeper support for kafka clients + +import logging +import multiprocessing +import os +import random +import sys +import time +import uuid +try: + import simplejson as json +except ImportError: + import json + +from functools import partial +from Queue import Empty + +from kafka.client import KafkaClient +from kafka.producer import SimpleProducer, KeyedProducer +from kafka.consumer import SimpleConsumer +from kazoo.client import KazooClient +from kafka.common import ( + KAFKA_PROCESS_DRIVER, KAFKA_THREAD_DRIVER, KAFKA_GEVENT_DRIVER, + KafkaDriver +) + + +BROKER_IDS_PATH = 'brokers/ids/' # Path where kafka stores broker info +PARTITIONER_PATH = 'python/kafka/' # Path to use for consumer co-ordination +DEFAULT_TIME_BOUNDARY = 5 + +# Allocation states +ALLOCATION_COMPLETED = -1 +ALLOCATION_CHANGING = -2 +ALLOCATION_FAILED = -3 +ALLOCATION_MISSED = -4 +ALLOCATION_INACTIVE = -5 + + +log = logging.getLogger("kafka") +random.seed() + + +def _get_brokers(zkclient, chroot='/'): + """ + Get the list of available brokers registered in zookeeper + """ + brokers = [] + root = os.path.join(chroot, BROKER_IDS_PATH) + + for broker_id in zkclient.get_children(root): + path = os.path.join(root, broker_id) + 
+ info, _ = zkclient.get(path) + info = json.loads(info) + brokers.append((info['host'], info['port'])) + + log.debug("List of brokers fetched" + str(brokers)) + + random.shuffle(brokers) + return brokers + + +def get_client(zkclient, chroot='/'): + """ + Given a zookeeper client, return a KafkaClient instance for use + """ + brokers = _get_brokers(zkclient, chroot=chroot) + client = None + + for host, port in brokers: + try: + return KafkaClient(host, port) + except Exception as exp: + log.error("Error while connecting to %s:%d" % (host, port), + exc_info=sys.exc_info()) + + raise RuntimeError("Unable to find any running broker") + + +# TODO: Make this a subclass of Producer later +class ZProducer(object): + """ + A base Zookeeper producer to be used by other producer classes + + Args + hosts: Comma-separated list of hosts to connect to + (e.g. 127.0.0.1:2181,127.0.0.1:2182) + topic - The kafka topic to send messages to + chroot - The kafka subdirectory to search for brokers + """ + producer_kls = None + + def __init__(self, hosts, topic, chroot='/', **kwargs): + + if self.producer_kls is None: + raise NotImplementedError("Producer class needs to be mentioned") + + self.zkclient = KazooClient(hosts=hosts) + self.zkclient.start() + + # Start the producer instance + self.client = get_client(self.zkclient, chroot=chroot) + self.producer = self.producer_kls(self.client, topic, **kwargs) + + # Stop Zookeeper + self.zkclient.stop() + self.zkclient.close() + self.zkclient = None + + def stop(self): + self.producer.stop() + self.client.close() + + +class ZSimpleProducer(ZProducer): + """ + A simple, round-robin producer. Each message goes to exactly one partition + + Args: + hosts: Comma-separated list of hosts to connect to + (e.g. 
127.0.0.1:2181,127.0.0.1:2182) + topic - The kafka topic to send messages to + """ + producer_kls = SimpleProducer + + def send_messages(self, *msg): + self.producer.send_messages(*msg) + + +class ZKeyedProducer(ZProducer): + """ + A producer which distributes messages to partitions based on a + partitioner function (class) and the key + + hosts: Comma-separated list of hosts to connect to + (e.g. 127.0.0.1:2181,127.0.0.1:2182) + topic - The kafka topic to send messages to + partitioner - A partitioner class that will be used to get the partition + to send the message to. Must be derived from Partitioner + """ + producer_kls = KeyedProducer + + def send(self, key, msg): + self.producer.send(key, msg) + + +class ZSimpleConsumer(object): + """ + A consumer that uses Zookeeper to co-ordinate and share the partitions + of a topic with other consumers + + hosts: Comma-separated list of hosts to connect to + (e.g. 127.0.0.1:2181,127.0.0.1:2182) + group: a name for this consumer, used for offset storage and must be unique + topic: the topic to consume + chroot - The kafka subdirectory to search for brokers + driver_type: The driver type to use for the consumer + block_init: If True, the init method will block till the allocation is + completed. If not, it will return immediately and user can invoke + consumer.status() to check the status. Default True. + time_boundary: The time interval, in seconds, to wait out before deciding + on consumer changes in zookeeper. A higher value will ensure that a + consumer restart will not cause two re-balances. + (Default 5s) + ignore_non_allocation: If set to True, the consumer will ignore the + case where no partitions were allocated to it. + This can be used to keep consumers in stand-by. They will take over + when another consumer fails. (Default False) + + auto_commit: default True. Whether or not to auto commit the offsets + auto_commit_every_n: default 100. 
How many messages to consume + before a commit + auto_commit_every_t: default 5000. How much time (in milliseconds) to + wait before commit + + Auto commit details: + If both auto_commit_every_n and auto_commit_every_t are set, they will + reset one another when one is triggered. These triggers simply call the + commit method on this class. A manual call to commit will also reset + these triggers + + Partition allocation details + * When the consumer is initialized, it blocks till it gets an allocation + * If ignore_non_allocation is False, the consumer will throw an error + in init or during other operations + * During re-balancing of partitions, the consumer will not return any + messages (iteration or get_messages) + * After re-balancing, if the consumer does not get any partitions, + ignore_non_allocation will control its behaviour + """ + def __init__(self, hosts, group, topic, chroot='/', + driver_type=KAFKA_PROCESS_DRIVER, + block_init=True, + time_boundary=DEFAULT_TIME_BOUNDARY, + ignore_non_allocation=False, + **kwargs): + + # User is not allowed to specify partitions + if 'partitions' in kwargs: + raise ValueError("Partitions cannot be specified") + + self.driver = KafkaDriver(driver_type) + self.ignore_non_allocation = ignore_non_allocation + self.time_boundary = time_boundary + + zkclient = KazooClient(hosts, handler=self.driver.kazoo_handler()) + zkclient.start() + + self.client = get_client(zkclient, chroot=chroot) + self.client._load_metadata_for_topics(topic) + partitions = set(self.client.topic_partitions[topic]) + + # create consumer id + hostname = self.driver.socket.gethostname() + self.identifier = "%s-%s-%s" % (topic, group, hostname) + log.debug("Consumer id set to: %s" % self.identifier) + + path = os.path.join(chroot, PARTITIONER_PATH, topic, group) + log.debug("Using path %s for co-ordination" % path) + + # Create a function which can be used for creating consumers + self.consumer = [] + self.consumer_fact = partial(SimpleConsumer, + 
self.client, group, topic, + driver_type=driver_type, **kwargs) + + # Keep monitoring for changes + + # Design: + # * We will have a worker which will keep monitoring for rebalance + # * The worker and main consumer will share data via shared memory + # protected by a lock + # * If the worker gets new allocations, it will SET an Event() + # * The main consumer will check this event to change itself + # * Main consumer will SET another Event() to indicate worker to exit + + # This event will notify the worker to exit + self.exit = self.driver.Event() + + # Used by the worker to indicate that allocation has changed + self.changed = self.driver.Event() + + # The shared memory and lock used for sharing allocation info + self.lock = self.driver.Lock() + self.allocated = multiprocessing.Array('i', len(partitions)) + + # Initialize the array + self._set_partitions(self.allocated, [], ALLOCATION_CHANGING) + self.consumer_state = ALLOCATION_CHANGING + + # Start the worker + self.proc = self.driver.Proc(target=self._check_and_allocate, + args=(hosts, path, partitions, + self.allocated)) + self.proc.daemon = True + self.proc.start() + + # Stop the Zookeeper client (worker will create one itself) + zkclient.stop() + zkclient.close() + + # Do the setup once and block till we get an allocation + self._set_consumer(block=block_init, timeout=None) + + def __repr__(self): + """ + Give a string representation of the consumer + """ + partitions = filter(lambda x: x>=0, self.allocated) + message = ','.join([str(i) for i in partitions]) + + if not message: + message = self.status() + + return u'ZSimpleConsumer<%s>' % message + + def status(self): + """ + Returns the status of the consumer + """ + self._set_consumer(block=False) + + if self.consumer_state == ALLOCATION_COMPLETED: + return 'ALLOCATED' + elif self.consumer_state == ALLOCATION_CHANGING: + return 'ALLOCATING' + elif self.consumer_state == ALLOCATION_FAILED: + return 'FAILED' + elif self.consumer_state == ALLOCATION_MISSED: 
+ return 'MISSED' + elif self.consumer_state == ALLOCATION_INACTIVE: + return 'INACTIVE' + + def _set_partitions(self, array, partitions, filler): + """ + Update partition info in the shared memory array + """ + i = 0 + for partition in partitions: + array[i] = partition + i += 1 + + while i < len(array): + array[i] = filler + i += 1 + + def _set_consumer(self, block=False, timeout=None): + """ + Check if a new consumer has to be created because of a re-balance + """ + if not block: + timeout = 0 + + if not self.changed.wait(timeout=timeout): + return + + # There is a change. Get our new partitions + with self.lock: + partitions = [p for p in self.allocated] + self.changed.clear() + + # If we have a consumer clean it up + if self.consumer: + self.consumer.stop() + + self.consumer = None + self.consumer_state = partitions[0] + + if self.consumer_state == ALLOCATION_MISSED: + # Check if we must change the consumer + if not self.ignore_non_allocation: + raise RuntimeError("Did not get any partition allocation") + else: + log.info("No partitions allocated. Ignoring") + self.consumer = [] + + elif self.consumer_state == ALLOCATION_FAILED: + # Allocation has failed. Nothing we can do about it + raise RuntimeError("Error in partition allocation") + + elif self.consumer_state == ALLOCATION_CHANGING: + # Allocation is changing because of consumer changes + self.consumer = [] + log.info("Partitions are being reassigned") + return + + elif self.consumer_state == ALLOCATION_INACTIVE: + log.info("Consumer is inactive") + else: + # Create a new consumer + partitions = filter(lambda x: x >= 0, partitions) + self.consumer = self.consumer_fact(partitions=partitions) + self.consumer_state = ALLOCATION_COMPLETED + log.info("Reinitialized consumer with %s" % partitions) + + + def _check_and_allocate(self, hosts, path, partitions, array): + """ + Checks if a new allocation is needed for the partitions. 
+ If so, co-ordinates with Zookeeper to get a set of partitions + allocated for the consumer + """ + + old = None + + # Start zookeeper connection again + zkclient = KazooClient(hosts, handler=self.driver.kazoo_handler()) + zkclient.start() + + identifier = '%s-%d-%s' % (self.identifier, + os.getpid(), + uuid.uuid4().hex) + + # Set up the partitioner + partitioner = zkclient.SetPartitioner(path=path, set=partitions, + identifier=identifier, + time_boundary=self.time_boundary) + + # Once allocation is done, sleep for some time between each checks + sleep_time = self.time_boundary / 2.0 + + # Keep running the allocation logic till we are asked to exit + while not self.exit.is_set(): + if partitioner.acquired: + # A new set of partitions has been acquired + new = list(partitioner) + + # If there is a change, notify for a consumer change + if new != old: + log.info("Acquired partitions: %s" % str(new)) + old = new + with self.lock: + self._set_partitions(array, new, ALLOCATION_MISSED) + self.changed.set() + + # Wait for a while before checking again. 
In the meantime + # wake up if the user calls for exit + self.exit.wait(sleep_time) + + elif partitioner.release: + # We have been asked to release the partitions + log.info("Releasing partitions for reallocation") + old = None + with self.lock: + self._set_partitions(array, [], ALLOCATION_CHANGING) + self.changed.set() + + partitioner.release_set() + + elif partitioner.failed: + # Partition allocation failed + old = [] + with self.lock: + self._set_partitions(array, old, ALLOCATION_FAILED) + self.changed.set() + break + + elif partitioner.allocating: + # We have to wait till the partition is allocated + log.info("Waiting for partition allocation") + partitioner.wait_for_acquire() + + # Clean up + partitioner.finish() + + with self.lock: + self._set_partitions(array, [], ALLOCATION_INACTIVE) + self.changed.set() + + zkclient.stop() + zkclient.close() + + def __iter__(self): + """ + Iterate through data available in partitions allocated to this + instance + """ + self._set_consumer(block=False) + + if self.consumer is None: + raise RuntimeError("Error in partition allocation") + + for msg in self.consumer: + yield msg + self._set_consumer(block=False) + + def get_messages(self, count=1, block=True, timeout=0.1): + """ + Fetch the specified number of messages + + count: Indicates the maximum number of messages to be fetched + block: If True, the API will block till some messages are fetched. + timeout: If None, and block=True, the API will block infinitely. 
+ If >0, API will block for specified time (in seconds) + """ + self._set_consumer(block, timeout) + + if self.consumer is None: + raise RuntimeError("Error in partition allocation") + elif not self.consumer: + return [] + + return self.consumer.get_messages(count, block, timeout) + + def stop(self): + self.exit.set() + self.proc.join() + self._set_consumer(block=True) + self.client.close() + + def commit(self): + if self.consumer: + self.consumer.commit() + self._set_consumer(block=False) + + def seek(self, *args, **kwargs): + self._set_consumer() + + if self.consumer is None: + raise RuntimeError("Error in partition allocation") + elif not self.consumer: + raise RuntimeError("Waiting for partition allocation") + + return self.consumer.seek(*args, **kwargs) + + def pending(self): + if self.consumer is None: + raise RuntimeError("Error in partition allocation") + elif not self.consumer: + # We are in a transition/suspended state + return 0 + + return self.consumer.pending() diff --git a/setup.py b/setup.py index e383e81f2..c3d225ee9 100644 --- a/setup.py +++ b/setup.py @@ -21,8 +21,8 @@ def run(self): name="kafka-python", version="0.8.1-1", - install_requires=["distribute", "tox", "gevent"], - tests_require=["tox", "gevent"], + install_requires=["distribute", "tox", "gevent", "kazoo"], + tests_require=["tox", "gevent", "kazoo"], cmdclass={"test": Tox}, packages=["kafka"], diff --git a/test/test_integration.py b/test/test_integration.py index 6de33dc15..a0644476b 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -778,15 +778,252 @@ def test_multi_proc_pending_gevent(self): def test_multi_proc_pending_thread(self): return self.test_multi_proc_pending(driver_type=KAFKA_THREAD_DRIVER) - def test_large_messages(self): - # Produce 10 "normal" size messages - messages1 = [create_message(random_string(1024)) for i in range(10)] - produce1 = ProduceRequest("test_large_messages", 0, messages1) + def test_zconsumer(self, 
driver_type=KAFKA_PROCESS_DRIVER): + hosts = '%s:%d' % (self.server2.zk_host, self.server2.zk_port) + queue = "test_zconsumer_%s" % (driver_type) + + # Produce 100 messages to partition 0 + produce1 = ProduceRequest(queue, 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce1]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + # Produce 100 messages to partition 1 + produce2 = ProduceRequest(queue, 1, messages=[ + create_message("Test message 1 %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + # Start a consumer + consumer = ZSimpleConsumer(hosts, "group1", queue, + chroot=self.server2.zk_chroot, + driver_type=driver_type) + all_messages = [] + for message in consumer: + all_messages.append(message) + + self.assertEquals(len(all_messages), 200) + # Make sure there are no duplicates + self.assertEquals(len(all_messages), len(set(all_messages))) + + consumer.seek(-10, 2) + all_messages = [] + for message in consumer: + all_messages.append(message) + + self.assertEquals(len(all_messages), 10) + + consumer.seek(-13, 2) + all_messages = [] + for message in consumer: + all_messages.append(message) + + self.assertEquals(len(all_messages), 13) + + # Blocking API + start = datetime.now() + messages = consumer.get_messages(block=True, timeout=5) + diff = (datetime.now() - start).total_seconds() + self.assertGreaterEqual(diff, 5) + self.assertEqual(len(messages), 0) + + # Send 10 messages + produce = ProduceRequest(queue, 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(10) + ]) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 100) + + # Fetch 5 messages + messages = consumer.get_messages(count=5, block=True, timeout=5) + 
self.assertEqual(len(messages), 5) + + # Fetch 10 messages + start = datetime.now() + messages = consumer.get_messages(count=10, block=True, timeout=5) + self.assertEqual(len(messages), 5) + diff = (datetime.now() - start).total_seconds() + self.assertGreaterEqual(diff, 5) + + consumer.stop() + + def test_zconsumer_gevent(self): + return self.test_zconsumer(driver_type=KAFKA_GEVENT_DRIVER) + + def test_zconsumer_thread(self): + return self.test_zconsumer(driver_type=KAFKA_THREAD_DRIVER) + + def test_zconsumer_pending(self, driver_type=KAFKA_PROCESS_DRIVER): + hosts = '%s:%d' % (self.server2.zk_host, self.server2.zk_port) + queue = "test_zookeeper_pending_%s" % (driver_type) + + # Produce 10 messages to partition 0 and 1 + produce1 = ProduceRequest(queue, 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(10) + ]) + for resp in self.client.send_produce_request([produce1]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + produce2 = ProduceRequest(queue, 1, messages=[ + create_message("Test message 1 %d" % i) for i in range(10) + ]) + for resp in self.client.send_produce_request([produce2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + consumer = ZSimpleConsumer(hosts, "group1", queue, + chroot=self.server2.zk_chroot, + driver_type=driver_type) + + self.assertEquals(consumer.pending(), 20) + consumer.stop() + + def test_zconsumer_pending_gevent(self): + return self.test_zconsumer_pending(driver_type=KAFKA_GEVENT_DRIVER) + + def test_zconsumer_pending_thread(self): + return self.test_zconsumer_pending(driver_type=KAFKA_THREAD_DRIVER) + + + def test_zconsumer_rebalance(self, driver_type=KAFKA_PROCESS_DRIVER): + hosts = '%s:%d' % (self.server2.zk_host, self.server2.zk_port) + queue = "test_zconsumer_rebalance_%s" % (driver_type) + + # Produce 100 messages to partition 0 + produce1 = ProduceRequest(queue, 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(100) + ]) for resp in 
self.client.send_produce_request([produce1]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) + # Produce 100 messages to partition 1 + produce2 = ProduceRequest(queue, 1, messages=[ + create_message("Test message 1 %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + consumers = [] + + # Start first consumer (willing to have missed allocations) + consumer = ZSimpleConsumer(hosts, "group1", queue, + chroot=self.server2.zk_chroot, + driver_type=driver_type, + ignore_non_allocation=True, + time_boundary=2) + + self.assertEquals(consumer.status(), 'ALLOCATED') + self.assertEquals(len(consumer.consumer.offsets), 2) + + consumers.append(consumer) + + # Start second consumer (willing to have missed allocations) + consumer = ZSimpleConsumer(hosts, "group1", queue, + chroot=self.server2.zk_chroot, + driver_type=driver_type, + ignore_non_allocation=True, + time_boundary=2) + + consumers.append(consumer) + consumer.driver.sleep(15) + + for consumer in consumers: + self.assertEquals(consumer.status(), 'ALLOCATED') + self.assertEquals(len(consumer.consumer.offsets), 1) + + # Start a third consumer which is willing to have missed allocations + consumer = ZSimpleConsumer(hosts, "group1", queue, + chroot=self.server2.zk_chroot, + driver_type=driver_type, + ignore_non_allocation=True, + time_boundary=2) + + consumers.append(consumer) + consumer.driver.sleep(15) + + allocated = [] + missed = [] + + for consumer in consumers: + if consumer.status() == 'ALLOCATED': + allocated.append(consumer) + self.assertEquals(len(consumer.consumer.offsets), 1) + elif consumer.status() == 'MISSED': + missed.append(consumer) + + self.assertEquals(len(allocated), 2) + self.assertEquals(len(missed), 1) + + all_messages = [] + + for consumer in allocated: + for message in consumer: + all_messages.append(message) + + self.assertEquals(len(all_messages), 200) + # Make 
sure there are no duplicates + self.assertEquals(len(all_messages), len(set(all_messages))) + + # Iterating through third consumer should be possible + missed_msgs = [] + + for consumer in missed: + for message in consumer: + missed_msgs.append(message) + + self.assertEquals(len(missed_msgs), 0) + + consumer1, consumer2 = allocated + consumer3, = missed + + # Stop the first consumer. This third one should pick it up + consumer1.stop() + self.assertEquals(consumer1.status(), 'INACTIVE') + + # Wait for a while for rebalancing + consumer2.driver.sleep(15) + + self.assertEquals(consumer2.status(), 'ALLOCATED') + self.assertEquals(consumer3.status(), 'ALLOCATED') + self.assertEquals(len(consumer2.consumer.offsets), 1) + self.assertEquals(len(consumer3.consumer.offsets), 1) + + # Stop the third consumer. This second one should pick up everything + consumer3.stop() + self.assertEquals(consumer3.status(), 'INACTIVE') + + # Wait for a while for rebalancing + consumer2.driver.sleep(15) + + self.assertEquals(consumer2.status(), 'ALLOCATED') + self.assertEquals(len(consumer2.consumer.offsets), 2) + + consumer2.stop() + self.assertEquals(consumer2.status(), 'INACTIVE') + + def test_zconsumer_rebalance_gevent(self): + return self.test_zconsumer_rebalance(driver_type=KAFKA_GEVENT_DRIVER) + + def test_zconsumer_rebalance_thread(self): + return self.test_zconsumer_rebalance(driver_type=KAFKA_THREAD_DRIVER) + + def test_large_messages(self): + # Produce 10 "normal" size messages + messages1 = [create_message(random_string(1024)) for i in range(10)] + produce1 = ProduceRequest("test_large_messages", 0, messages1) # Produce 10 messages that are too large (bigger than default fetch size) messages2=[create_message(random_string(5000)) for i in range(10)] produce2 = ProduceRequest("test_large_messages", 0, messages2)