From 46cb09a71cac4f3e0549946db9a3c661c908f98a Mon Sep 17 00:00:00 2001 From: sachin Date: Tue, 11 Jun 2019 17:22:34 +0530 Subject: [PATCH 1/2] FYR-11936: Changes for python3. --- Makefile | 4 +- build_integration.sh | 2 +- kafka/consumer/base.py | 35 ++- kafka/consumer/multiprocess.py | 10 +- kafka/consumer/simple.py | 51 +++- kafka/errors.py | 15 + kafka/zookeeper.py | 476 ++++++++++++++++++++++++++++++ pylint.rc | 2 +- test/test_consumer_integration.py | 155 +++++++--- test/test_producer.py | 4 +- tox.ini | 1 + 11 files changed, 686 insertions(+), 69 deletions(-) create mode 100644 kafka/zookeeper.py diff --git a/Makefile b/Makefile index b4dcbffc9..765cfb6a4 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,8 @@ # Some simple testing tasks (sorry, UNIX only). FLAGS= -KAFKA_VERSION=0.11.0.2 -SCALA_VERSION=2.12 +KAFKA_VERSION=0.8.1.1 +SCALA_VERSION=2.10 setup: pip install -r requirements-dev.txt diff --git a/build_integration.sh b/build_integration.sh index c6df0b26b..b228611e9 100755 --- a/build_integration.sh +++ b/build_integration.sh @@ -1,6 +1,6 @@ #!/bin/bash -: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.1 0.11.0.2 1.0.2 1.1.1 2.0.1"} +: ${ALL_RELEASES:="0.8.1.1 0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.1 0.11.0.2 1.0.2 1.1.1 2.0.1"} : ${SCALA_VERSION:=2.11} : ${DIST_BASE_URL:=https://archive.apache.org/dist/kafka/} : ${KAFKA_SRC_GIT:=https://github.com/apache/kafka.git} diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index a77ce7ea0..82a4d9f31 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -12,7 +12,6 @@ OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload) from kafka.util import ReentrantTimer - log = logging.getLogger('kafka.consumer') AUTO_COMMIT_MSG_COUNT = 100 @@ -21,8 +20,8 @@ FETCH_DEFAULT_BLOCK_TIMEOUT = 1 FETCH_MAX_WAIT_TIME = 100 FETCH_MIN_BYTES = 4096 -FETCH_BUFFER_SIZE_BYTES = 4096 -MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8 +FETCH_BUFFER_SIZE_BYTES = 262144 +MAX_FETCH_BUFFER_SIZE_BYTES = 157286400 ITER_TIMEOUT_SECONDS = 60 NO_MESSAGES_WAIT_TIME_SECONDS = 0.1 @@ -66,7 +65,7 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True, self.auto_commit = auto_commit self.auto_commit_every_n = auto_commit_every_n self.auto_commit_every_t = auto_commit_every_t - + self.on_stop_callback = None # Set up the auto-commit timer if auto_commit is True and auto_commit_every_t is not None: self.commit_timer = ReentrantTimer(auto_commit_every_t, @@ -88,6 +87,10 @@ def cleanup(obj): self.partition_info = False # Do not return partition info in msgs + def register_on_stop_callback(self, fn): + if self.on_stop_callback is None: + self.on_stop_callback = fn + def provide_partition_info(self): """ Indicates that partition info must be returned by the consumer @@ -149,11 +152,11 @@ def commit(self, partitions=None): if partitions is None: # commit all partitions partitions = list(self.offsets.keys()) - log.debug('Committing new offsets for %s, partitions %s', + log.info('Committing new offsets for %s, partitions %s', self.topic, partitions) for partition in partitions: offset = self.offsets[partition] - log.debug('Commit offset %d in SimpleConsumer: ' + log.info('Commit offset %d in SimpleConsumer: ' 'group=%s, topic=%s, partition=%s', offset, self.group, self.topic, partition) @@ -185,7 +188,12 @@ def stop(self): if self.commit_timer is not None: self.commit_timer.stop() self.commit() - + if not self.auto_commit and self.on_stop_callback: + try: + log.info('executing "on_stop_callback"') + 
self.on_stop_callback()
+            except Exception:
+                log.exception('There was an error executing "on_stop_callback"')
         if hasattr(self, '_cleanup_func'):
             # Remove cleanup handler now that we've stopped
@@ -206,6 +214,19 @@ def stop(self):
             del self._cleanup_func
+
+    def commit_offsets(self, offsets):
+        assert not self.auto_commit, 'cannot manually commit offsets if autocommit is True'
+        with self.commit_lock:
+            reqs = []
+            for partition, offset in offsets.items():
+                reqs.append(OffsetCommitRequestPayload(self.topic, partition,
+                                                       offset, None))
+            resps = self.client.send_offset_commit_request(self.group, reqs)
+            for resp in resps:
+                check_error(resp)
+            self.count_since_commit = 0
+
     def pending(self, partitions=None):
         """
         Gets the pending message count
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index 758bb92f8..4e2df12c8 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -222,12 +222,12 @@ def __iter__(self):
             #     a chance to run and put some messages in the queue
             # TODO: This is a hack and will make the consumer block for
             #       at least one second. Need to find a better way of doing this
-            partition, message = self.queue.get(block=True, timeout=1)
+            meta, message = self.queue.get(block=True, timeout=1)
         except queue.Empty:
             break
         # Count, check and commit messages if necessary
-        self.offsets[partition] = message.offset + 1
+        self.offsets[meta.partition] = message.offset + 1
         self.events.start.clear()
         self.count_since_commit += 1
         self._auto_commit()
@@ -271,14 +271,14 @@ def get_messages(self, count=1, block=True, timeout=10):
             block_next_call = block is True or block > len(messages)
             try:
-                partition, message = self.queue.get(block_next_call,
+                meta, message = self.queue.get(block_next_call,
                                                     timeout)
             except queue.Empty:
                 break
-            _msg = (partition, message) if self.partition_info else message
+            _msg = (meta, message) if self.partition_info else message
             messages.append(_msg)
-            new_offsets[partition] = message.offset + 1
+            new_offsets[meta.partition] = message.offset + 1
             count -= 1
             if timeout is not None:
                 timeout = max_time - time.time()
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index a6a64a58f..2ac7a58be 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -26,15 +26,16 @@
 )
 from kafka.errors import (
     KafkaError, ConsumerFetchSizeTooSmall,
-    UnknownTopicOrPartitionError, NotLeaderForPartitionError,
-    OffsetOutOfRangeError, FailedPayloadsError, check_error
+    UnknownTopicOrPartitionError, NotLeaderForPartitionError, DefaultSimpleConsumerException,
+    OffsetOutOfRangeError, FailedPayloadsError, check_error, BufferTooLargeError
 )
 from kafka.protocol.message import PartialMessage
 from kafka.structs import FetchRequestPayload, OffsetRequestPayload
-
+from collections import namedtuple
 log = logging.getLogger(__name__)
-
+MAX_QUEUE_SIZE = 10 * 1024
+META = namedtuple("meta", ["partition", "high_water_mark"])
 class FetchContext(object):
     """
@@ -118,7 +119,9 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
                  buffer_size=FETCH_BUFFER_SIZE_BYTES,
                  max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
                  iter_timeout=None,
-                 auto_offset_reset='largest'):
+                 auto_offset_reset='largest',
+                 partition_info=False,
+                 skip_buffer_size_error=True):
         warnings.warn('deprecated - this class will be removed in a future'
                       ' release.
Use KafkaConsumer instead.', DeprecationWarning) @@ -134,13 +137,17 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None, 'max_buffer_size (%d)' % (buffer_size, max_buffer_size)) self.buffer_size = buffer_size + self.partition_info = partition_info self.max_buffer_size = max_buffer_size self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME self.fetch_min_bytes = fetch_size_bytes self.fetch_offsets = self.offsets.copy() self.iter_timeout = iter_timeout self.auto_offset_reset = auto_offset_reset - self.queue = queue.Queue() + self.queue = queue.Queue(maxsize=MAX_QUEUE_SIZE) + self.skip_buffer_size_error = skip_buffer_size_error + self.error = DefaultSimpleConsumerException() + def __repr__(self): return '' % \ @@ -255,7 +262,7 @@ def seek(self, offset, whence=None, partition=None): if self.auto_commit: self.commit() - self.queue = queue.Queue() + self.queue = queue.Queue(maxsize=MAX_QUEUE_SIZE) def get_messages(self, count=1, block=True, timeout=0.1): """ @@ -289,10 +296,10 @@ def get_messages(self, count=1, block=True, timeout=0.1): continue break - partition, message = result - _msg = (partition, message) if self.partition_info else message + meta, message = result + _msg = result if self.partition_info else message messages.append(_msg) - new_offsets[partition] = message.offset + 1 + new_offsets[meta.partition] = message.offset + 1 # Update and commit offsets if necessary self.offsets.update(new_offsets) @@ -318,16 +325,14 @@ def _get_message(self, block=True, timeout=0.1, get_partition_info=None, log.debug('internal queue empty, fetching more messages') with FetchContext(self, block, timeout): self._fetch() - if not block or time.time() > (start_at + timeout): break try: - partition, message = self.queue.get_nowait() - + meta, message = self.queue.get_nowait() if update_offset: # Update partition offset - self.offsets[partition] = message.offset + 1 + self.offsets[meta.partition] = message.offset + 1 # Count, check and commit messages if necessary self.count_since_commit += 1 @@ -336,13 +341,16 @@ def _get_message(self, block=True, timeout=0.1, get_partition_info=None, if get_partition_info is None: get_partition_info = self.partition_info if get_partition_info: - return partition, message + return meta, message else: return message except queue.Empty: log.debug('internal queue empty after fetch - returning None') return None + def stop(self): + super(SimpleConsumer, self).stop() + def __iter__(self): if self.iter_timeout is None: timeout = ITER_TIMEOUT_SECONDS @@ -410,6 +418,7 @@ def _fetch(self): continue partition = resp.partition + high_water_mark = resp.highwaterMark buffer_size = partitions[partition] # Check for partial message @@ -426,6 +435,15 @@ def _fetch(self): if self.max_buffer_size is None: buffer_size *= 2 + if self.skip_buffer_size_error: + if self.buffer_size > MAX_FETCH_BUFFER_SIZE_BYTES: + log.error('Message size exceeded maximum allowed of {0}'.format(MAX_FETCH_BUFFER_SIZE_BYTES)) + log.error('Current buffer_size is: {0}'.format(self.buffer_size)) + log.error('topic: {0}, partition: {1}, offset:{2}'.format(self.topic, partition, self.fetch_offsets[partition])) + old_offset = self.fetch_offsets[partition] + self.seek(1, 1, partition=partition) + log.error('Incremented offset. 
New offset is: {0}'.format(self.offsets[partition]))
+                        raise BufferTooLargeError(self.topic, partition, old_offset, self.offsets[partition])
                 else:
                     buffer_size = min(buffer_size * 2, self.max_buffer_size)
                 log.warning('Fetch size too small, increase to %d (2x) '
@@ -439,6 +457,7 @@ def _fetch(self):
                                       message)
                         continue
                     # Put the message in our queue
-                    self.queue.put((partition, message))
+                    meta = META(partition, high_water_mark)
+                    self.queue.put((meta, message), block=True)
                     self.fetch_offsets[partition] = message.offset + 1
             partitions = retry_partitions
diff --git a/kafka/errors.py b/kafka/errors.py
index f13f97853..baa1c3723 100644
--- a/kafka/errors.py
+++ b/kafka/errors.py
@@ -450,6 +450,9 @@ class KafkaUnavailableError(KafkaError):
 class KafkaTimeoutError(KafkaError):
     pass
+class DefaultSimpleConsumerException(Exception):
+    pass
+
 class FailedPayloadsError(KafkaError):
     def __init__(self, payload, *args):
@@ -497,6 +500,18 @@ class KafkaConfigurationError(KafkaError):
 class QuotaViolationError(KafkaError):
     pass
+class BufferTooLargeError(KafkaError):
+
+    def __init__(self, topic, partition, old_offset, new_offset):
+        self.topic = topic
+        self.partition = partition
+        self.old_offset = old_offset
+        self.new_offset = new_offset
+
+    def __repr__(self):
+        return 'topic: {0}, partition: {1}, old_offset: {2}, new_offset: {3}'.format(self.topic, self.partition, self.old_offset, self.new_offset)
+
+
 class AsyncProducerQueueFull(KafkaError):
     def __init__(self, failed_msgs, *args):
diff --git a/kafka/zookeeper.py b/kafka/zookeeper.py
new file mode 100644
index 000000000..f8a9dfeec
--- /dev/null
+++ b/kafka/zookeeper.py
@@ -0,0 +1,476 @@
+"""
+This is originally from:
+
+https://github.com/mahendra/kafka-python/blob/zookeeper/kafka/zookeeper.py
+
+It is modified in a few places to work with more recent KafkaClient.
+
+Also, threading is substituted for multiprocessing, since threading
+is gevent-friendly, whereas multiprocessing is not.
+""" +import logging +import threading +import os +import random +import socket +import uuid +import json +from functools import partial +from kafka.client import SimpleClient +from kafka.producer import SimpleProducer, KeyedProducer +from kafka.consumer import SimpleConsumer +from kazoo.exceptions import SessionExpiredError +from kazoo.client import KazooClient +import time +import sys + +if 'gevent' in sys.modules: + from kazoo.handlers.gevent import SequentialGeventHandler as kazoo_handler +else: + from kazoo.handlers.threading import SequentialThreadingHandler as kazoo_handler + + +BROKER_IDS_PATH = 'brokers/ids/' # Path where kafka stores broker info +PARTITIONER_PATH = 'python/kafka/' # Path to use for consumer co-ordination +DEFAULT_TIME_BOUNDARY = 10 +# how many attempts to create a valid partition +MAX_PARTITION_ALLOCATION_ATTEMPTS = 100 +# how much time to wait to create a valid partition +MAX_PARTITION_ALLOCATION_TIME = 360 # in seconds + +# Allocation states +ALLOCATION_COMPLETED = -1 +ALLOCATION_CHANGING = -2 +ALLOCATION_FAILED = -3 +ALLOCATION_MISSED = -4 +ALLOCATION_INACTIVE = -5 + + +log = logging.getLogger("kafka") +random.seed() + + +def _get_brokers(zkclient, chroot='/'): + """ + Get the list of available brokers registered in zookeeper + """ + brokers = [] + root = os.path.join(chroot, BROKER_IDS_PATH) + + for broker_id in zkclient.get_children(root): + path = os.path.join(root, broker_id) + info, _ = zkclient.get(path) + info = json.loads(info) + brokers.append((info['host'], info['port'])) + + log.debug("List of brokers fetched" + str(brokers)) + + random.shuffle(brokers) + return brokers + + +def get_client(zkclient, chroot='/'): + """ + Given a zookeeper client, return a KafkaClient instance for use + """ + brokers = _get_brokers(zkclient, chroot=chroot) + brokers = ["%s:%s" % (host, port) for (host, port) in brokers] + return SimpleClient(brokers) + + +# TODO: Make this a subclass of Producer later +class ZProducer(object): + """ + A base Zookeeper producer to be used by other producer classes + + Args + hosts: Comma-separated list of hosts to connect to + (e.g. 127.0.0.1:2181,127.0.0.1:2182) + topic - The kafka topic to send messages to + chroot - The kafka subdirectory to search for brokers + """ + producer_kls = None + + def __init__(self, hosts, topic, chroot='/', **kwargs): + + if self.producer_kls is None: + raise NotImplementedError("Producer class needs to be mentioned") + + self.zkclient = KazooClient(hosts=hosts) + self.zkclient.start() + + # Start the producer instance + self.client = get_client(self.zkclient, chroot=chroot) + + self.producer = self.producer_kls(self.client, topic, **kwargs) + + # Stop Zookeeper + self.zkclient.stop() + self.zkclient.close() + self.zkclient = None + + def stop(self): + self.producer.stop() + self.client.close() + + +class ZSimpleProducer(ZProducer): + """ + A simple, round-robbin producer. Each message goes to exactly one partition + + Args: + hosts: Comma-separated list of hosts to connect to + (e.g. 127.0.0.1:2181,127.0.0.1:2182) + topic - The kafka topic to send messages to + """ + producer_kls = SimpleProducer + + def send_messages(self, *msg): + self.producer.send_messages(*msg) + + +class ZKeyedProducer(ZProducer): + """ + A producer which distributes messages to partitions based on a + partitioner function (class) and the key + + hosts: Comma-separated list of hosts to connect to + (e.g. 
127.0.0.1:2181,127.0.0.1:2182) + topic - The kafka topic to send messages to + partitioner - A partitioner class that will be used to get the partition + to send the message to. Must be derived from Partitioner + """ + producer_kls = KeyedProducer + + def send(self, key, msg): + self.producer.send_messages(key, msg) + + +class DefaultZSimpleConsumerException(Exception): + pass + + +class ZSimpleConsumer(object): + """ + A consumer that uses Zookeeper to co-ordinate and share the partitions + of a topic with other consumers + + hosts: Comma-separated list of hosts to connect to + (e.g. 127.0.0.1:2181,127.0.0.1:2182) + group: a name for this consumer, used for offset storage and must be unique + topic: the topic to consume + chroot - The kafka subdirectory to search for brokers + driver_type: The driver type to use for the consumer + block_init: If True, the init method will block till the allocation is + completed. If not, it will return immediately and user can invoke + consumer.status() to check the status. Default True. + time_boundary: The time interval, in seconds, to wait out before deciding + on consumer changes in zookeeper. A higher value will ensure that a + consumer restart will not cause two re-balances. + (Default 10s) + ignore_non_allocation: If set to True, the consumer will ignore the + case where no partitions were allocated to it. + This can be used to keep consumers in stand-by. They will take over + when another consumer fails. (Default False) + + auto_commit: default True. Whether or not to auto commit the offsets + auto_commit_every_n: default 100. How many messages to consume + before a commit + auto_commit_every_t: default 5000. How much time (in milliseconds) to + wait before commit + + Auto commit details: + If both auto_commit_every_n and auto_commit_every_t are set, they will + reset one another when one is triggered. These triggers simply call the + commit method on this class. 
A manual call to commit will also reset + these triggers + + Partition allocation details + * When the consumer is initialized, it blocks till it gets an allocation + * If ignore_non_allocation is False, the consumer will throw an error + in init or during other operations + * During re-balancing of partitions, the consumer will not return any + messages (iteration or get_messages) + * After re-balancing, if the consumer does not get any partitions, + ignore_non_allocation will control it's behaviour + """ + + def __init__(self, + hosts, + group, + topic, + chroot='/', + block_init=True, + time_boundary=DEFAULT_TIME_BOUNDARY, + ignore_non_allocation=False, + **kwargs): + + # User is not allowed to specify partitions + if 'partitions' in kwargs: + raise ValueError("Partitions cannot be specified") + + self.ignore_non_allocation = ignore_non_allocation + self.time_boundary = time_boundary + + self.zkclient = KazooClient(hosts, handler=kazoo_handler()) + self.zkclient.start() + + self.client = get_client(self.zkclient, chroot=chroot) + self.client.load_metadata_for_topics(topic) + self.partitions = set(self.client.topic_partitions[topic]) + + # self.allocated = [ALLOCATION_CHANGING] * len(partitions) + + self.path = os.path.join(chroot, PARTITIONER_PATH, topic, group) + log.debug("Using path %s for co-ordination" % self.path) + + # Create a function which can be used for creating consumers + self.consumer = None + self.consumer_fact = partial(SimpleConsumer, + self.client, + group, + topic, + **kwargs) + self.consumer_topic = topic + self.consumer_group = group + + # Keep monitoring for changes + + # Design: + # * We will have a worker which will keep monitoring for rebalance + # * The worker and main consumer will share data via shared memory + # protected by a lock + # * If the worker gets new allocations, it will SET an Event() + # * The main consumer will check this event to change itself + # * Main consumer will SET another Event() to indicate worker to exit + + # This event will notify the worker to exit + self.exit = threading.Event() + + # Used by the worker to indicate that allocation has changed + self.changed = threading.Event() + + # The shared memory and lock used for sharing allocation info + self.lock = threading.Lock() + + # Initialize the array + # self._set_partitions(self.allocated, [], ALLOCATION_CHANGING) + self.consumer_state = ALLOCATION_CHANGING + + # create consumer id + hostname = socket.gethostname() + self.identifier = "%s-%s-%s-%s-%s" % (topic, + group, + hostname, + os.getpid(), + uuid.uuid4().hex) + log.info("Consumer id set to: %s" % self.identifier) + + self.got_error = False + self.error = DefaultZSimpleConsumerException() + + # Start the worker + self.partioner_thread = threading.Thread( + target=self._check_and_allocate) + + self.partioner_thread.daemon = True + self.partioner_thread.start() + + self.on_stop_callback = None + + def status(self): + """ + Returns the status of the consumer + """ + raise NotImplementedError("Method not implemented") + if self.consumer_state == ALLOCATION_COMPLETED: + return 'ALLOCATED' + elif self.consumer_state == ALLOCATION_CHANGING: + return 'ALLOCATING' + elif self.consumer_state == ALLOCATION_FAILED: + return 'FAILED' + elif self.consumer_state == ALLOCATION_MISSED: + return 'MISSED' + elif self.consumer_state == ALLOCATION_INACTIVE: + return 'INACTIVE' + + def _get_new_partitioner(self): + return self.zkclient.SetPartitioner(path=self.path, + set=self.partitions, + identifier=self.identifier, + 
time_boundary=self.time_boundary) + + def _check_and_allocate(self): + """ + Checks if a new allocation is needed for the partitions. + If so, co-ordinates with Zookeeper to get a set of partitions + allocated for the consumer + """ + old = None + + # Set up the partitioner + partitioner = self._get_new_partitioner() + + # Once allocation is done, sleep for some time between each checks + sleep_time = self.time_boundary / 2.0 + + partition_allocation_attempt = 0 + partition_allocation_start_time = time.time() + # Keep running the allocation logic till we are asked to exit + while not self.exit.is_set(): + log.info("ZK Partitoner state: %s, topic: %s, group: %s" % ( + partitioner.state, self.consumer_topic, self.consumer_group)) + try: + if partitioner.acquired: + # A new set of partitions has been acquired + + new = list(partitioner) + + # If there is a change, notify for a consumer change + if new != old: + log.info("Acquired partitions: %s" % str(new)) + if len(new) > 0: + self.consumer = self.consumer_fact(partitions=new) + self.consumer.register_on_stop_callback( + self.on_stop_callback) + else: + self.consumer = None + old = new + + # Wait for a while before checking again. In the meantime + # wake up if the user calls for exit + self.exit.wait(sleep_time) + + elif partitioner.release: + # We have been asked to release the partitions + + log.info("Releasing partitions for reallocation") + old = None + if self.consumer is not None: + self.consumer.stop() + partitioner.release_set() + + elif partitioner.failed: + # Partition allocation failed + + # Failure means we need to create a new SetPartitioner: + # see: + # http://kazoo.readthedocs.org/en/latest/api/recipe/partitioner.html + + log.error("Partitioner Failed. Creating new partitioner.") + + partitioner = self._get_new_partitioner() + + elif partitioner.allocating: + # We have to wait till the partition is allocated + partition_allocation_attempt += 1 + partition_allocation_end_time = time.time() + log.info( + "Waiting for partition allocation, topic: {0}, group: {1}, count: {2}, time: {3}".format( + self.consumer_topic, + self.consumer_group, + partition_allocation_attempt, + partition_allocation_end_time - + partition_allocation_start_time)) + if partition_allocation_attempt > MAX_PARTITION_ALLOCATION_ATTEMPTS or \ + partition_allocation_end_time - partition_allocation_start_time > MAX_PARTITION_ALLOCATION_TIME: + # we are probably spinning in a loop waiting for allocation + # reset the partitioner + + # this is the fix put in that solved the issue in QA when there + # were multiple demuxers. TODO: unclear why it did not happen with + # chronos / picasso which also use zookeeper client and have multiple + # instances. (Maybe just luck?) + if self.consumer is not None: + self.consumer.stop() + partitioner.release_set() + old = None + + # cleanup old one + partitioner.finish() + # create new one + partitioner = self._get_new_partitioner() + log.info('Creating new partitioner, as the old one was in ALLOCATING state for too many attempts, topic: {0}, group: {1}, count: {2}, time: {3}'.format( + self.consumer_topic, self.consumer_group, partition_allocation_attempt, partition_allocation_end_time - partition_allocation_start_time)) + partition_allocation_attempt = 0 + partition_allocation_start_time = time.time() + + partitioner.wait_for_acquire(timeout=1) + except SessionExpiredError as e: + log.error("Zookeeper session expired. 
Error:%s" % e) + self.error = e + self.got_error = True + break + except Exception as e: + log.error( + "Exception raised in partitioner thread. Error:%s" % + e) + self.error = e + self.got_error = True + break + + # Clean up + partitioner.finish() + + def __iter__(self): + """ + Iterate through data available in partitions allocated to this + instance + """ + raise NotImplementedError("Method not implemented") + + def get_messages(self, count=1, block=True, timeout=0.1): + """ + Fetch the specified number of messages + + count: Indicates the maximum number of messages to be fetched + block: If True, the API will block till some messages are fetched. + timeout: If None, and block=True, the API will block infinitely. + If >0, API will block for specified time (in seconds) + """ + + if self.got_error: + raise self.error + + if self.consumer is None: + # This is needed in cases where gevent is used with + # a thread that does not have any calls that would yield. + # If we do not sleep here a greenlet could spin indefinitely. + time.sleep(0) + return [] + + return self.consumer.get_messages(count, block, timeout) + + def stop(self): + self.exit.set() + self.partioner_thread.join() + self.zkclient.stop() + self.zkclient.close() + self.client.close() + + def commit(self): + if self.consumer: + self.consumer.commit() + + def commit_offsets(self, offsets): + if self.consumer: + self.consumer.commit_offsets(offsets) + + def register_on_stop_callback(self, fn): + self.on_stop_callback = fn + + def seek(self, *args, **kwargs): + if self.consumer is None: + raise RuntimeError("Error in partition allocation") + elif not self.consumer: + raise RuntimeError("Waiting for partition allocation") + return self.consumer.seek(*args, **kwargs) + + def pending(self): + if self.consumer is None: + raise RuntimeError("Error in partition allocation") + elif not self.consumer: + # We are in a transition/suspended state + return 0 + + return self.consumer.pending() diff --git a/pylint.rc b/pylint.rc index 851275bcc..95839da69 100644 --- a/pylint.rc +++ b/pylint.rc @@ -4,4 +4,4 @@ ignored-modules=kafka.vendor.six.moves generated-members=py.* [MESSAGES CONTROL] -disable=E1129 +disable=E1129,E1102 diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index fdffd05a7..6014fedcb 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -22,7 +22,7 @@ from kafka.structs import ( ProduceRequestPayload, TopicPartition, OffsetAndTimestamp ) - +from kafka.zookeeper import ZSimpleConsumer from test.fixtures import ZookeeperFixture, KafkaFixture, random_string, version from test.testutil import KafkaIntegrationTestCase, kafka_versions, Timer @@ -156,18 +156,134 @@ def test_simple_consumer(self): # Start a consumer consumer = self.consumer() - self.assert_message_count([ message for message in consumer ], 200) consumer.stop() + def test_zconsumer(self): + hosts = '%s:%d' % (self.server2.zk_host, self.server2.zk_port) + print(hosts) + # Produce 100 messages to partition 0 + produce1 = self.send_messages(0, [("Test message 0 %d" % i) for i in range(100)]) + + # Produce 100 messages to partition 1 + produce2 = self.send_messages(1,[ + ("Test message 1 %d" % i) for i in range(100)]) + + + # Start a consumer + consumer = ZSimpleConsumer(hosts, "group1", self.topic, + chroot=self.server2.zk_chroot) + #all_messages = [] + #for message in consumer: + # all_messages.append(message) + #all_messages = consumer.get_messages(block=True, timeout=5) + 
#self.assertEquals(len(all_messages), 200) + # Make sure there are no duplicates + #self.assertEquals(len(all_messages), len(set(all_messages))) + time.sleep(15) + consumer.seek(-10, 2) + all_messages = [] + all_messages = consumer.get_messages(count=10,block=True, timeout=5) + + self.assertEquals(len(all_messages), 10) + + consumer.seek(-13, 2) + all_messages = [] + all_messages = consumer.get_messages(count=13,block=True, timeout=5) + + self.assertEquals(len(all_messages), 13) + + # Blocking API + with Timer() as t: + messages = consumer.get_messages(block=True, timeout=5) + self.assertEqual(len(messages), 0) + self.assertGreaterEqual(t.interval, 5) + + # Send 10 messages + self.send_messages(1,[ + ("Test message 1 %d" % i) for i in range(10)]) + + # Fetch 5 messages + messages = consumer.get_messages(count=5, block=True, timeout=5) + self.assertEqual(len(messages), 5) + + # Fetch 10 messages + with Timer() as t: + messages = consumer.get_messages(count=10, block=True, timeout=5) + self.assertEqual(len(messages), 5) + self.assertGreaterEqual(t.interval, 5) + + consumer.stop() + + def test_zconsumer_rebalance(self): + hosts = '%s:%d' % (self.server2.zk_host, self.server2.zk_port) + + # Produce 100 messages to partition 0 + produce1 = self.send_messages(0, [("Test message 0 %d" % i) for i in range(100)]) + + # Produce 100 messages to partition 1 + produce2 = self.send_messages(1,[ + ("Test message 1 %d" % i) for i in range(100)]) + + consumers = [] + + # Start first consumer (willing to have missed allocations) + consumer1 = ZSimpleConsumer(hosts, "group1", self.topic, + chroot=self.server2.zk_chroot, + ignore_non_allocation=True, + time_boundary=2) + time.sleep(5) + self.assertEquals(len(consumer1.consumer.offsets), 2) + + consumers.append(consumer1) + + # Start second consumer (willing to have missed allocations) + consumer2 = ZSimpleConsumer(hosts, "group1", self.topic, + chroot=self.server2.zk_chroot, + ignore_non_allocation=True, + time_boundary=2) + + consumers.append(consumer2) + time.sleep(5) + + for consumer in consumers: + self.assertEquals(len(consumer.consumer.offsets), 1) + + # Start a third consumer which is willing to have missed allocations + consumer3 = ZSimpleConsumer(hosts, "group1", self.topic, + chroot=self.server2.zk_chroot, + ignore_non_allocation=True, + time_boundary=2) + + consumers.append(consumer3) + time.sleep(5) + # Stop the first consumer. This third one should pick it up + consumer1.stop() + + # Wait for a while for rebalancing + time.sleep(5) + + self.assertEquals(len(consumer2.consumer.offsets), 1) + self.assertEquals(len(consumer3.consumer.offsets), 1) + + # Stop the third consumer. This second one should pick up everything + consumer3.stop() + + + # Wait for a while for rebalancing + time.sleep(5) + + self.assertEquals(len(consumer2.consumer.offsets), 2) + + consumer2.stop() + def test_simple_consumer_gzip(self): self.send_gzip_message(0, range(0, 100)) self.send_gzip_message(1, range(100, 200)) # Start a consumer consumer = self.consumer() - self.assert_message_count([ message for message in consumer ], 200) consumer.stop() @@ -421,44 +537,13 @@ def test_large_messages(self): # full MessageSet is smaller than max_bytes. 
# For that reason, we set the max buffer size to a little more # than the size of all large messages combined - consumer = self.consumer(max_buffer_size=60000) - + consumer = self.consumer(max_buffer_size=270000) expected_messages = set(small_messages + large_messages) actual_messages = set([ x.message.value for x in consumer ]) self.assertEqual(expected_messages, actual_messages) consumer.stop() - def test_huge_messages(self): - huge_message, = self.send_messages(0, [ - create_message(random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10)), - ]) - - # Create a consumer with the default buffer size - consumer = self.consumer() - - # This consumer fails to get the message - with self.assertRaises(ConsumerFetchSizeTooSmall): - consumer.get_message(False, 0.1) - - consumer.stop() - - # Create a consumer with no fetch size limit - big_consumer = self.consumer( - max_buffer_size = None, - partitions = [0], - ) - - # Seek to the last message - big_consumer.seek(-1, 2) - - # Consume giant message successfully - message = big_consumer.get_message(block=False, timeout=10) - self.assertIsNotNone(message) - self.assertEqual(message.message.value, huge_message) - - big_consumer.stop() - @kafka_versions('>=0.8.1') def test_offset_behavior__resuming_behavior(self): self.send_messages(0, range(0, 100)) diff --git a/test/test_producer.py b/test/test_producer.py index 60b19bfb9..1951ecae5 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -23,7 +23,7 @@ def test_buffer_pool(): @pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") -@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4']) +@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy']) def test_end_to_end(kafka_broker, compression): if compression == 'lz4': @@ -81,7 +81,7 @@ def test_kafka_producer_gc_cleanup(): @pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") -@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4']) +@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy']) def test_kafka_producer_proper_record_metadata(kafka_broker, compression): connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)]) producer = KafkaProducer(bootstrap_servers=connect_str, diff --git a/tox.ini b/tox.ini index 48a143eea..d3ef7ea04 100644 --- a/tox.ini +++ b/tox.ini @@ -19,6 +19,7 @@ deps = lz4 xxhash crc32c + kazoo py26: unittest2 commands = py.test {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc} From b2911870106a20abf06808874a6efb94792374de Mon Sep 17 00:00:00 2001 From: sachin Date: Fri, 14 Jun 2019 13:16:22 +0530 Subject: [PATCH 2/2] FYR-11936: Removing redundant error. 
--- kafka/consumer/simple.py | 5 ++--- kafka/errors.py | 3 --- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index 2ac7a58be..d95bcf592 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -26,7 +26,7 @@ ) from kafka.errors import ( KafkaError, ConsumerFetchSizeTooSmall, - UnknownTopicOrPartitionError, NotLeaderForPartitionError, DefaultSimpleConsumerException, + UnknownTopicOrPartitionError, NotLeaderForPartitionError, OffsetOutOfRangeError, FailedPayloadsError, check_error, BufferTooLargeError ) from kafka.protocol.message import PartialMessage @@ -146,7 +146,6 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None, self.auto_offset_reset = auto_offset_reset self.queue = queue.Queue(maxsize=MAX_QUEUE_SIZE) self.skip_buffer_size_error = skip_buffer_size_error - self.error = DefaultSimpleConsumerException() def __repr__(self): @@ -458,6 +457,6 @@ def _fetch(self): continue # Put the message in our queue meta = META(partition, high_water_mark) - self.queue.put((meta, message), block=True) + self.queue.put((meta, message)) self.fetch_offsets[partition] = message.offset + 1 partitions = retry_partitions diff --git a/kafka/errors.py b/kafka/errors.py index baa1c3723..2b324b703 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -450,9 +450,6 @@ class KafkaUnavailableError(KafkaError): class KafkaTimeoutError(KafkaError): pass -class DefaultSimpleConsumerException(Exception): - pass - class FailedPayloadsError(KafkaError): def __init__(self, payload, *args):
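
End-to-end usage sketch of the API these two patches add: the ZooKeeper-coordinated ZSimpleConsumer, the (META(partition, high_water_mark), message) tuples returned when partition_info=True, manual offset commits via commit_offsets(), and handling of the BufferTooLargeError raised when an oversized message is skipped. This is illustrative only and not part of the patches: the ZooKeeper address, group, and topic names are placeholders, and it assumes a running ZooKeeper/Kafka cluster.

import logging

from kafka.errors import BufferTooLargeError
from kafka.zookeeper import ZSimpleConsumer

logging.basicConfig(level=logging.INFO)

# partition_info=True and auto_commit=False are forwarded to the underlying
# SimpleConsumer, so get_messages() yields (meta, message) tuples and offsets
# are only committed when commit_offsets() is called explicitly.
consumer = ZSimpleConsumer('zk-host:2181', 'example-group', 'example-topic',
                           chroot='/', partition_info=True, auto_commit=False)

try:
    while True:
        try:
            messages = consumer.get_messages(count=100, block=True, timeout=5)
        except BufferTooLargeError as err:
            # _fetch() has already seeked past the oversized message; log it
            # and keep consuming from the new offset.
            logging.warning('Skipped oversized message: %r', err)
            continue

        offsets = {}
        for meta, message in messages:
            logging.info('partition=%d offset=%d high_water_mark=%d value=%r',
                         meta.partition, message.offset,
                         meta.high_water_mark, message.message.value)
            offsets[meta.partition] = message.offset + 1

        if offsets:
            consumer.commit_offsets(offsets)
finally:
    consumer.stop()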