diff --git a/.travis.yml b/.travis.yml index c1b71d70f..f7d0ed4eb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,18 +1,19 @@ language: python python: - - 2.6 - 2.7 before_install: - git submodule update --init --recursive - cd kafka-src - - ./sbt update - - ./sbt package + - ./sbt clean update package assembly-package-dependency - cd - install: - pip install . + - pip install . + # Deal with issue on Travis builders re: multiprocessing.Queue :( + # See https://github.com/travis-ci/travis-cookbooks/issues/155 + - sudo rm -rf /dev/shm && sudo ln -s /run/shm /dev/shm script: - python -m test.test_unit diff --git a/AUTHORS.md b/AUTHORS.md index f8b3c6c7a..8f168c643 100644 --- a/AUTHORS.md +++ b/AUTHORS.md @@ -4,8 +4,21 @@ Ordered by contribution * David Arthur, [@mumrah](https://github.com/mumrah) * Mahendra M, [@mahendra](https://github.com/mahendra) +* Omar, [@rdiomar](https://github.com/rdiomar) - RIP, Omar. 2014 +* Marc Labbé, [@mrtheb](https://github.com/mrtheb) * Ivan Pouzyrevsky, [@sandello](https://github.com/sandello) +* Thomas Dimson, [@cosbynator](https://github.com/cosbynator) +* Jim Lim, [@jimjh](https://github.com/jimjh) +* Zack Dever, [@zever](https://github.com/zever) +* StevenLeRoux, [@StevenLeRoux](https://github.com/StevenLeRoux) +* Saulius Žemaitaitis, [frgtn](https://github.com/frgtn) +* Vadim Graboys, [@vadimg](https://github.com/vadimg) +* Joe Crobak, [@jcrobak](https://github.com/jcrobak) +* Niek Sanders, [@nieksand](https://github.com/nieksand) +* Greg Bowyer, [@GregBowyer](https://github.com/GregBowyer) +* Evan Klitzke, [@eklitzke](https://github.com/eklitzke) * [@anentropic](https://github.com/anentropic) +* Stephen Armstrong, [@stephenarmstrong](https://github.com/stephenarmstrong) * Ben Frederickson, [@benfred](https://github.com/benfred) Thanks, everyone! diff --git a/LICENSE b/LICENSE index a85fd9436..efe9d9682 100644 --- a/LICENSE +++ b/LICENSE @@ -186,7 +186,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright 2012 David Arthur + Copyright 2013 David Arthur Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/POWERED-BY.md b/POWERED-BY.md new file mode 100644 index 000000000..f2e323c3e --- /dev/null +++ b/POWERED-BY.md @@ -0,0 +1,6 @@ +# Project/People/Companies using kafka-python + +If you're using this library and care to give us a shout out, please fork the project, +add yourself here, and submit a pull request. Thanks! + +* [@mumrah](https://github.com/mumrah), adding myself as an example diff --git a/README.md b/README.md index 01a241ceb..a315db6dc 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Kafka Python client -![travis](https://travis-ci.org/mumrah/kafka-python.png) +[![Build Status](https://travis-ci.org/mumrah/kafka-python.png)](https://travis-ci.org/mumrah/kafka-python) This module provides low-level protocol support for Apache Kafka as well as high-level consumer and producer classes. Request batching is supported by the @@ -17,9 +17,8 @@ Copyright 2013, David Arthur under Apache License, v2.0. See `LICENSE` # Status -I'm following the version numbers of Kafka, plus one number to indicate the -version of this project. The current version is 0.8.1-1. This version is under -development, APIs are subject to change. +The current version of this package is **0.9.0** and is compatible with +Kafka brokers running version **0.8.1**. 
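
A note on the connection-string change that runs through this diff: `KafkaClient` now takes a single string instead of a `(host, port)` pair, and the new `collect_hosts()` helper in `kafka/conn.py` accepts a comma-separated list in which the port is optional and defaults to 9092. A minimal sketch of both forms (the broker hostnames below are placeholders, not part of this changeset):

```python
from kafka.client import KafkaClient

# Old style: KafkaClient("localhost", 9092)  ->  New style: a "host:port" string
kafka = KafkaClient("localhost:9092")

# Several bootstrap brokers may be given as a comma-separated list; a missing
# port falls back to DEFAULT_KAFKA_PORT (9092) and the list is shuffled by
# kafka.conn.collect_hosts() before connections are attempted.
kafka = KafkaClient("kafka1:9092,kafka2")
```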
# Usage @@ -30,27 +29,27 @@ from kafka.client import KafkaClient from kafka.consumer import SimpleConsumer from kafka.producer import SimpleProducer, KeyedProducer -kafka = KafkaClient("localhost", 9092) +kafka = KafkaClient("localhost:9092") # To send messages synchronously -producer = SimpleProducer(kafka, "my-topic") -producer.send_messages("some message") -producer.send_messages("this method", "is variadic") +producer = SimpleProducer(kafka) +producer.send_messages("my-topic", "some message") +producer.send_messages("my-topic", "this method", "is variadic") # To send messages asynchronously -producer = SimpleProducer(kafka, "my-topic", async=True) -producer.send_messages("async message") +producer = SimpleProducer(kafka, async=True) +producer.send_messages("my-topic", "async message") # To wait for acknowledgements # ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to # a local log before sending response # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed # by all in sync replicas before sending a response -producer = SimpleProducer(kafka, "my-topic", async=False, +producer = SimpleProducer(kafka, async=False, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE, - acks_timeout=2000) + ack_timeout=2000) -response = producer.send_messages("async message") +response = producer.send_messages("my-topic", "async message") if response: print(response[0].error) @@ -63,7 +62,7 @@ if response: # Notes: # * If the producer dies before the messages are sent, there will be losses # * Call producer.stop() to send the messages and cleanup -producer = SimpleProducer(kafka, "my-topic", batch_send=True, +producer = SimpleProducer(kafka, batch_send=True, batch_send_every_n=20, batch_send_every_t=60) @@ -81,14 +80,14 @@ from kafka.client import KafkaClient from kafka.producer import KeyedProducer from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner -kafka = KafkaClient("localhost", 9092) +kafka = KafkaClient("localhost:9092") # HashedPartitioner is default -producer = KeyedProducer(kafka, "my-topic") -producer.send("key1", "some message") -producer.send("key2", "this methode") +producer = KeyedProducer(kafka) +producer.send("my-topic", "key1", "some message") +producer.send("my-topic", "key2", "this methode") -producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner) +producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner) ``` ## Multiprocess consumer @@ -96,7 +95,7 @@ producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner) from kafka.client import KafkaClient from kafka.consumer import MultiProcessConsumer -kafka = KafkaClient("localhost", 9092) +kafka = KafkaClient("localhost:9092") # This will split the number of partitions among two processes consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2) @@ -116,7 +115,7 @@ for message in consumer.get_messages(count=5, block=True, timeout=4): ```python from kafka.client import KafkaClient -kafka = KafkaClient("localhost", 9092) +kafka = KafkaClient("localhost:9092") req = ProduceRequest(topic="my-topic", partition=1, messages=[KafkaProdocol.encode_message("some message")]) resps = kafka.send_produce_request(payloads=[req], fail_on_error=True) @@ -196,6 +195,7 @@ git submodule update cd kafka-src ./sbt update ./sbt package +./sbt assembly-package-dependency ``` And then run the tests. 
This will actually start up real local Zookeeper diff --git a/example.py b/example.py old mode 100644 new mode 100755 index 3a2dc928b..062761b02 --- a/example.py +++ b/example.py @@ -1,23 +1,48 @@ -import logging +#!/usr/bin/env python +import threading, logging, time -from kafka.client import KafkaClient, FetchRequest, ProduceRequest +from kafka.client import KafkaClient from kafka.consumer import SimpleConsumer from kafka.producer import SimpleProducer -def produce_example(client): - producer = SimpleProducer(client, "my-topic") - producer.send_messages("test") +class Producer(threading.Thread): + daemon = True -def consume_example(client): - consumer = SimpleConsumer(client, "test-group", "my-topic") - for message in consumer: - print(message) + def run(self): + client = KafkaClient("localhost:9092") + producer = SimpleProducer(client) + + while True: + producer.send_messages('my-topic', "test") + producer.send_messages('my-topic', "\xc2Hola, mundo!") + + time.sleep(1) + + +class Consumer(threading.Thread): + daemon = True + + def run(self): + client = KafkaClient("localhost:9092") + consumer = SimpleConsumer(client, "test-group", "my-topic") + + for message in consumer: + print(message) def main(): - client = KafkaClient("localhost", 9092) - produce_example(client) - consume_example(client) + threads = [ + Producer(), + Consumer() + ] + + for t in threads: + t.start() + + time.sleep(5) if __name__ == "__main__": - logging.basicConfig(level=logging.DEBUG) + logging.basicConfig( + format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', + level=logging.DEBUG + ) main() diff --git a/kafka-src b/kafka-src index 9ff4e8eb1..15bb3961d 160000 --- a/kafka-src +++ b/kafka-src @@ -1 +1 @@ -Subproject commit 9ff4e8eb10e0ddd86f257e99d55971a132426605 +Subproject commit 15bb3961d9171c1c54c4c840a554ce2c76168163 diff --git a/kafka/NOTES.md b/kafka/NOTES.md index 540cdad3b..8fb0f4744 100644 --- a/kafka/NOTES.md +++ b/kafka/NOTES.md @@ -18,7 +18,7 @@ There are a few levels of abstraction: # Possible API - client = KafkaClient("localhost", 9092) + client = KafkaClient("localhost:9092") producer = KafkaProducer(client, "topic") producer.send_string("hello") diff --git a/kafka/__init__.py b/kafka/__init__.py index 73aa7603c..e446f58f1 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -1,5 +1,5 @@ __title__ = 'kafka' -__version__ = '0.2-alpha' +__version__ = '0.9.0' __author__ = 'David Arthur' __license__ = 'Apache License 2.0' __copyright__ = 'Copyright 2012, David Arthur under Apache License, v2.0' diff --git a/kafka/client.py b/kafka/client.py index 7494b9198..39c89ba43 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -1,16 +1,17 @@ -import base64 +import copy +import logging + from collections import defaultdict from functools import partial -from itertools import count, cycle -import logging -from operator import attrgetter -import socket -import struct -import time -import zlib - -from kafka.common import * -from kafka.conn import KafkaConnection +from itertools import count + +from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition, + ConnectionError, FailedPayloadsError, + BrokerResponseError, PartitionUnavailableError, + LeaderUnavailableError, + KafkaUnavailableError) + +from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS from kafka.protocol import KafkaProtocol log = logging.getLogger("kafka") @@ -21,109 +22,89 @@ class KafkaClient(object): CLIENT_ID = "kafka-python" ID_GEN = count() - def 
__init__(self, host, port, bufsize=4096, client_id=CLIENT_ID): + # NOTE: The timeout given to the client should always be greater than the + # one passed to SimpleConsumer.get_message(), otherwise you can get a + # socket timeout. + def __init__(self, hosts, client_id=CLIENT_ID, + timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): # We need one connection to bootstrap - self.bufsize = bufsize self.client_id = client_id - self.conns = { # (host, port) -> KafkaConnection - (host, port): KafkaConnection(host, port, bufsize) - } + self.timeout = timeout + self.hosts = collect_hosts(hosts) + + # create connections only when we need them + self.conns = {} self.brokers = {} # broker_id -> BrokerMetadata self.topics_to_brokers = {} # topic_id -> broker_id - self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...] - self._load_metadata_for_topics() + self.topic_partitions = {} # topic_id -> [0, 1, 2, ...] + self.load_metadata_for_topics() # bootstrap with all metadata ################## # Private API # ################## + def _get_conn(self, host, port): + "Get or create a connection to a broker using host and port" + + host_key = (host, port) + if host_key not in self.conns: + self.conns[host_key] = KafkaConnection(host, port) + + return self.conns[host_key] + def _get_conn_for_broker(self, broker): - "Get or create a connection to a broker" + """ + Get or create a connection to a broker + """ if (broker.host, broker.port) not in self.conns: self.conns[(broker.host, broker.port)] = \ - KafkaConnection(broker.host, broker.port, self.bufsize) + KafkaConnection(broker.host, broker.port, timeout=self.timeout) - return self.conns[(broker.host, broker.port)] + return self._get_conn(broker.host, broker.port) def _get_leader_for_partition(self, topic, partition): + """ + Returns the leader for a partition or None if the partition exists + but has no leader. + + PartitionUnavailableError will be raised if the topic or partition + is not part of the metadata. + """ + key = TopicAndPartition(topic, partition) - if key not in self.topics_to_brokers: - self._load_metadata_for_topics(topic) + # reload metadata whether the partition is not available + # or has no leader (broker is None) + if self.topics_to_brokers.get(key) is None: + self.load_metadata_for_topics(topic) if key not in self.topics_to_brokers: - raise Exception("Partition does not exist: %s" % str(key)) + raise PartitionUnavailableError("%s not available" % str(key)) return self.topics_to_brokers[key] - def _load_metadata_for_topics(self, *topics): + def _next_id(self): """ - Discover brokers and metadata for a set of topics. This method will - recurse in the event of a retry. + Generate a new correlation id """ - requestId = self._next_id() - request = KafkaProtocol.encode_metadata_request(self.client_id, - requestId, topics) - - response = self._send_broker_unaware_request(requestId, request) - if response is None: - raise Exception("All servers failed to process request") - - (brokers, topics) = KafkaProtocol.decode_metadata_response(response) - - log.debug("Broker metadata: %s", brokers) - log.debug("Topic metadata: %s", topics) - - self.brokers = brokers - self.topics_to_brokers = {} - - for topic, partitions in topics.items(): - # Clear the list once before we add it. This removes stale entries - # and avoids duplicates - self.topic_partitions.pop(topic, None) - - if not partitions: - log.warn("Partition is unassigned, delay for 1s and retry. 
Have you created {} on zookeeper?".format(topic)) - time.sleep(1) - self._load_metadata_for_topics(topic) - break - - for partition, meta in partitions.items(): - if meta.leader == -1: - log.info("Partition is unassigned, delay for 1s and retry") - time.sleep(1) - self._load_metadata_for_topics(topic) - else: - topic_part = TopicAndPartition(topic, partition) - self.topics_to_brokers[topic_part] = brokers[meta.leader] - self.topic_partitions[topic].append(partition) - - def _next_id(self): - "Generate a new correlation id" return KafkaClient.ID_GEN.next() - def _safe_conn_reinit(self, conn): - try: - conn.reinit() - except socket.error, e: - log.error("unsuccessful reinit", e) - def _send_broker_unaware_request(self, requestId, request): """ Attempt to send a broker-agnostic request to one of the available brokers. Keep trying until you succeed. """ - for conn in self.conns.values(): + for (host, port) in self.hosts: try: + conn = self._get_conn(host, port) conn.send(requestId, request) response = conn.recv(requestId) return response except Exception, e: - log.warning("Could not send request [%r] to server %s, " - "trying next server: %s" % (request, conn, e)) - self._safe_conn_reinit(conn) + log.warning("Could not send request [%r] to server %s:%i, " + "trying next server: %s" % (request, host, port, e)) continue - return None + raise KafkaUnavailableError("All servers failed to process request") def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): """ @@ -154,6 +135,10 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): for payload in payloads: leader = self._get_leader_for_partition(payload.topic, payload.partition) + if leader is None: + raise LeaderUnavailableError( + "Leader not available for topic %s partition %s" % + (payload.topic, payload.partition)) payloads_by_broker[leader].append(payload) original_keys.append((payload.topic, payload.partition)) @@ -171,40 +156,127 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): request = encoder_fn(client_id=self.client_id, correlation_id=requestId, payloads=payloads) + failed = False # Send the request, recv the response try: conn.send(requestId, request) if decoder_fn is None: continue - response = conn.recv(requestId) - except Exception, e: - log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e)) + try: + response = conn.recv(requestId) + except ConnectionError, e: + log.warning("Could not receive response to request [%s] " + "from server %s: %s", request, conn, e) + failed = True + except ConnectionError, e: + log.warning("Could not send request [%s] to server %s: %s", + request, conn, e) + failed = True + + if failed: failed_payloads += payloads - self._safe_conn_reinit(conn) - self.topics_to_brokers = {} + self.reset_all_metadata() continue for response in decoder_fn(response): acc[(response.topic, response.partition)] = response if failed_payloads: - raise FailedPayloadsException(failed_payloads) + raise FailedPayloadsError(failed_payloads) # Order the accumulated responses by the original key order return (acc[k] for k in original_keys) if acc else () + def __repr__(self): + return '' % (self.client_id) + + def _raise_on_response_error(self, resp): + if resp.error == ErrorMapping.NO_ERROR: + return + + if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON, + ErrorMapping.NOT_LEADER_FOR_PARTITION): + self.reset_topic_metadata(resp.topic) + + raise BrokerResponseError( + "Request for %s failed with errorcode=%d (%s)" % + 
(TopicAndPartition(resp.topic, resp.partition), resp.error, ErrorStrings[resp.error])) + ################# # Public API # ################# + def reset_topic_metadata(self, *topics): + for topic in topics: + try: + partitions = self.topic_partitions[topic] + except KeyError: + continue + + for partition in partitions: + self.topics_to_brokers.pop(TopicAndPartition(topic, partition), None) + + del self.topic_partitions[topic] + + def reset_all_metadata(self): + self.topics_to_brokers.clear() + self.topic_partitions.clear() + + def has_metadata_for_topic(self, topic): + return topic in self.topic_partitions def close(self): for conn in self.conns.values(): conn.close() + def copy(self): + """ + Create an inactive copy of the client object + A reinit() has to be done on the copy before it can be used again + """ + c = copy.deepcopy(self) + for k, v in c.conns.items(): + c.conns[k] = v.copy() + return c + def reinit(self): for conn in self.conns.values(): conn.reinit() + def load_metadata_for_topics(self, *topics): + """ + Discover brokers and metadata for a set of topics. This function is called + lazily whenever metadata is unavailable. + """ + request_id = self._next_id() + request = KafkaProtocol.encode_metadata_request(self.client_id, + request_id, topics) + + response = self._send_broker_unaware_request(request_id, request) + + (brokers, topics) = KafkaProtocol.decode_metadata_response(response) + + log.debug("Broker metadata: %s", brokers) + log.debug("Topic metadata: %s", topics) + + self.brokers = brokers + + for topic, partitions in topics.items(): + self.reset_topic_metadata(topic) + + if not partitions: + log.warning('No partitions for %s', topic) + continue + + self.topic_partitions[topic] = [] + for partition, meta in partitions.items(): + self.topic_partitions[topic].append(partition) + topic_part = TopicAndPartition(topic, partition) + if meta.leader == -1: + log.warning('No leader for topic %s partition %s', topic, partition) + self.topics_to_brokers[topic_part] = None + else: + self.topics_to_brokers[topic_part] = brokers[meta.leader] + def send_produce_request(self, payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None): """ @@ -228,8 +300,10 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000, order of input payloads """ - encoder = partial(KafkaProtocol.encode_produce_request, - acks=acks, timeout=timeout) + encoder = partial( + KafkaProtocol.encode_produce_request, + acks=acks, + timeout=timeout) if acks == 0: decoder = None @@ -240,14 +314,9 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000, out = [] for resp in resps: - # Check for errors - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception("ProduceRequest for %s failed with " - "errorcode=%d" % ( - TopicAndPartition(resp.topic, resp.partition), - resp.error)) - - # Run the callback + if fail_on_error is True: + self._raise_on_response_error(resp) + if callback is not None: out.append(callback(resp)) else: @@ -267,19 +336,15 @@ def send_fetch_request(self, payloads=[], fail_on_error=True, max_wait_time=max_wait_time, min_bytes=min_bytes) - resps = self._send_broker_aware_request(payloads, encoder, - KafkaProtocol.decode_fetch_response) + resps = self._send_broker_aware_request( + payloads, encoder, + KafkaProtocol.decode_fetch_response) out = [] for resp in resps: - # Check for errors - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception("FetchRequest for %s failed with " - "errorcode=%d" % ( - 
TopicAndPartition(resp.topic, resp.partition), - resp.error)) - - # Run the callback + if fail_on_error is True: + self._raise_on_response_error(resp) + if callback is not None: out.append(callback(resp)) else: @@ -288,15 +353,15 @@ def send_fetch_request(self, payloads=[], fail_on_error=True, def send_offset_request(self, payloads=[], fail_on_error=True, callback=None): - resps = self._send_broker_aware_request(payloads, - KafkaProtocol.encode_offset_request, - KafkaProtocol.decode_offset_response) + resps = self._send_broker_aware_request( + payloads, + KafkaProtocol.encode_offset_request, + KafkaProtocol.decode_offset_response) out = [] for resp in resps: - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception("OffsetRequest failed with errorcode=%s", - resp.error) + if fail_on_error is True: + self._raise_on_response_error(resp) if callback is not None: out.append(callback(resp)) else: @@ -312,9 +377,8 @@ def send_offset_commit_request(self, group, payloads=[], out = [] for resp in resps: - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception("OffsetCommitRequest failed with " - "errorcode=%s", resp.error) + if fail_on_error is True: + self._raise_on_response_error(resp) if callback is not None: out.append(callback(resp)) @@ -332,9 +396,8 @@ def send_offset_fetch_request(self, group, payloads=[], out = [] for resp in resps: - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception("OffsetCommitRequest failed with errorcode=%s", - resp.error) + if fail_on_error is True: + self._raise_on_response_error(resp) if callback is not None: out.append(callback(resp)) else: diff --git a/kafka/codec.py b/kafka/codec.py index eb5d03cf9..206ddb491 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -1,5 +1,9 @@ from cStringIO import StringIO import gzip +import struct + +_XERIAL_V1_HEADER = (-126, 'S', 'N', 'A', 'P', 'P', 'Y', 0, 1, 1) +_XERIAL_V1_FORMAT = 'bccccccBii' try: import snappy @@ -36,13 +40,101 @@ def gzip_decode(payload): return result -def snappy_encode(payload): +def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024): + """Encodes the given data with snappy if xerial_compatible is set then the + stream is encoded in a fashion compatible with the xerial snappy library + + The block size (xerial_blocksize) controls how frequent the blocking occurs + 32k is the default in the xerial library. + + The format winds up being + +-------------+------------+--------------+------------+--------------+ + | Header | Block1 len | Block1 data | Blockn len | Blockn data | + |-------------+------------+--------------+------------+--------------| + | 16 bytes | BE int32 | snappy bytes | BE int32 | snappy bytes | + +-------------+------------+--------------+------------+--------------+ + + It is important to not that the blocksize is the amount of uncompressed + data presented to snappy at each block, whereas the blocklen is the + number of bytes that will be present in the stream, that is the + length will always be <= blocksize. + """ + if not _has_snappy: raise NotImplementedError("Snappy codec is not available") - return snappy.compress(payload) + + if xerial_compatible: + def _chunker(): + for i in xrange(0, len(payload), xerial_blocksize): + yield payload[i:i+xerial_blocksize] + + out = StringIO() + + header = ''.join([struct.pack('!' 
+ fmt, dat) for fmt, dat + in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)]) + + out.write(header) + for chunk in _chunker(): + block = snappy.compress(chunk) + block_size = len(block) + out.write(struct.pack('!i', block_size)) + out.write(block) + + out.seek(0) + return out.read() + + else: + return snappy.compress(payload) + + +def _detect_xerial_stream(payload): + """Detects if the data given might have been encoded with the blocking mode + of the xerial snappy library. + + This mode writes a magic header of the format: + +--------+--------------+------------+---------+--------+ + | Marker | Magic String | Null / Pad | Version | Compat | + |--------+--------------+------------+---------+--------| + | byte | c-string | byte | int32 | int32 | + |--------+--------------+------------+---------+--------| + | -126 | 'SNAPPY' | \0 | | | + +--------+--------------+------------+---------+--------+ + + The pad appears to be to ensure that SNAPPY is a valid cstring + The version is the version of this format as written by xerial, + in the wild this is currently 1 as such we only support v1. + + Compat is there to claim the miniumum supported version that + can read a xerial block stream, presently in the wild this is + 1. + """ + + if len(payload) > 16: + header = header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16]) + return header == _XERIAL_V1_HEADER + return False def snappy_decode(payload): if not _has_snappy: raise NotImplementedError("Snappy codec is not available") - return snappy.decompress(payload) + + if _detect_xerial_stream(payload): + # TODO ? Should become a fileobj ? + out = StringIO() + byt = buffer(payload[16:]) + length = len(byt) + cursor = 0 + + while cursor < length: + block_size = struct.unpack_from('!i', byt[cursor:])[0] + # Skip the block size + cursor += 4 + end = cursor + block_size + out.write(snappy.decompress(byt[cursor:end])) + cursor = end + + out.seek(0) + return out.read() + else: + return snappy.decompress(payload) diff --git a/kafka/common.py b/kafka/common.py index 0658d8b8c..005e6dd06 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -48,38 +48,73 @@ TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"]) +ErrorStrings = { + -1 : 'UNKNOWN', + 0 : 'NO_ERROR', + 1 : 'OFFSET_OUT_OF_RANGE', + 2 : 'INVALID_MESSAGE', + 3 : 'UNKNOWN_TOPIC_OR_PARTITON', + 4 : 'INVALID_FETCH_SIZE', + 5 : 'LEADER_NOT_AVAILABLE', + 6 : 'NOT_LEADER_FOR_PARTITION', + 7 : 'REQUEST_TIMED_OUT', + 8 : 'BROKER_NOT_AVAILABLE', + 9 : 'REPLICA_NOT_AVAILABLE', + 10 : 'MESSAGE_SIZE_TOO_LARGE', + 11 : 'STALE_CONTROLLER_EPOCH', + 12 : 'OFFSET_METADATA_TOO_LARGE', +} + class ErrorMapping(object): - # Many of these are not actually used by the client - UNKNOWN = -1 - NO_ERROR = 0 - OFFSET_OUT_OF_RANGE = 1 - INVALID_MESSAGE = 2 - UNKNOWN_TOPIC_OR_PARTITON = 3 - INVALID_FETCH_SIZE = 4 - LEADER_NOT_AVAILABLE = 5 - NOT_LEADER_FOR_PARTITION = 6 - REQUEST_TIMED_OUT = 7 - BROKER_NOT_AVAILABLE = 8 - REPLICA_NOT_AVAILABLE = 9 - MESSAGE_SIZE_TO_LARGE = 10 - STALE_CONTROLLER_EPOCH = 11 - OFFSET_METADATA_TOO_LARGE = 12 + pass + +for k, v in ErrorStrings.items(): + setattr(ErrorMapping, v, k) ################# # Exceptions # ################# -class FailedPayloadsException(Exception): + +class KafkaError(RuntimeError): + pass + + +class KafkaUnavailableError(KafkaError): + pass + + +class BrokerResponseError(KafkaError): + pass + + +class LeaderUnavailableError(KafkaError): pass -class BufferUnderflowError(Exception): + +class PartitionUnavailableError(KafkaError): pass -class 
ChecksumError(Exception): + +class FailedPayloadsError(KafkaError): pass -class ConsumerFetchSizeTooSmall(Exception): + +class ConnectionError(KafkaError): pass -class ConsumerNoMoreData(Exception): + +class BufferUnderflowError(KafkaError): + pass + + +class ChecksumError(KafkaError): + pass + + +class ConsumerFetchSizeTooSmall(KafkaError): + pass + + +class ConsumerNoMoreData(KafkaError): pass diff --git a/kafka/conn.py b/kafka/conn.py index 7103253d1..4fdeb17c7 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -1,68 +1,99 @@ +import copy import logging import socket import struct +from random import shuffle from threading import local +from kafka.common import ConnectionError + log = logging.getLogger("kafka") +DEFAULT_SOCKET_TIMEOUT_SECONDS = 120 +DEFAULT_KAFKA_PORT = 9092 + + +def collect_hosts(hosts, randomize=True): + """ + Collects a comma-separated set of hosts (host:port) and optionally + randomize the returned list. + """ + + if isinstance(hosts, basestring): + hosts = hosts.strip().split(',') + + result = [] + for host_port in hosts: + + res = host_port.split(':') + host = res[0] + port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT + result.append((host.strip(), port)) + + if randomize: + shuffle(result) + + return result + + class KafkaConnection(local): """ A socket connection to a single Kafka broker This class is _not_ thread safe. Each call to `send` must be followed - by a call to `recv` in order to get the correct response. Eventually, + by a call to `recv` in order to get the correct response. Eventually, we can do something in here to facilitate multiplexed requests/responses since the Kafka API includes a correlation id. + + host: the host name or IP address of a kafka broker + port: the port number the kafka broker is listening on + timeout: default 120. The socket timeout for sending and receiving data + in seconds. None means no timeout, so a request can block forever. """ - def __init__(self, host, port, bufsize=4096): + def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): + super(KafkaConnection, self).__init__() self.host = host self.port = port - self.bufsize = bufsize - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.connect((host, port)) - self._sock.settimeout(10) + self.timeout = timeout + self._sock = None + + self.reinit() - def __str__(self): + def __repr__(self): return "" % (self.host, self.port) ################### # Private API # ################### - def _consume_response(self): - """ - Fully consumer the response iterator - """ - data = "" - for chunk in self._consume_response_iter(): - data += chunk - return data + def _raise_connection_error(self): + self._dirty = True + raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port)) - def _consume_response_iter(self): - """ - This method handles the response header and error messages. 
It - then returns an iterator for the chunks of the response - """ - log.debug("Handling response from Kafka") + def _read_bytes(self, num_bytes): + bytes_left = num_bytes + responses = [] - # Read the size off of the header - resp = self._sock.recv(4) - if resp == "": - raise Exception("Got no response from Kafka") - (size,) = struct.unpack('>i', resp) + log.debug("About to read %d bytes from Kafka", num_bytes) + if self._dirty: + self.reinit() - messageSize = size - 4 - log.debug("About to read %d bytes from Kafka", messageSize) + while bytes_left: + try: + data = self._sock.recv(min(bytes_left, 4096)) + except socket.error: + log.exception('Unable to receive data from Kafka') + self._raise_connection_error() - # Read the remainder of the response - total = 0 - while total < messageSize: - resp = self._sock.recv(self.bufsize) - log.debug("Read %d bytes from Kafka", len(resp)) - if resp == "": - raise BufferUnderflowError("Not enough data to read this response") - total += len(resp) - yield resp + if data == '': + log.error("Not enough data to read this response") + self._raise_connection_error() + + bytes_left -= len(data) + log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes) + responses.append(data) + + return ''.join(responses) ################## # Public API # @@ -70,28 +101,55 @@ def _consume_response_iter(self): # TODO multiplex socket communication to allow for multi-threaded clients - def send(self, requestId, payload): + def send(self, request_id, payload): "Send a request to Kafka" - log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), requestId)) - sent = self._sock.sendall(payload) - if sent != None: - raise RuntimeError("Kafka went away") + log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id)) + try: + if self._dirty: + self.reinit() + sent = self._sock.sendall(payload) + if sent is not None: + self._raise_connection_error() + except socket.error: + log.exception('Unable to send payload to Kafka') + self._raise_connection_error() + + def recv(self, request_id): + """ + Get a response from Kafka + """ + log.debug("Reading response %d from Kafka" % request_id) + # Read the size off of the header + resp = self._read_bytes(4) + + (size,) = struct.unpack('>i', resp) + + # Read the remainder of the response + resp = self._read_bytes(size) + return str(resp) - def recv(self, requestId): - "Get a response from Kafka" - log.debug("Reading response %d from Kafka" % requestId) - self.data = self._consume_response() - return self.data + def copy(self): + """ + Create an inactive copy of the connection object + A reinit() has to be done on the copy before it can be used again + """ + c = copy.deepcopy(self) + c._sock = None + return c def close(self): - "Close this connection" - self._sock.close() + """ + Close this connection + """ + if self._sock: + self._sock.close() def reinit(self): """ Re-initialize the socket connection """ - self._sock.close() + self.close() self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((self.host, self.port)) - self._sock.settimeout(10) + self._sock.settimeout(self.timeout) + self._dirty = False diff --git a/kafka/consumer.py b/kafka/consumer.py index c338337ff..8ac28daf4 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -1,14 +1,15 @@ -from collections import defaultdict +from __future__ import absolute_import + from itertools import izip_longest, repeat import logging import time from threading import Lock -from multiprocessing import Process, 
Queue, Event, Value -from Queue import Empty +from multiprocessing import Process, Queue as MPQueue, Event, Value +from Queue import Empty, Queue from kafka.common import ( ErrorMapping, FetchRequest, - OffsetRequest, OffsetFetchRequest, OffsetCommitRequest, + OffsetRequest, OffsetCommitRequest, ConsumerFetchSizeTooSmall, ConsumerNoMoreData ) @@ -22,6 +23,11 @@ FETCH_DEFAULT_BLOCK_TIMEOUT = 1 FETCH_MAX_WAIT_TIME = 100 FETCH_MIN_BYTES = 4096 +FETCH_BUFFER_SIZE_BYTES = 4096 +MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8 + +ITER_TIMEOUT_SECONDS = 60 +NO_MESSAGES_WAIT_TIME_SECONDS = 0.1 class FetchContext(object): @@ -32,13 +38,15 @@ def __init__(self, consumer, block, timeout): self.consumer = consumer self.block = block - if block and not timeout: - timeout = FETCH_DEFAULT_BLOCK_TIMEOUT - - self.timeout = timeout * 1000 + if block: + if not timeout: + timeout = FETCH_DEFAULT_BLOCK_TIMEOUT + self.timeout = timeout * 1000 def __enter__(self): """Set fetch values based on blocking status""" + self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time + self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes if self.block: self.consumer.fetch_max_wait_time = self.timeout self.consumer.fetch_min_bytes = 1 @@ -46,9 +54,9 @@ def __enter__(self): self.consumer.fetch_min_bytes = 0 def __exit__(self, type, value, traceback): - """Reset values to default""" - self.consumer.fetch_max_wait_time = FETCH_MAX_WAIT_TIME - self.consumer.fetch_min_bytes = FETCH_MIN_BYTES + """Reset values""" + self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time + self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes class Consumer(object): @@ -67,7 +75,7 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True, self.client = client self.topic = topic self.group = group - self.client._load_metadata_for_topics(topic) + self.client.load_metadata_for_topics(topic) self.offsets = {} if not partitions: @@ -204,8 +212,14 @@ class SimpleConsumer(Consumer): before a commit auto_commit_every_t: default 5000. How much time (in milliseconds) to wait before commit - fetch_size_bytes: number of bytes to request in a FetchRequest + buffer_size: default 4K. Initial number of bytes to tell kafka we + have available. This will double as needed. + max_buffer_size: default 16K. Max number of bytes to tell kafka we have + available. None means no limit. + iter_timeout: default None. How much time (in seconds) to wait for a + message in the iterator before exiting. None means no + timeout, so it will wait forever. 
Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will @@ -216,18 +230,33 @@ class SimpleConsumer(Consumer): def __init__(self, client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL, - fetch_size_bytes=FETCH_MIN_BYTES): - + fetch_size_bytes=FETCH_MIN_BYTES, + buffer_size=FETCH_BUFFER_SIZE_BYTES, + max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, + iter_timeout=None): + super(SimpleConsumer, self).__init__( + client, group, topic, + partitions=partitions, + auto_commit=auto_commit, + auto_commit_every_n=auto_commit_every_n, + auto_commit_every_t=auto_commit_every_t) + + if max_buffer_size is not None and buffer_size > max_buffer_size: + raise ValueError("buffer_size (%d) is greater than " + "max_buffer_size (%d)" % + (buffer_size, max_buffer_size)) + self.buffer_size = buffer_size + self.max_buffer_size = max_buffer_size self.partition_info = False # Do not return partition info in msgs self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME self.fetch_min_bytes = fetch_size_bytes - self.fetch_started = defaultdict(bool) # defaults to false + self.fetch_offsets = self.offsets.copy() + self.iter_timeout = iter_timeout + self.queue = Queue() - super(SimpleConsumer, self).__init__(client, group, topic, - partitions=partitions, - auto_commit=auto_commit, - auto_commit_every_n=auto_commit_every_n, - auto_commit_every_t=auto_commit_every_t) + def __repr__(self): + return '' % \ + (self.group, self.topic, str(self.offsets.keys())) def provide_partition_info(self): """ @@ -264,135 +293,218 @@ def seek(self, offset, whence): reqs.append(OffsetRequest(self.topic, partition, -2, 1)) elif whence == 2: reqs.append(OffsetRequest(self.topic, partition, -1, 1)) - - # The API returns back the next available offset - # For eg: if the current offset is 18, the API will return - # back 19. So, if we have to seek 5 points before, we will - # end up going back to 14, instead of 13. Adjust this - deltas[partition] -= 1 else: pass resps = self.client.send_offset_request(reqs) for resp in resps: - self.offsets[resp.partition] = resp.offsets[0] + \ - deltas[resp.partition] + self.offsets[resp.partition] = \ + resp.offsets[0] + deltas[resp.partition] else: raise ValueError("Unexpected value for `whence`, %d" % whence) + # Reset queue and fetch offsets since they are invalid + self.fetch_offsets = self.offsets.copy() + if self.auto_commit: + self.count_since_commit += 1 + self.commit() + + self.queue = Queue() + def get_messages(self, count=1, block=True, timeout=0.1): """ Fetch the specified number of messages count: Indicates the maximum number of messages to be fetched block: If True, the API will block till some messages are fetched. - timeout: If None, and block=True, the API will block infinitely. - If >0, API will block for specified time (in seconds) + timeout: If block is True, the function will block for the specified + time (in seconds) until count messages is fetched. If None, + it will block forever. 
""" messages = [] - iterator = self.__iter__() - - # HACK: This splits the timeout between available partitions - timeout = timeout * 1.0 / len(self.offsets) - - with FetchContext(self, block, timeout): - while count > 0: - try: - messages.append(next(iterator)) - except StopIteration as exp: - break + if timeout is not None: + max_time = time.time() + timeout + + new_offsets = {} + while count > 0 and (timeout is None or timeout > 0): + result = self._get_message(block, timeout, get_partition_info=True, + update_offset=False) + if result: + partition, message = result + if self.partition_info: + messages.append(result) + else: + messages.append(message) + new_offsets[partition] = message.offset + 1 count -= 1 - + else: + # Ran out of messages for the last request. + if not block: + # If we're not blocking, break. + break + if timeout is not None: + # If we're blocking and have a timeout, reduce it to the + # appropriate value + timeout = max_time - time.time() + + # Update and commit offsets if necessary + self.offsets.update(new_offsets) + self.count_since_commit += len(messages) + self._auto_commit() return messages - def __iter__(self): + def get_message(self, block=True, timeout=0.1, get_partition_info=None): + return self._get_message(block, timeout, get_partition_info) + + def _get_message(self, block=True, timeout=0.1, get_partition_info=None, + update_offset=True): """ - Create an iterate per partition. Iterate through them calling next() - until they are all exhausted. + If no messages can be fetched, returns None. + If get_partition_info is None, it defaults to self.partition_info + If get_partition_info is True, returns (partition, message) + If get_partition_info is False, returns message """ - iters = {} - for partition, offset in self.offsets.items(): - iters[partition] = self.__iter_partition__(partition, offset) + if self.queue.empty(): + # We're out of messages, go grab some more. 
+ with FetchContext(self, block, timeout): + self._fetch() + try: + partition, message = self.queue.get_nowait() - if len(iters) == 0: - return + if update_offset: + # Update partition offset + self.offsets[partition] = message.offset + 1 + + # Count, check and commit messages if necessary + self.count_since_commit += 1 + self._auto_commit() + + if get_partition_info is None: + get_partition_info = self.partition_info + if get_partition_info: + return partition, message + else: + return message + except Empty: + return None + + def __iter__(self): + if self.iter_timeout is None: + timeout = ITER_TIMEOUT_SECONDS + else: + timeout = self.iter_timeout while True: - if len(iters) == 0: + message = self.get_message(True, timeout) + if message: + yield message + elif self.iter_timeout is None: + # We did not receive any message yet but we don't have a + # timeout, so give up the CPU for a while before trying again + time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS) + else: + # Timed out waiting for a message break - for partition, it in iters.items(): + def _fetch(self): + # Create fetch request payloads for all the partitions + requests = [] + partitions = self.fetch_offsets.keys() + while partitions: + for partition in partitions: + requests.append(FetchRequest(self.topic, partition, + self.fetch_offsets[partition], + self.buffer_size)) + # Send request + responses = self.client.send_fetch_request( + requests, + max_wait_time=int(self.fetch_max_wait_time), + min_bytes=self.fetch_min_bytes) + + retry_partitions = set() + for resp in responses: + partition = resp.partition try: - if self.partition_info: - yield (partition, it.next()) + for message in resp.messages: + # Put the message in our queue + self.queue.put((partition, message)) + self.fetch_offsets[partition] = message.offset + 1 + except ConsumerFetchSizeTooSmall, e: + if (self.max_buffer_size is not None and + self.buffer_size == self.max_buffer_size): + log.error("Max fetch size %d too small", + self.max_buffer_size) + raise e + if self.max_buffer_size is None: + self.buffer_size *= 2 else: - yield it.next() + self.buffer_size = max(self.buffer_size * 2, + self.max_buffer_size) + log.warn("Fetch size too small, increase to %d (2x) " + "and retry", self.buffer_size) + retry_partitions.add(partition) + except ConsumerNoMoreData, e: + log.debug("Iteration was ended by %r", e) except StopIteration: + # Stop iterating through this partition log.debug("Done iterating over partition %s" % partition) - del iters[partition] + partitions = retry_partitions - # skip auto-commit since we didn't yield anything - continue - - # Count, check and commit messages if necessary - self.count_since_commit += 1 - self._auto_commit() - - def __iter_partition__(self, partition, offset): - """ - Iterate over the messages in a partition. Create a FetchRequest - to get back a batch of messages, yield them one at a time. - After a batch is exhausted, start a new batch unless we've reached - the end of this partition. - """ +def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): + """ + A child process worker which consumes messages based on the + notifications given by the controller process - # The offset that is stored in the consumer is the offset that - # we have consumed. In subsequent iterations, we are supposed to - # fetch the next message (that is from the next offset) - # However, for the 0th message, the offset should be as-is. - # An OffsetFetchRequest to Kafka gives 0 for a new queue. 
This is - # problematic, since 0 is offset of a message which we have not yet - # consumed. - if self.fetch_started[partition]: - offset += 1 + NOTE: Ideally, this should have been a method inside the Consumer + class. However, multiprocessing module has issues in windows. The + functionality breaks unless this function is kept outside of a class + """ - fetch_size = self.fetch_min_bytes + # Make the child processes open separate socket connections + client.reinit() + + # We will start consumers without auto-commit. Auto-commit will be + # done by the master controller process. + consumer = SimpleConsumer(client, group, topic, + partitions=chunk, + auto_commit=False, + auto_commit_every_n=None, + auto_commit_every_t=None) + + # Ensure that the consumer provides the partition information + consumer.provide_partition_info() + + while True: + # Wait till the controller indicates us to start consumption + start.wait() + + # If we are asked to quit, do so + if exit.is_set(): + break + + # Consume messages and add them to the queue. If the controller + # indicates a specific number of messages, follow that advice + count = 0 + + message = consumer.get_message() + if message: + queue.put(message) + count += 1 + + # We have reached the required size. The controller might have + # more than what he needs. Wait for a while. + # Without this logic, it is possible that we run into a big + # loop consuming all available messages before the controller + # can reset the 'start' event + if count == size.value: + pause.wait() - while True: - req = FetchRequest(self.topic, partition, offset, fetch_size) + else: + # In case we did not receive any message, give up the CPU for + # a while before we try again + time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS) - (resp,) = self.client.send_fetch_request([req], - max_wait_time=self.fetch_max_wait_time, - min_bytes=fetch_size) - - assert resp.topic == self.topic - assert resp.partition == partition - - next_offset = None - try: - for message in resp.messages: - next_offset = message.offset - - # update the offset before the message is yielded. This is - # so that the consumer state is not lost in certain cases. - # For eg: the message is yielded and consumed by the caller, - # but the caller does not come back into the generator again. 
- # The message will be consumed but the status will not be - # updated in the consumer - self.fetch_started[partition] = True - self.offsets[partition] = message.offset - yield message - except ConsumerFetchSizeTooSmall, e: - log.warn("Fetch size is too small, increasing by 1.5x and retrying") - fetch_size *= 1.5 - continue - except ConsumerNoMoreData, e: - log.debug("Iteration was ended by %r", e) - - if next_offset is None: - break - else: - offset = next_offset + 1 + consumer.stop() class MultiProcessConsumer(Consumer): @@ -426,15 +538,16 @@ def __init__(self, client, group, topic, auto_commit=True, num_procs=1, partitions_per_proc=0): # Initiate the base consumer class - super(MultiProcessConsumer, self).__init__(client, group, topic, - partitions=None, - auto_commit=auto_commit, - auto_commit_every_n=auto_commit_every_n, - auto_commit_every_t=auto_commit_every_t) + super(MultiProcessConsumer, self).__init__( + client, group, topic, + partitions=None, + auto_commit=auto_commit, + auto_commit_every_n=auto_commit_every_n, + auto_commit_every_t=auto_commit_every_t) # Variables for managing and controlling the data flow from # consumer child process to master - self.queue = Queue(1024) # Child consumers dump messages into this + self.queue = MPQueue(1024) # Child consumers dump messages into this self.start = Event() # Indicates the consumers to start fetch self.exit = Event() # Requests the consumers to shutdown self.pause = Event() # Requests the consumers to pause fetch @@ -458,62 +571,19 @@ def __init__(self, client, group, topic, auto_commit=True, self.procs = [] for chunk in chunks: chunk = filter(lambda x: x is not None, chunk) - proc = Process(target=self._consume, args=(chunk,)) + args = (client.copy(), + group, topic, chunk, + self.queue, self.start, self.exit, + self.pause, self.size) + + proc = Process(target=_mp_consume, args=args) proc.daemon = True proc.start() self.procs.append(proc) - def _consume(self, partitions): - """ - A child process worker which consumes messages based on the - notifications given by the controller process - """ - - # Make the child processes open separate socket connections - self.client.reinit() - - # We will start consumers without auto-commit. Auto-commit will be - # done by the master controller process. - consumer = SimpleConsumer(self.client, self.group, self.topic, - partitions=partitions, - auto_commit=False, - auto_commit_every_n=None, - auto_commit_every_t=None) - - # Ensure that the consumer provides the partition information - consumer.provide_partition_info() - - while True: - # Wait till the controller indicates us to start consumption - self.start.wait() - - # If we are asked to quit, do so - if self.exit.is_set(): - break - - # Consume messages and add them to the queue. If the controller - # indicates a specific number of messages, follow that advice - count = 0 - - for partition, message in consumer: - self.queue.put((partition, message)) - count += 1 - - # We have reached the required size. The controller might have - # more than what he needs. Wait for a while. 
- # Without this logic, it is possible that we run into a big - # loop consuming all available messages before the controller - # can reset the 'start' event - if count == self.size.value: - self.pause.wait() - break - - # In case we did not receive any message, give up the CPU for - # a while before we try again - if count == 0: - time.sleep(0.1) - - consumer.stop() + def __repr__(self): + return '' % \ + (self.group, self.topic, len(self.procs)) def stop(self): # Set exit and start off all waiting consumers @@ -548,12 +618,11 @@ def __iter__(self): break # Count, check and commit messages if necessary - self.offsets[partition] = message.offset + self.offsets[partition] = message.offset + 1 self.start.clear() - yield message - self.count_since_commit += 1 self._auto_commit() + yield message self.start.clear() @@ -563,8 +632,9 @@ def get_messages(self, count=1, block=True, timeout=10): count: Indicates the maximum number of messages to be fetched block: If True, the API will block till some messages are fetched. - timeout: If None, and block=True, the API will block infinitely. - If >0, API will block for specified time (in seconds) + timeout: If block is True, the function will block for the specified + time (in seconds) until count messages is fetched. If None, + it will block forever. """ messages = [] @@ -575,7 +645,11 @@ def get_messages(self, count=1, block=True, timeout=10): self.size.value = count self.pause.clear() - while count > 0: + if timeout is not None: + max_time = time.time() + timeout + + new_offsets = {} + while count > 0 and (timeout is None or timeout > 0): # Trigger consumption only if the queue is empty # By doing this, we will ensure that consumers do not # go into overdrive and keep consuming thousands of @@ -589,15 +663,18 @@ def get_messages(self, count=1, block=True, timeout=10): break messages.append(message) - - # Count, check and commit messages if necessary - self.offsets[partition] = message.offset - self.count_since_commit += 1 - self._auto_commit() + new_offsets[partition] = message.offset + 1 count -= 1 + if timeout is not None: + timeout = max_time - time.time() self.size.value = 0 self.start.clear() self.pause.set() + # Update and commit offsets if necessary + self.offsets.update(new_offsets) + self.count_since_commit += len(messages) + self._auto_commit() + return messages diff --git a/kafka/partitioner.py b/kafka/partitioner.py index 84db4d513..8190c34f9 100644 --- a/kafka/partitioner.py +++ b/kafka/partitioner.py @@ -22,7 +22,7 @@ def partition(self, key, partitions): may look like an overhead, but it will be useful (in future) when we handle cases like rebalancing """ - raise NotImplemented('partition function has to be implemented') + raise NotImplementedError('partition function has to be implemented') class RoundRobinPartitioner(Partitioner): @@ -31,7 +31,8 @@ class RoundRobinPartitioner(Partitioner): in a round robin fashion """ def __init__(self, partitions): - self._set_partitions(partitions) + super(RoundRobinPartitioner, self).__init__(partitions) + self.iterpart = cycle(partitions) def _set_partitions(self, partitions): self.partitions = partitions diff --git a/kafka/producer.py b/kafka/producer.py index f7962da8a..12a293401 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -1,14 +1,16 @@ +from __future__ import absolute_import + +import logging +import time + +from Queue import Empty from collections import defaultdict -from datetime import datetime, timedelta from itertools import cycle from multiprocessing import Queue, Process 
-from Queue import Empty -import logging -import sys -from kafka.common import ProduceRequest -from kafka.protocol import create_message +from kafka.common import ProduceRequest, TopicAndPartition from kafka.partitioner import HashedPartitioner +from kafka.protocol import create_message log = logging.getLogger("kafka") @@ -18,13 +20,67 @@ STOP_ASYNC_PRODUCER = -1 +def _send_upstream(queue, client, batch_time, batch_size, + req_acks, ack_timeout): + """ + Listen on the queue for a specified number of messages or till + a specified timeout and send them upstream to the brokers in one + request + + NOTE: Ideally, this should have been a method inside the Producer + class. However, multiprocessing module has issues in windows. The + functionality breaks unless this function is kept outside of a class + """ + stop = False + client.reinit() + + while not stop: + timeout = batch_time + count = batch_size + send_at = time.time() + timeout + msgset = defaultdict(list) + + # Keep fetching till we gather enough messages or a + # timeout is reached + while count > 0 and timeout >= 0: + try: + topic_partition, msg = queue.get(timeout=timeout) + + except Empty: + break + + # Check if the controller has requested us to stop + if topic_partition == STOP_ASYNC_PRODUCER: + stop = True + break + + # Adjust the timeout to match the remaining period + count -= 1 + timeout = send_at - time.time() + msgset[topic_partition].append(msg) + + # Send collected requests upstream + reqs = [] + for topic_partition, messages in msgset.items(): + req = ProduceRequest(topic_partition.topic, + topic_partition.partition, + messages) + reqs.append(req) + + try: + client.send_produce_request(reqs, + acks=req_acks, + timeout=ack_timeout) + except Exception: + log.exception("Unable to send message") + + class Producer(object): """ Base class to be used by producers Params: client - The Kafka client instance to use - topic - The topic for sending messages to async - If set to true, the messages are sent asynchronously via another thread (process). 
We will not wait for a response to these req_acks - A value indicating the acknowledgements that the server must @@ -61,79 +117,39 @@ def __init__(self, client, async=False, self.async = async self.req_acks = req_acks self.ack_timeout = ack_timeout - self.batch_send = batch_send - self.batch_size = batch_send_every_n - self.batch_time = batch_send_every_t if self.async: self.queue = Queue() # Messages are sent through this queue - self.proc = Process(target=self._send_upstream, args=(self.queue,)) - self.proc.daemon = True # Process will die if main thread exits + self.proc = Process(target=_send_upstream, + args=(self.queue, + self.client.copy(), + batch_send_every_t, + batch_send_every_n, + self.req_acks, + self.ack_timeout)) + + # Process will die if main thread exits + self.proc.daemon = True self.proc.start() - def _send_upstream(self, queue): - """ - Listen on the queue for a specified number of messages or till - a specified timeout and send them upstream to the brokers in one - request - """ - stop = False - - while not stop: - timeout = self.batch_time - send_at = datetime.now() + timedelta(seconds=timeout) - count = self.batch_size - msgset = defaultdict(list) - - # Keep fetching till we gather enough messages or a - # timeout is reached - while count > 0 and timeout >= 0: - try: - partition, msg = queue.get(timeout=timeout) - except Empty: - break - - # Check if the controller has requested us to stop - if partition == STOP_ASYNC_PRODUCER: - stop = True - break - - # Adjust the timeout to match the remaining period - count -= 1 - timeout = (send_at - datetime.now()).total_seconds() - msgset[partition].append(msg) - - # Send collected requests upstream - reqs = [] - for partition, messages in msgset.items(): - req = ProduceRequest(self.topic, partition, messages) - reqs.append(req) - - try: - self.client.send_produce_request(reqs, acks=self.req_acks, - timeout=self.ack_timeout) - except Exception as exp: - self.client._load_metadata_for_topics(self.topic) - log.error("Error sending message", exc_info=sys.exc_info()) - - def send_messages(self, partition, *msg): + def send_messages(self, topic, partition, *msg): """ Helper method to send produce requests """ if self.async: for m in msg: - self.queue.put((partition, create_message(m))) + self.queue.put((TopicAndPartition(topic, partition), + create_message(m))) resp = [] else: messages = [create_message(m) for m in msg] - req = ProduceRequest(self.topic, partition, messages) + req = ProduceRequest(topic, partition, messages) try: resp = self.client.send_produce_request([req], acks=self.req_acks, timeout=self.ack_timeout) - except Exception as exp: - self.client._load_metadata_for_topics(self.topic) - log.error("Error sending message", exc_info=sys.exc_info()) - raise exp + except Exception: + log.exception("Unable to send messages") + raise return resp def stop(self, timeout=1): @@ -155,7 +171,6 @@ class SimpleProducer(Producer): Params: client - The Kafka client instance to use - topic - The topic for sending messages to async - If True, the messages are sent asynchronously via another thread (process). 
We will not wait for a response to these req_acks - A value indicating the acknowledgements that the server must @@ -166,24 +181,31 @@ class SimpleProducer(Producer): batch_send_every_n - If set, messages are sent in batches of this size batch_send_every_t - If set, messages are sent after this timeout """ - def __init__(self, client, topic, async=False, + def __init__(self, client, async=False, req_acks=Producer.ACK_AFTER_LOCAL_WRITE, ack_timeout=Producer.DEFAULT_ACK_TIMEOUT, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): - self.topic = topic - client._load_metadata_for_topics(topic) - self.next_partition = cycle(client.topic_partitions[topic]) - + self.partition_cycles = {} super(SimpleProducer, self).__init__(client, async, req_acks, ack_timeout, batch_send, batch_send_every_n, batch_send_every_t) - def send_messages(self, *msg): - partition = self.next_partition.next() - return super(SimpleProducer, self).send_messages(partition, *msg) + def _next_partition(self, topic): + if topic not in self.partition_cycles: + if topic not in self.client.topic_partitions: + self.client.load_metadata_for_topics(topic) + self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic]) + return self.partition_cycles[topic].next() + + def send_messages(self, topic, *msg): + partition = self._next_partition(topic) + return super(SimpleProducer, self).send_messages(topic, partition, *msg) + + def __repr__(self): + return '<SimpleProducer batch=%s>' % self.async class KeyedProducer(Producer): @@ -192,7 +214,6 @@ class KeyedProducer(Producer): Args: client - The kafka client instance - topic - The kafka topic to send messages to partitioner - A partitioner class that will be used to get the partition to send the message to.
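For illustration (not part of the patch): the lazy per-topic round-robin that `SimpleProducer._next_partition` implements reduces to one `itertools.cycle` per topic, created on first use. The partition list below is invented; the real code reads it from broker metadata.

```python
# Round-robin partition selection with one cycle per topic, created lazily --
# the same idea as SimpleProducer._next_partition above.
from itertools import cycle

topic_partitions = {"my-topic": [0, 1]}   # invented; normally loaded from metadata
partition_cycles = {}


def next_partition(topic):
    if topic not in partition_cycles:
        partition_cycles[topic] = cycle(topic_partitions[topic])
    return partition_cycles[topic].next()  # Python 2 iterator API


print next_partition("my-topic")  # 0
print next_partition("my-topic")  # 1
print next_partition("my-topic")  # 0 again
```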
Must be derived from Partitioner async - If True, the messages are sent asynchronously via another @@ -203,26 +224,34 @@ class KeyedProducer(Producer): batch_send_every_n - If set, messages are sent in batches of this size batch_send_every_t - If set, messages are sent after this timeout """ - def __init__(self, client, topic, partitioner=None, async=False, + def __init__(self, client, partitioner=None, async=False, req_acks=Producer.ACK_AFTER_LOCAL_WRITE, ack_timeout=Producer.DEFAULT_ACK_TIMEOUT, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): - self.topic = topic - client._load_metadata_for_topics(topic) - if not partitioner: partitioner = HashedPartitioner - - self.partitioner = partitioner(client.topic_partitions[topic]) + self.partitioner_class = partitioner + self.partitioners = {} super(KeyedProducer, self).__init__(client, async, req_acks, ack_timeout, batch_send, batch_send_every_n, batch_send_every_t) - def send(self, key, msg): - partitions = self.client.topic_partitions[self.topic] - partition = self.partitioner.partition(key, partitions) - return self.send_messages(partition, msg) + def _next_partition(self, topic, key): + if topic not in self.partitioners: + if topic not in self.client.topic_partitions: + self.client.load_metadata_for_topics(topic) + self.partitioners[topic] = \ + self.partitioner_class(self.client.topic_partitions[topic]) + partitioner = self.partitioners[topic] + return partitioner.partition(key, self.client.topic_partitions[topic]) + + def send(self, topic, key, msg): + partition = self._next_partition(topic, key) + return self.send_messages(topic, partition, msg) + + def __repr__(self): + return '<KeyedProducer batch=%s>' % self.async diff --git a/kafka/protocol.py b/kafka/protocol.py index c2b017ef6..25be023eb 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -25,12 +25,12 @@ class KafkaProtocol(object): This class does not have any state associated with it, it is purely for organization. """ - PRODUCE_KEY = 0 - FETCH_KEY = 1 - OFFSET_KEY = 2 - METADATA_KEY = 3 - OFFSET_COMMIT_KEY = 6 - OFFSET_FETCH_KEY = 7 + PRODUCE_KEY = 0 + FETCH_KEY = 1 + OFFSET_KEY = 2 + METADATA_KEY = 3 + OFFSET_COMMIT_KEY = 8 + OFFSET_FETCH_KEY = 9 ATTRIBUTE_CODEC_MASK = 0x03 CODEC_NONE = 0x00 @@ -119,9 +119,17 @@ def _decode_message_set_iter(cls, data): read_message = True yield OffsetAndMessage(offset, message) except BufferUnderflowError: + # NOTE: Not sure this is correct error handling: + # Is it possible to get a BUE if the message set is somewhere + # in the middle of the fetch response? If so, we probably have + # an issue that's not fetch size too small. + # Aren't we ignoring errors if we fail to unpack data by + # raising StopIteration()? + # If _decode_message() raises a ChecksumError, couldn't that + # also be due to the fetch size being too small?
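To make the concern in that NOTE concrete (a hedged sketch, not the library's actual consumer code): when a fetch returns only part of a message and nothing could be yielded, `ConsumerFetchSizeTooSmall` tells the caller to retry with a larger buffer. `fetch_messages` below is a hypothetical helper standing in for a real fetch-and-decode round trip.

```python
# Generic grow-and-retry loop for ConsumerFetchSizeTooSmall; fetch_messages()
# is a made-up callable that fetches and decodes with the given buffer size.
from kafka.common import ConsumerFetchSizeTooSmall


def fetch_with_growing_buffer(fetch_messages, buffer_size=4096, max_buffer_size=None):
    while True:
        try:
            return fetch_messages(buffer_size)
        except ConsumerFetchSizeTooSmall:
            if max_buffer_size is not None and buffer_size >= max_buffer_size:
                raise  # give up; the message simply does not fit
            buffer_size *= 2   # retry with twice the fetch size
```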
if read_message is False: - # If we get a partial read of a message, but haven't yielded anyhting - # there's a problem + # If we get a partial read of a message, but haven't + # yielded anything there's a problem raise ConsumerFetchSizeTooSmall() else: raise StopIteration() @@ -171,7 +179,7 @@ def encode_produce_request(cls, client_id, correlation_id, Params ====== client_id: string - correlation_id: string + correlation_id: int payloads: list of ProduceRequest acks: How "acky" you want the request to be 0: immediate response @@ -231,7 +239,7 @@ def encode_fetch_request(cls, client_id, correlation_id, payloads=None, Params ====== client_id: string - correlation_id: string + correlation_id: int payloads: list of FetchRequest max_wait_time: int, how long to block waiting on min_bytes of data min_bytes: int, the minimum number of bytes to accumulate before @@ -274,14 +282,14 @@ def decode_fetch_response(cls, data): for i in range(num_partitions): ((partition, error, highwater_mark_offset), cur) = \ - relative_unpack('>ihq', data, cur) + relative_unpack('>ihq', data, cur) (message_set, cur) = read_int_string(data, cur) yield FetchResponse( - topic, partition, error, - highwater_mark_offset, - KafkaProtocol._decode_message_set_iter(message_set)) + topic, partition, error, + highwater_mark_offset, + KafkaProtocol._decode_message_set_iter(message_set)) @classmethod def encode_offset_request(cls, client_id, correlation_id, payloads=None): @@ -321,7 +329,7 @@ def decode_offset_response(cls, data): for i in range(num_partitions): ((partition, error, num_offsets,), cur) = \ - relative_unpack('>ihi', data, cur) + relative_unpack('>ihi', data, cur) offsets = [] for j in range(num_offsets): @@ -338,7 +346,7 @@ def encode_metadata_request(cls, client_id, correlation_id, topics=None): Params ====== client_id: string - correlation_id: string + correlation_id: int topics: list of strings """ topics = [] if topics is None else topics @@ -361,11 +369,11 @@ def decode_metadata_response(cls, data): ====== data: bytes to decode """ - ((correlation_id, numBrokers), cur) = relative_unpack('>ii', data, 0) + ((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0) # Broker info brokers = {} - for i in range(numBrokers): + for i in range(numbrokers): ((nodeId, ), cur) = relative_unpack('>i', data, cur) (host, cur) = read_short_string(data, cur) ((port,), cur) = relative_unpack('>i', data, cur) @@ -373,31 +381,35 @@ def decode_metadata_response(cls, data): # Topic info ((num_topics,), cur) = relative_unpack('>i', data, cur) - topicMetadata = {} + topic_metadata = {} for i in range(num_topics): - ((topicError,), cur) = relative_unpack('>h', data, cur) - (topicName, cur) = read_short_string(data, cur) + # NOTE: topic_error is discarded. Should probably be returned with + # the topic metadata. + ((topic_error,), cur) = relative_unpack('>h', data, cur) + (topic_name, cur) = read_short_string(data, cur) ((num_partitions,), cur) = relative_unpack('>i', data, cur) - partitionMetadata = {} + partition_metadata = {} for j in range(num_partitions): - ((partitionErrorCode, partition, leader, numReplicas), cur) = \ - relative_unpack('>hiii', data, cur) + # NOTE: partition_error_code is discarded. Should probably be + # returned with the partition metadata. 
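For reference, an illustrative picture of the `(brokers, topic_metadata)` pair that `decode_metadata_response()` builds up above. All values are invented, and the field order of `BrokerMetadata` (nodeId, host, port) is an assumption inferred from how the rest of this patch uses it (e.g. `leader.nodeId` in the failover tests).

```python
# Shape of decode_metadata_response()'s return value; every value is made up.
# brokers:        nodeId -> BrokerMetadata (assumed fields: nodeId, host, port)
# topic_metadata: topic  -> {partition: PartitionMetadata(topic, partition,
#                            leader, replicas, isr)}, matching the constructor
#                            call in the decoder above.
from kafka.common import BrokerMetadata, PartitionMetadata

brokers = {
    0: BrokerMetadata(0, "localhost", 9092),
}
topic_metadata = {
    "my-topic": {
        0: PartitionMetadata("my-topic", 0, 0, (0,), (0,)),
    },
}

# e.g. look up which broker currently leads partition 0 of "my-topic"
leader = brokers[topic_metadata["my-topic"][0].leader]
```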
+ ((partition_error_code, partition, leader, numReplicas), cur) = \ + relative_unpack('>hiii', data, cur) - (replicas, cur) = relative_unpack('>%di' % numReplicas, - data, cur) + (replicas, cur) = relative_unpack( + '>%di' % numReplicas, data, cur) - ((numIsr,), cur) = relative_unpack('>i', data, cur) - (isr, cur) = relative_unpack('>%di' % numIsr, data, cur) + ((num_isr,), cur) = relative_unpack('>i', data, cur) + (isr, cur) = relative_unpack('>%di' % num_isr, data, cur) - partitionMetadata[partition] = \ - PartitionMetadata(topicName, partition, leader, - replicas, isr) + partition_metadata[partition] = \ + PartitionMetadata( + topic_name, partition, leader, replicas, isr) - topicMetadata[topicName] = partitionMetadata + topic_metadata[topic_name] = partition_metadata - return (brokers, topicMetadata) + return brokers, topic_metadata @classmethod def encode_offset_commit_request(cls, client_id, correlation_id, @@ -408,7 +420,7 @@ def encode_offset_commit_request(cls, client_id, correlation_id, Params ====== client_id: string - correlation_id: string + correlation_id: int group: string, the consumer group you are committing offsets for payloads: list of OffsetCommitRequest """ @@ -439,7 +451,6 @@ def decode_offset_commit_response(cls, data): data: bytes to decode """ ((correlation_id,), cur) = relative_unpack('>i', data, 0) - (client_id, cur) = read_short_string(data, cur) ((num_topics,), cur) = relative_unpack('>i', data, cur) for i in xrange(num_topics): @@ -459,7 +470,7 @@ def encode_offset_fetch_request(cls, client_id, correlation_id, Params ====== client_id: string - correlation_id: string + correlation_id: int group: string, the consumer group you are fetching offsets for payloads: list of OffsetFetchRequest """ @@ -490,7 +501,6 @@ def decode_offset_fetch_response(cls, data): """ ((correlation_id,), cur) = relative_unpack('>i', data, 0) - (client_id, cur) = read_short_string(data, cur) ((num_topics,), cur) = relative_unpack('>i', data, cur) for i in range(num_topics): @@ -531,7 +541,7 @@ def create_gzip_message(payloads, key=None): key: bytes, a key used for partition routing (optional) """ message_set = KafkaProtocol._encode_message_set( - [create_message(payload) for payload in payloads]) + [create_message(payload) for payload in payloads]) gzipped = gzip_encode(message_set) codec = KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP @@ -552,7 +562,7 @@ def create_snappy_message(payloads, key=None): key: bytes, a key used for partition routing (optional) """ message_set = KafkaProtocol._encode_message_set( - [create_message(payload) for payload in payloads]) + [create_message(payload) for payload in payloads]) snapped = snappy_encode(message_set) codec = KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY diff --git a/kafka/queue.py b/kafka/queue.py index 41f1c313e..ada495f78 100644 --- a/kafka/queue.py +++ b/kafka/queue.py @@ -1,3 +1,5 @@ +from __future__ import absolute_import + from copy import copy import logging from multiprocessing import Process, Queue, Event @@ -25,8 +27,9 @@ def __init__(self, client, topic, partition, out_queue, barrier, Process.__init__(self) def __str__(self): - return "[KafkaConsumerProcess: topic=%s, partition=%s, sleep=%s]" % \ - (self.topic, self.partition, self.consumer_sleep) + return "[KafkaConsumerProcess: topic=%s, \ + partition=%s, sleep=%s]" % \ + (self.topic, self.partition, self.consumer_sleep) def run(self): self.barrier.wait() @@ -70,10 +73,12 @@ def __init__(self, client, topic, in_queue, barrier, 
Process.__init__(self) def __str__(self): - return "[KafkaProducerProcess: topic=%s, flush_buffer=%s, \ - flush_timeout=%s, timeout=%s]" % ( - self.topic, self.producer_flush_buffer, - self.producer_flush_timeout, self.producer_timeout) + return "[KafkaProducerProcess: topic=%s, \ + flush_buffer=%s, flush_timeout=%s, timeout=%s]" % \ + (self.topic, + self.producer_flush_buffer, + self.producer_flush_timeout, + self.producer_timeout) def run(self): self.barrier.wait() @@ -104,8 +109,8 @@ def flush(messages): last_produce = time.time() try: - msg = KafkaClient.create_message(self.in_queue.get(True, - self.producer_timeout)) + msg = KafkaClient.create_message( + self.in_queue.get(True, self.producer_timeout)) messages.append(msg) except Empty: diff --git a/kafka/util.py b/kafka/util.py index 259e2854e..54052fb03 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -1,9 +1,8 @@ from collections import defaultdict -from itertools import groupby import struct from threading import Thread, Event -from common import * +from kafka.common import BufferUnderflowError def write_int_string(s): @@ -24,32 +23,34 @@ def read_short_string(data, cur): if len(data) < cur + 2: raise BufferUnderflowError("Not enough data left") - (strLen,) = struct.unpack('>h', data[cur:cur + 2]) - if strLen == -1: - return (None, cur + 2) + (strlen,) = struct.unpack('>h', data[cur:cur + 2]) + if strlen == -1: + return None, cur + 2 cur += 2 - if len(data) < cur + strLen: + if len(data) < cur + strlen: raise BufferUnderflowError("Not enough data left") - out = data[cur:cur + strLen] - return (out, cur + strLen) + out = data[cur:cur + strlen] + return out, cur + strlen def read_int_string(data, cur): if len(data) < cur + 4: - raise BufferUnderflowError("Not enough data left") + raise BufferUnderflowError( + "Not enough data left to read string len (%d < %d)" % + (len(data), cur + 4)) - (strLen,) = struct.unpack('>i', data[cur:cur + 4]) - if strLen == -1: - return (None, cur + 4) + (strlen,) = struct.unpack('>i', data[cur:cur + 4]) + if strlen == -1: + return None, cur + 4 cur += 4 - if len(data) < cur + strLen: + if len(data) < cur + strlen: raise BufferUnderflowError("Not enough data left") - out = data[cur:cur + strLen] - return (out, cur + strLen) + out = data[cur:cur + strlen] + return out, cur + strlen def relative_unpack(fmt, data, cur): @@ -58,7 +59,7 @@ def relative_unpack(fmt, data, cur): raise BufferUnderflowError("Not enough data left") out = struct.unpack(fmt, data[cur:cur + size]) - return (out, cur + size) + return out, cur + size def group_by_topic_and_partition(tuples): @@ -68,7 +69,6 @@ def group_by_topic_and_partition(tuples): return out - class ReentrantTimer(object): """ A timer that can be restarted, unlike threading.Timer diff --git a/load_example.py b/load_example.py new file mode 100755 index 000000000..1f8b41820 --- /dev/null +++ b/load_example.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python +import threading, logging, time, collections + +from kafka.client import KafkaClient +from kafka.consumer import SimpleConsumer +from kafka.producer import SimpleProducer + +msg_size = 524288 + +class Producer(threading.Thread): + daemon = True + big_msg = "1" * msg_size + + def run(self): + client = KafkaClient("localhost:9092") + producer = SimpleProducer(client) + self.sent = 0 + + while True: + producer.send_messages('my-topic', self.big_msg) + self.sent += 1 + + +class Consumer(threading.Thread): + daemon = True + + def run(self): + client = KafkaClient("localhost:9092") + consumer = SimpleConsumer(client, 
"test-group", "my-topic", + max_buffer_size = None, + ) + self.valid = 0 + self.invalid = 0 + + for message in consumer: + if len(message.message.value) == msg_size: + self.valid += 1 + else: + self.invalid += 1 + +def main(): + threads = [ + Producer(), + Consumer() + ] + + for t in threads: + t.start() + + time.sleep(10) + print 'Messages sent: %d' % threads[0].sent + print 'Messages recvd: %d' % threads[1].valid + print 'Messages invalid: %d' % threads[1].invalid + +if __name__ == "__main__": + logging.basicConfig( + format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', + level=logging.DEBUG + ) + main() diff --git a/setup.py b/setup.py index 73143d484..8928dd455 100644 --- a/setup.py +++ b/setup.py @@ -1,11 +1,12 @@ -import os.path import sys from setuptools import setup, Command class Tox(Command): + user_options = [] + def initialize_options(self): pass @@ -19,10 +20,9 @@ def run(self): setup( name="kafka-quixey", - version="0.8.1-1", + version="0.9.0-q3", - install_requires=["distribute", "tox"], - tests_require=["tox"], + tests_require=["tox", "mock"], cmdclass={"test": Tox}, packages=["kafka"], diff --git a/test/fixtures.py b/test/fixtures.py index abaaa5cd3..9e283d3c5 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -17,6 +17,7 @@ PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) KAFKA_ROOT = os.path.join(PROJECT_ROOT, "kafka-src") IVY_ROOT = os.path.expanduser("~/.ivy2/cache") +SCALA_VERSION = '2.8.0' if "PROJECT_ROOT" in os.environ: PROJECT_ROOT = os.environ["PROJECT_ROOT"] @@ -24,6 +25,8 @@ KAFKA_ROOT = os.environ["KAFKA_ROOT"] if "IVY_ROOT" in os.environ: IVY_ROOT = os.environ["IVY_ROOT"] +if "SCALA_VERSION" in os.environ: + SCALA_VERSION = os.environ["SCALA_VERSION"] def test_resource(file): @@ -33,16 +36,8 @@ def test_resource(file): def test_classpath(): # ./kafka-src/bin/kafka-run-class.sh is the authority. 
jars = ["."] - jars.append(IVY_ROOT + "/org.xerial.snappy/snappy-java/bundles/snappy-java-1.0.4.1.jar") - jars.append(IVY_ROOT + "/org.scala-lang/scala-library/jars/scala-library-2.8.0.jar") - jars.append(IVY_ROOT + "/org.scala-lang/scala-compiler/jars/scala-compiler-2.8.0.jar") - jars.append(IVY_ROOT + "/log4j/log4j/jars/log4j-1.2.15.jar") - jars.append(IVY_ROOT + "/org.slf4j/slf4j-api/jars/slf4j-api-1.6.4.jar") - jars.append(IVY_ROOT + "/org.apache.zookeeper/zookeeper/jars/zookeeper-3.3.4.jar") - jars.append(IVY_ROOT + "/net.sf.jopt-simple/jopt-simple/jars/jopt-simple-3.2.jar") - jars.extend(glob.glob(KAFKA_ROOT + "/core/target/scala-2.8.0/*.jar")) - jars.extend(glob.glob(KAFKA_ROOT + "/core/lib/*.jar")) - jars.extend(glob.glob(KAFKA_ROOT + "/perf/target/scala-2.8.0/kafka*.jar")) + # assume all dependencies have been packaged into one jar with sbt-assembly's task "assembly-package-dependency" + jars.extend(glob.glob(KAFKA_ROOT + "/core/target/scala-%s/*.jar" % SCALA_VERSION)) jars = filter(os.path.exists, map(os.path.abspath, jars)) return ":".join(jars) @@ -79,6 +74,8 @@ def render_template(source_file, target_file, binding): class ExternalService(object): def __init__(self, host, port): print("Using already running service at %s:%d" % (host, port)) + self.host = host + self.port = port def open(self): pass @@ -211,9 +208,12 @@ def __init__(self, host, port): self.tmp_dir = None self.child = None + def out(self, message): + print("*** Zookeeper [%s:%d]: %s" % (self.host, self.port, message)) + def open(self): self.tmp_dir = tempfile.mkdtemp() - print("*** Running local Zookeeper instance...") + self.out("Running local instance...") print(" host = %s" % self.host) print(" port = %s" % self.port) print(" tmp_dir = %s" % self.tmp_dir) @@ -232,22 +232,22 @@ def open(self): self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt")) # Party! 
- print("*** Starting Zookeeper...") + self.out("Starting...") self.child.start() self.child.wait_for(r"Snapshotting") - print("*** Done!") + self.out("Done!") def close(self): - print("*** Stopping Zookeeper...") + self.out("Stopping...") self.child.stop() self.child = None - print("*** Done!") + self.out("Done!") shutil.rmtree(self.tmp_dir) class KafkaFixture(object): @staticmethod - def instance(broker_id, zk_host, zk_port, zk_chroot=None): + def instance(broker_id, zk_host, zk_port, zk_chroot=None, replicas=1, partitions=2): if zk_chroot is None: zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_") if "KAFKA_URI" in os.environ: @@ -256,11 +256,11 @@ def instance(broker_id, zk_host, zk_port, zk_chroot=None): fixture = ExternalService(host, port) else: (host, port) = ("127.0.0.1", get_open_port()) - fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot) + fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot, replicas, partitions) fixture.open() return fixture - def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot): + def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas=1, partitions=2): self.host = host self.port = port @@ -270,19 +270,32 @@ def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot): self.zk_port = zk_port self.zk_chroot = zk_chroot + self.replicas = replicas + self.partitions = partitions + self.tmp_dir = None self.child = None + self.running = False + + def out(self, message): + print("*** Kafka [%s:%d]: %s" % (self.host, self.port, message)) def open(self): + if self.running: + self.out("Instance already running") + return + self.tmp_dir = tempfile.mkdtemp() - print("*** Running local Kafka instance") - print(" host = %s" % self.host) - print(" port = %s" % self.port) - print(" broker_id = %s" % self.broker_id) - print(" zk_host = %s" % self.zk_host) - print(" zk_port = %s" % self.zk_port) - print(" zk_chroot = %s" % self.zk_chroot) - print(" tmp_dir = %s" % self.tmp_dir) + self.out("Running local instance...") + print(" host = %s" % self.host) + print(" port = %s" % self.port) + print(" broker_id = %s" % self.broker_id) + print(" zk_host = %s" % self.zk_host) + print(" zk_port = %s" % self.zk_port) + print(" zk_chroot = %s" % self.zk_chroot) + print(" replicas = %s" % self.replicas) + print(" partitions = %s" % self.partitions) + print(" tmp_dir = %s" % self.tmp_dir) # Create directories os.mkdir(os.path.join(self.tmp_dir, "logs")) @@ -301,25 +314,31 @@ def open(self): self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt")) # Party! 
- print("*** Creating Zookeeper chroot node...") + self.out("Creating Zookeeper chroot node...") proc = subprocess.Popen(kafka_run_class_args( "org.apache.zookeeper.ZooKeeperMain", "-server", "%s:%d" % (self.zk_host, self.zk_port), "create", "/%s" % self.zk_chroot, "kafka-python" )) if proc.wait() != 0: - print("*** Failed to create Zookeeper chroot node") + self.out("Failed to create Zookeeper chroot node") raise RuntimeError("Failed to create Zookeeper chroot node") - print("*** Done!") + self.out("Done!") - print("*** Starting Kafka...") + self.out("Starting...") self.child.start() - self.child.wait_for(r"\[Kafka Server \d+\], started") - print("*** Done!") + self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id) + self.out("Done!") + self.running = True def close(self): - print("*** Stopping Kafka...") + if not self.running: + self.out("Instance already stopped") + return + + self.out("Stopping...") self.child.stop() self.child = None - print("*** Done!") + self.out("Done!") shutil.rmtree(self.tmp_dir) + self.running = False diff --git a/test/resources/kafka.properties b/test/resources/kafka.properties index 2c8416f29..f8732fb46 100644 --- a/test/resources/kafka.properties +++ b/test/resources/kafka.properties @@ -4,9 +4,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -32,7 +32,8 @@ socket.request.max.bytes=104857600 ############################# Log Basics ############################# log.dir={tmp_dir}/data -num.partitions=2 +num.partitions={partitions} +default.replication.factor={replicas} ############################# Log Flush Policy ############################# @@ -47,8 +48,8 @@ log.cleanup.interval.mins=1 ############################# Zookeeper ############################# -zk.connect={zk_host}:{zk_port}/{zk_chroot} -zk.connection.timeout.ms=1000000 +zookeeper.connect={zk_host}:{zk_port}/{zk_chroot} +zookeeper.connection.timeout.ms=1000000 kafka.metrics.polling.interval.secs=5 kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter diff --git a/test/test_integration.py b/test/test_integration.py index bf1acc8cd..3d6ccf60b 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -8,15 +8,41 @@ from kafka import * # noqa from kafka.common import * # noqa from kafka.codec import has_gzip, has_snappy +from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES from .fixtures import ZookeeperFixture, KafkaFixture -class TestKafkaClient(unittest.TestCase): +def random_string(l): + s = "".join(random.choice(string.letters) for i in xrange(l)) + return s + + +def ensure_topic_creation(client, topic_name): + times = 0 + while True: + times += 1 + client.load_metadata_for_topics(topic_name) + if client.has_metadata_for_topic(topic_name): + break + print "Waiting for %s topic to be created" % topic_name + time.sleep(1) + + if times > 30: + raise Exception("Unable to create topic %s" % topic_name) + + +class KafkaTestCase(unittest.TestCase): + def setUp(self): + self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10)) + ensure_topic_creation(self.client, self.topic) + + +class TestKafkaClient(KafkaTestCase): 
@classmethod def setUpClass(cls): # noqa cls.zk = ZookeeperFixture.instance() cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) - cls.client = KafkaClient(cls.server.host, cls.server.port) + cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port)) @classmethod def tearDownClass(cls): # noqa @@ -29,7 +55,8 @@ def tearDownClass(cls): # noqa ##################### def test_produce_many_simple(self): - produce = ProduceRequest("test_produce_many_simple", 0, messages=[ + + produce = ProduceRequest(self.topic, 0, messages=[ create_message("Test message %d" % i) for i in range(100) ]) @@ -37,25 +64,25 @@ def test_produce_many_simple(self): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 100) for resp in self.client.send_produce_request([produce]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 100) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 200) for resp in self.client.send_produce_request([produce]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 200) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 300) def test_produce_10k_simple(self): - produce = ProduceRequest("test_produce_10k_simple", 0, messages=[ + produce = ProduceRequest(self.topic, 0, messages=[ create_message("Test message %d" % i) for i in range(10000) ]) @@ -63,7 +90,7 @@ def test_produce_10k_simple(self): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_10k_simple", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 10000) def test_produce_many_gzip(self): @@ -72,13 +99,13 @@ def test_produce_many_gzip(self): message1 = create_gzip_message(["Gzipped 1 %d" % i for i in range(100)]) message2 = create_gzip_message(["Gzipped 2 %d" % i for i in range(100)]) - produce = ProduceRequest("test_produce_many_gzip", 0, messages=[message1, message2]) + produce = ProduceRequest(self.topic, 0, messages=[message1, message2]) for resp in self.client.send_produce_request([produce]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_gzip", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 200) def test_produce_many_snappy(self): @@ -87,13 +114,13 @@ def test_produce_many_snappy(self): message1 = create_snappy_message(["Snappy 1 %d" % i for i in range(100)]) message2 = create_snappy_message(["Snappy 2 %d" % i for i in range(100)]) - produce = ProduceRequest("test_produce_many_snappy", 0, messages=[message1, message2]) + produce = ProduceRequest(self.topic, 0, messages=[message1, message2]) for resp in self.client.send_produce_request([produce]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - (offset, ) = 
self.client.send_offset_request([OffsetRequest("test_produce_many_snappy", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 200) def test_produce_mixed(self): @@ -103,17 +130,17 @@ def test_produce_mixed(self): message2 = create_gzip_message(["Gzipped %d" % i for i in range(100)]) message3 = create_snappy_message(["Snappy %d" % i for i in range(100)]) - produce = ProduceRequest("test_produce_mixed", 0, messages=[message1, message2, message3]) + produce = ProduceRequest(self.topic, 0, messages=[message1, message2, message3]) for resp in self.client.send_produce_request([produce]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_mixed", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 201) def test_produce_100k_gzipped(self): - req1 = ProduceRequest("test_produce_100k_gzipped", 0, messages=[ + req1 = ProduceRequest(self.topic, 0, messages=[ create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)]) ]) @@ -121,10 +148,10 @@ def test_produce_100k_gzipped(self): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_100k_gzipped", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 50000) - req2 = ProduceRequest("test_produce_100k_gzipped", 0, messages=[ + req2 = ProduceRequest(self.topic, 0, messages=[ create_gzip_message(["Gzipped batch 2, message %d" % i for i in range(50000)]) ]) @@ -132,7 +159,7 @@ def test_produce_100k_gzipped(self): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 50000) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_100k_gzipped", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 100000) ##################### @@ -140,18 +167,18 @@ def test_produce_100k_gzipped(self): ##################### def test_consume_none(self): - fetch = FetchRequest("test_consume_none", 0, 0, 1024) + fetch = FetchRequest(self.topic, 0, 0, 1024) fetch_resp = self.client.send_fetch_request([fetch])[0] self.assertEquals(fetch_resp.error, 0) - self.assertEquals(fetch_resp.topic, "test_consume_none") + self.assertEquals(fetch_resp.topic, self.topic) self.assertEquals(fetch_resp.partition, 0) messages = list(fetch_resp.messages) self.assertEquals(len(messages), 0) def test_produce_consume(self): - produce = ProduceRequest("test_produce_consume", 0, messages=[ + produce = ProduceRequest(self.topic, 0, messages=[ create_message("Just a test message"), create_message("Message with a key", "foo"), ]) @@ -160,7 +187,7 @@ def test_produce_consume(self): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - fetch = FetchRequest("test_produce_consume", 0, 0, 1024) + fetch = FetchRequest(self.topic, 0, 0, 1024) fetch_resp = self.client.send_fetch_request([fetch])[0] self.assertEquals(fetch_resp.error, 0) @@ -175,7 +202,7 @@ def test_produce_consume(self): self.assertEquals(messages[1].message.key, "foo") def test_produce_consume_many(self): - produce = ProduceRequest("test_produce_consume_many", 0, messages=[ + produce = ProduceRequest(self.topic, 0, messages=[ create_message("Test message %d" % i) for i in range(100) ]) 
@@ -184,7 +211,7 @@ def test_produce_consume_many(self): self.assertEquals(resp.offset, 0) # 1024 is not enough for 100 messages... - fetch1 = FetchRequest("test_produce_consume_many", 0, 0, 1024) + fetch1 = FetchRequest(self.topic, 0, 0, 1024) (fetch_resp1,) = self.client.send_fetch_request([fetch1]) @@ -194,7 +221,7 @@ def test_produce_consume_many(self): self.assertTrue(len(messages) < 100) # 10240 should be enough - fetch2 = FetchRequest("test_produce_consume_many", 0, 0, 10240) + fetch2 = FetchRequest(self.topic, 0, 0, 10240) (fetch_resp2,) = self.client.send_fetch_request([fetch2]) self.assertEquals(fetch_resp2.error, 0) @@ -207,10 +234,10 @@ def test_produce_consume_many(self): self.assertEquals(message.message.key, None) def test_produce_consume_two_partitions(self): - produce1 = ProduceRequest("test_produce_consume_two_partitions", 0, messages=[ + produce1 = ProduceRequest(self.topic, 0, messages=[ create_message("Partition 0 %d" % i) for i in range(10) ]) - produce2 = ProduceRequest("test_produce_consume_two_partitions", 1, messages=[ + produce2 = ProduceRequest(self.topic, 1, messages=[ create_message("Partition 1 %d" % i) for i in range(10) ]) @@ -218,8 +245,8 @@ def test_produce_consume_two_partitions(self): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - fetch1 = FetchRequest("test_produce_consume_two_partitions", 0, 0, 1024) - fetch2 = FetchRequest("test_produce_consume_two_partitions", 1, 0, 1024) + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) self.assertEquals(fetch_resp1.error, 0) self.assertEquals(fetch_resp1.highwaterMark, 10) @@ -242,12 +269,13 @@ def test_produce_consume_two_partitions(self): # Offset Tests # #################### + @unittest.skip('commmit offset not supported in this version') def test_commit_fetch_offsets(self): - req = OffsetCommitRequest("test_commit_fetch_offsets", 0, 42, "metadata") + req = OffsetCommitRequest(self.topic, 0, 42, "metadata") (resp,) = self.client.send_offset_commit_request("group", [req]) self.assertEquals(resp.error, 0) - req = OffsetFetchRequest("test_commit_fetch_offsets", 0) + req = OffsetFetchRequest(self.topic, 0) (resp,) = self.client.send_offset_fetch_request("group", [req]) self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 42) @@ -256,8 +284,8 @@ def test_commit_fetch_offsets(self): # Producer Tests def test_simple_producer(self): - producer = SimpleProducer(self.client, "test_simple_producer") - resp = producer.send_messages("one", "two") + producer = SimpleProducer(self.client) + resp = producer.send_messages(self.topic, "one", "two") # Will go to partition 0 self.assertEquals(len(resp), 1) @@ -265,13 +293,13 @@ def test_simple_producer(self): self.assertEquals(resp[0].offset, 0) # offset of first msg # Will go to partition 1 - resp = producer.send_messages("three") + resp = producer.send_messages(self.topic, "three") self.assertEquals(len(resp), 1) self.assertEquals(resp[0].error, 0) self.assertEquals(resp[0].offset, 0) # offset of first msg - fetch1 = FetchRequest("test_simple_producer", 0, 0, 1024) - fetch2 = FetchRequest("test_simple_producer", 1, 0, 1024) + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) self.assertEquals(fetch_resp1.error, 0) @@ -287,7 +315,7 @@ def test_simple_producer(self): self.assertEquals(messages[0].message.value, 
"three") # Will go to partition 0 - resp = producer.send_messages("four", "five") + resp = producer.send_messages(self.topic, "four", "five") self.assertEquals(len(resp), 1) self.assertEquals(resp[0].error, 0) self.assertEquals(resp[0].offset, 2) # offset of first msg @@ -295,15 +323,15 @@ def test_simple_producer(self): producer.stop() def test_round_robin_partitioner(self): - producer = KeyedProducer(self.client, "test_round_robin_partitioner", + producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner) - producer.send("key1", "one") - producer.send("key2", "two") - producer.send("key3", "three") - producer.send("key4", "four") + producer.send(self.topic, "key1", "one") + producer.send(self.topic, "key2", "two") + producer.send(self.topic, "key3", "three") + producer.send(self.topic, "key4", "four") - fetch1 = FetchRequest("test_round_robin_partitioner", 0, 0, 1024) - fetch2 = FetchRequest("test_round_robin_partitioner", 1, 0, 1024) + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) @@ -329,15 +357,15 @@ def test_round_robin_partitioner(self): producer.stop() def test_hashed_partitioner(self): - producer = KeyedProducer(self.client, "test_hash_partitioner", + producer = KeyedProducer(self.client, partitioner=HashedPartitioner) - producer.send(1, "one") - producer.send(2, "two") - producer.send(3, "three") - producer.send(4, "four") + producer.send(self.topic, 1, "one") + producer.send(self.topic, 2, "two") + producer.send(self.topic, 3, "three") + producer.send(self.topic, 4, "four") - fetch1 = FetchRequest("test_hash_partitioner", 0, 0, 1024) - fetch2 = FetchRequest("test_hash_partitioner", 1, 0, 1024) + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) @@ -363,12 +391,12 @@ def test_hashed_partitioner(self): producer.stop() def test_acks_none(self): - producer = SimpleProducer(self.client, "test_acks_none", + producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED) - resp = producer.send_messages("one") + resp = producer.send_messages(self.topic, "one") self.assertEquals(len(resp), 0) - fetch = FetchRequest("test_acks_none", 0, 0, 1024) + fetch = FetchRequest(self.topic, 0, 0, 1024) fetch_resp = self.client.send_fetch_request([fetch]) self.assertEquals(fetch_resp[0].error, 0) @@ -382,12 +410,12 @@ def test_acks_none(self): producer.stop() def test_acks_local_write(self): - producer = SimpleProducer(self.client, "test_acks_local_write", + producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE) - resp = producer.send_messages("one") + resp = producer.send_messages(self.topic, "one") self.assertEquals(len(resp), 1) - fetch = FetchRequest("test_acks_local_write", 0, 0, 1024) + fetch = FetchRequest(self.topic, 0, 0, 1024) fetch_resp = self.client.send_fetch_request([fetch]) self.assertEquals(fetch_resp[0].error, 0) @@ -401,12 +429,13 @@ def test_acks_local_write(self): producer.stop() def test_acks_cluster_commit(self): - producer = SimpleProducer(self.client, "test_acks_cluster_commit", - req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT) - resp = producer.send_messages("one") + producer = SimpleProducer( + self.client, + req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT) + resp = producer.send_messages(self.topic, "one") self.assertEquals(len(resp), 1) - fetch = 
FetchRequest("test_acks_cluster_commit", 0, 0, 1024) + fetch = FetchRequest(self.topic, 0, 0, 1024) fetch_resp = self.client.send_fetch_request([fetch]) self.assertEquals(fetch_resp[0].error, 0) @@ -420,16 +449,14 @@ def test_acks_cluster_commit(self): producer.stop() def test_async_simple_producer(self): - producer = SimpleProducer(self.client, "test_async_simple_producer", - async=True) - - resp = producer.send_messages("one") + producer = SimpleProducer(self.client, async=True) + resp = producer.send_messages(self.topic, "one") self.assertEquals(len(resp), 0) # Give it some time time.sleep(2) - fetch = FetchRequest("test_async_simple_producer", 0, 0, 1024) + fetch = FetchRequest(self.topic, 0, 0, 1024) fetch_resp = self.client.send_fetch_request([fetch]) self.assertEquals(fetch_resp[0].error, 0) @@ -443,16 +470,15 @@ def test_async_simple_producer(self): producer.stop() def test_async_keyed_producer(self): - producer = KeyedProducer(self.client, "test_async_keyed_producer", - async=True) + producer = KeyedProducer(self.client, async=True) - resp = producer.send("key1", "one") + resp = producer.send(self.topic, "key1", "one") self.assertEquals(len(resp), 0) # Give it some time time.sleep(2) - fetch = FetchRequest("test_async_keyed_producer", 0, 0, 1024) + fetch = FetchRequest(self.topic, 0, 0, 1024) fetch_resp = self.client.send_fetch_request([fetch]) self.assertEquals(fetch_resp[0].error, 0) @@ -466,14 +492,14 @@ def test_async_keyed_producer(self): producer.stop() def test_batched_simple_producer(self): - producer = SimpleProducer(self.client, "test_batched_simple_producer", + producer = SimpleProducer(self.client, batch_send=True, batch_send_every_n=10, batch_send_every_t=20) # Send 5 messages and do a fetch msgs = ["message-%d" % i for i in range(0, 5)] - resp = producer.send_messages(*msgs) + resp = producer.send_messages(self.topic, *msgs) # Batch mode is async. 
No ack self.assertEquals(len(resp), 0) @@ -481,8 +507,8 @@ def test_batched_simple_producer(self): # Give it some time time.sleep(2) - fetch1 = FetchRequest("test_batched_simple_producer", 0, 0, 1024) - fetch2 = FetchRequest("test_batched_simple_producer", 1, 0, 1024) + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) @@ -496,13 +522,13 @@ def test_batched_simple_producer(self): # Send 5 more messages, wait for 2 seconds and do a fetch msgs = ["message-%d" % i for i in range(5, 10)] - resp = producer.send_messages(*msgs) + resp = producer.send_messages(self.topic, *msgs) # Give it some time time.sleep(2) - fetch1 = FetchRequest("test_batched_simple_producer", 0, 0, 1024) - fetch2 = FetchRequest("test_batched_simple_producer", 1, 0, 1024) + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) @@ -516,12 +542,12 @@ def test_batched_simple_producer(self): # Send 7 messages and wait for 20 seconds msgs = ["message-%d" % i for i in range(10, 15)] - resp = producer.send_messages(*msgs) + resp = producer.send_messages(self.topic, *msgs) msgs = ["message-%d" % i for i in range(15, 17)] - resp = producer.send_messages(*msgs) + resp = producer.send_messages(self.topic, *msgs) - fetch1 = FetchRequest("test_batched_simple_producer", 0, 5, 1024) - fetch2 = FetchRequest("test_batched_simple_producer", 1, 5, 1024) + fetch1 = FetchRequest(self.topic, 0, 5, 1024) + fetch2 = FetchRequest(self.topic, 1, 5, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) @@ -533,8 +559,8 @@ def test_batched_simple_producer(self): # Give it some time time.sleep(22) - fetch1 = FetchRequest("test_batched_simple_producer", 0, 5, 1024) - fetch2 = FetchRequest("test_batched_simple_producer", 1, 5, 1024) + fetch1 = FetchRequest(self.topic, 0, 5, 1024) + fetch2 = FetchRequest(self.topic, 1, 5, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) @@ -546,13 +572,13 @@ def test_batched_simple_producer(self): producer.stop() -class TestConsumer(unittest.TestCase): +class TestConsumer(KafkaTestCase): @classmethod - def setUpClass(cls): # noqa + def setUpClass(cls): cls.zk = ZookeeperFixture.instance() cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port) - cls.client = KafkaClient(cls.server2.host, cls.server2.port) + cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port)) @classmethod def tearDownClass(cls): # noqa @@ -563,7 +589,7 @@ def tearDownClass(cls): # noqa def test_simple_consumer(self): # Produce 100 messages to partition 0 - produce1 = ProduceRequest("test_simple_consumer", 0, messages=[ + produce1 = ProduceRequest(self.topic, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(100) ]) @@ -572,7 +598,7 @@ def test_simple_consumer(self): self.assertEquals(resp.offset, 0) # Produce 100 messages to partition 1 - produce2 = ProduceRequest("test_simple_consumer", 1, messages=[ + produce2 = ProduceRequest(self.topic, 1, messages=[ create_message("Test message 1 %d" % i) for i in range(100) ]) @@ -581,7 +607,9 @@ def test_simple_consumer(self): self.assertEquals(resp.offset, 0) # Start a consumer - consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer") + consumer = SimpleConsumer(self.client, "group1", + 
self.topic, auto_commit=False, + iter_timeout=0) all_messages = [] for message in consumer: all_messages.append(message) @@ -604,6 +632,13 @@ def test_simple_consumer(self): self.assertEquals(len(all_messages), 13) + consumer.stop() + + def test_simple_consumer_blocking(self): + consumer = SimpleConsumer(self.client, "group1", + self.topic, + auto_commit=False, iter_timeout=0) + # Blocking API start = datetime.now() messages = consumer.get_messages(block=True, timeout=5) @@ -612,13 +647,13 @@ def test_simple_consumer(self): self.assertEqual(len(messages), 0) # Send 10 messages - produce = ProduceRequest("test_simple_consumer", 0, messages=[ + produce = ProduceRequest(self.topic, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) for resp in self.client.send_produce_request([produce]): self.assertEquals(resp.error, 0) - self.assertEquals(resp.offset, 100) + self.assertEquals(resp.offset, 0) # Fetch 5 messages messages = consumer.get_messages(count=5, block=True, timeout=5) @@ -636,21 +671,22 @@ def test_simple_consumer(self): def test_simple_consumer_pending(self): # Produce 10 messages to partition 0 and 1 - produce1 = ProduceRequest("test_simple_pending", 0, messages=[ + produce1 = ProduceRequest(self.topic, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) for resp in self.client.send_produce_request([produce1]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - produce2 = ProduceRequest("test_simple_pending", 1, messages=[ + produce2 = ProduceRequest(self.topic, 1, messages=[ create_message("Test message 1 %d" % i) for i in range(10) ]) for resp in self.client.send_produce_request([produce2]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - consumer = SimpleConsumer(self.client, "group1", "test_simple_pending") + consumer = SimpleConsumer(self.client, "group1", self.topic, + auto_commit=False, iter_timeout=0) self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10) @@ -658,7 +694,7 @@ def test_simple_consumer_pending(self): def test_multi_process_consumer(self): # Produce 100 messages to partition 0 - produce1 = ProduceRequest("test_mpconsumer", 0, messages=[ + produce1 = ProduceRequest(self.topic, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(100) ]) @@ -667,7 +703,7 @@ def test_multi_process_consumer(self): self.assertEquals(resp.offset, 0) # Produce 100 messages to partition 1 - produce2 = ProduceRequest("test_mpconsumer", 1, messages=[ + produce2 = ProduceRequest(self.topic, 1, messages=[ create_message("Test message 1 %d" % i) for i in range(100) ]) @@ -676,7 +712,7 @@ def test_multi_process_consumer(self): self.assertEquals(resp.offset, 0) # Start a consumer - consumer = MultiProcessConsumer(self.client, "grp1", "test_mpconsumer") + consumer = MultiProcessConsumer(self.client, "grp1", self.topic, auto_commit=False) all_messages = [] for message in consumer: all_messages.append(message) @@ -689,11 +725,11 @@ def test_multi_process_consumer(self): start = datetime.now() messages = consumer.get_messages(block=True, timeout=5) diff = (datetime.now() - start).total_seconds() - self.assertGreaterEqual(diff, 5) + self.assertGreaterEqual(diff, 4.999) self.assertEqual(len(messages), 0) # Send 10 messages - produce = ProduceRequest("test_mpconsumer", 0, messages=[ + produce = ProduceRequest(self.topic, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) @@ 
-716,7 +752,7 @@ def test_multi_process_consumer(self): def test_multi_proc_pending(self): # Produce 10 messages to partition 0 and 1 - produce1 = ProduceRequest("test_mppending", 0, messages=[ + produce1 = ProduceRequest(self.topic, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) @@ -724,7 +760,7 @@ def test_multi_proc_pending(self): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - produce2 = ProduceRequest("test_mppending", 1, messages=[ + produce2 = ProduceRequest(self.topic, 1, messages=[ create_message("Test message 1 %d" % i) for i in range(10) ]) @@ -732,7 +768,7 @@ def test_multi_proc_pending(self): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - consumer = MultiProcessConsumer(self.client, "group1", "test_mppending") + consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False) self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10) @@ -742,30 +778,158 @@ def test_multi_proc_pending(self): def test_large_messages(self): # Produce 10 "normal" size messages messages1 = [create_message(random_string(1024)) for i in range(10)] - produce1 = ProduceRequest("test_large_messages", 0, messages1) + produce1 = ProduceRequest(self.topic, 0, messages1) for resp in self.client.send_produce_request([produce1]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - # Produce 10 messages that are too large (bigger than default fetch size) - messages2=[create_message(random_string(5000)) for i in range(10)] - produce2 = ProduceRequest("test_large_messages", 0, messages2) + # Produce 10 messages that are large (bigger than default fetch size) + messages2 = [create_message(random_string(5000)) for i in range(10)] + produce2 = ProduceRequest(self.topic, 0, messages2) for resp in self.client.send_produce_request([produce2]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 10) # Consumer should still get all of them - consumer = SimpleConsumer(self.client, "group1", "test_large_messages") + consumer = SimpleConsumer(self.client, "group1", self.topic, + auto_commit=False, iter_timeout=0) all_messages = messages1 + messages2 for i, message in enumerate(consumer): self.assertEquals(all_messages[i], message.message) self.assertEquals(i, 19) -def random_string(l): - s = "".join(random.choice(string.letters) for i in xrange(l)) - return s + # Produce 1 message that is too large (bigger than max fetch size) + big_message_size = MAX_FETCH_BUFFER_SIZE_BYTES + 10 + big_message = create_message(random_string(big_message_size)) + produce3 = ProduceRequest(self.topic, 0, [big_message]) + for resp in self.client.send_produce_request([produce3]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 20) + + self.assertRaises(ConsumerFetchSizeTooSmall, consumer.get_message, False, 0.1) + + # Create a consumer with no fetch size limit + big_consumer = SimpleConsumer(self.client, "group1", self.topic, + max_buffer_size=None, partitions=[0], + auto_commit=False, iter_timeout=0) + + # Seek to the last message + big_consumer.seek(-1, 2) + + # Consume giant message successfully + message = big_consumer.get_message(block=False, timeout=10) + self.assertIsNotNone(message) + self.assertEquals(message.message.value, big_message.value) + + +class TestFailover(KafkaTestCase): + + @classmethod + def setUpClass(cls): # noqa + zk_chroot = random_string(10) + replicas = 2 + partitions = 2 + + # mini 
zookeeper, 2 kafka brokers + cls.zk = ZookeeperFixture.instance() + kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions] + cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] + + hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers] + cls.client = KafkaClient(hosts) + + @classmethod + def tearDownClass(cls): + cls.client.close() + for broker in cls.brokers: + broker.close() + cls.zk.close() + + def test_switch_leader(self): + key, topic, partition = random_string(5), self.topic, 0 + producer = SimpleProducer(self.client) + + for i in range(1, 4): + + # XXX unfortunately, the conns dict needs to be warmed for this to work + # XXX unfortunately, for warming to work, we need at least as many partitions as brokers + self._send_random_messages(producer, self.topic, 10) + + # kil leader for partition 0 + broker = self._kill_leader(topic, partition) + + # expect failure, reload meta data + with self.assertRaises(FailedPayloadsError): + producer.send_messages(self.topic, 'part 1') + producer.send_messages(self.topic, 'part 2') + time.sleep(1) + + # send to new leader + self._send_random_messages(producer, self.topic, 10) + + broker.open() + time.sleep(3) + + # count number of messages + count = self._count_messages('test_switch_leader group %s' % i, topic) + self.assertIn(count, range(20 * i, 22 * i + 1)) + + producer.stop() + + def test_switch_leader_async(self): + key, topic, partition = random_string(5), self.topic, 0 + producer = SimpleProducer(self.client, async=True) + + for i in range(1, 4): + + self._send_random_messages(producer, self.topic, 10) + + # kil leader for partition 0 + broker = self._kill_leader(topic, partition) + + # expect failure, reload meta data + producer.send_messages(self.topic, 'part 1') + producer.send_messages(self.topic, 'part 2') + time.sleep(1) + + # send to new leader + self._send_random_messages(producer, self.topic, 10) + + broker.open() + time.sleep(3) + + # count number of messages + count = self._count_messages('test_switch_leader_async group %s' % i, topic) + self.assertIn(count, range(20 * i, 22 * i + 1)) + + producer.stop() + + def _send_random_messages(self, producer, topic, n): + for j in range(n): + resp = producer.send_messages(topic, random_string(10)) + if len(resp) > 0: + self.assertEquals(resp[0].error, 0) + time.sleep(1) # give it some time + + def _kill_leader(self, topic, partition): + leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)] + broker = self.brokers[leader.nodeId] + broker.close() + time.sleep(1) # give it some time + return broker + + def _count_messages(self, group, topic): + hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port) + client = KafkaClient(hosts) + consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0) + all_messages = [] + for message in consumer: + all_messages.append(message) + consumer.stop() + client.close() + return len(all_messages) if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG) diff --git a/test/test_unit.py b/test/test_unit.py index c796c949f..8c0dd004f 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -3,12 +3,23 @@ import struct import unittest -from kafka.client import KafkaClient, ProduceRequest, FetchRequest +from mock import MagicMock, patch + +from kafka import KafkaClient +from kafka.common import ( + ProduceRequest, FetchRequest, Message, ChecksumError, + ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, + OffsetAndMessage, BrokerMetadata, PartitionMetadata, + 
TopicAndPartition, KafkaUnavailableError, + LeaderUnavailableError, PartitionUnavailableError +) from kafka.codec import ( - has_gzip, has_snappy, - gzip_encode, gzip_decode, + has_gzip, has_snappy, gzip_encode, gzip_decode, snappy_encode, snappy_decode ) +from kafka.protocol import ( + create_gzip_message, create_message, create_snappy_message, KafkaProtocol +) ITERATIONS = 1000 STRLEN = 100 @@ -19,16 +30,13 @@ def random_string(): class TestPackage(unittest.TestCase): - @unittest.expectedFailure + def test_top_level_namespace(self): import kafka as kafka1 self.assertEquals(kafka1.KafkaClient.__name__, "KafkaClient") - self.assertEquals(kafka1.gzip_encode.__name__, "gzip_encode") - self.assertEquals(kafka1.snappy_encode.__name__, "snappy_encode") self.assertEquals(kafka1.client.__name__, "kafka.client") self.assertEquals(kafka1.codec.__name__, "kafka.codec") - @unittest.expectedFailure def test_submodule_namespace(self): import kafka.client as client1 self.assertEquals(client1.__name__, "kafka.client") @@ -47,175 +55,620 @@ def test_submodule_namespace(self): from kafka import KafkaClient as KafkaClient2 self.assertEquals(KafkaClient2.__name__, "KafkaClient") - from kafka import gzip_encode as gzip_encode2 - self.assertEquals(gzip_encode2.__name__, "gzip_encode") - - from kafka import snappy_encode as snappy_encode2 - self.assertEquals(snappy_encode2.__name__, "snappy_encode") - - -class TestMisc(unittest.TestCase): - @unittest.expectedFailure - def test_length_prefix(self): - for i in xrange(ITERATIONS): - s1 = random_string() - s2 = length_prefix_message(s1) - self.assertEquals(struct.unpack('>i', s2[0:4])[0], len(s1)) + from kafka.codec import snappy_encode + self.assertEquals(snappy_encode.__name__, "snappy_encode") class TestCodec(unittest.TestCase): + + @unittest.skipUnless(has_gzip(), "Gzip not available") def test_gzip(self): - if not has_gzip(): - return for i in xrange(ITERATIONS): s1 = random_string() s2 = gzip_decode(gzip_encode(s1)) self.assertEquals(s1, s2) + @unittest.skipUnless(has_snappy(), "Snappy not available") def test_snappy(self): - if not has_snappy(): - return for i in xrange(ITERATIONS): s1 = random_string() s2 = snappy_decode(snappy_encode(s1)) self.assertEquals(s1, s2) - -# XXX(sandello): These really should be protocol tests. 
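With the top-level re-exports gone, the codec helpers are imported from `kafka.codec` directly. A quick round trip in the same spirit as TestCodec above, gated on the optional dependencies the same way the `skipUnless` decorators are:

```python
# Round-trip the codec helpers as TestCodec does; snappy support is optional,
# so it is checked with has_snappy() before use.
from kafka.codec import has_gzip, has_snappy, gzip_encode, gzip_decode

if has_gzip():
    assert gzip_decode(gzip_encode("some payload")) == "some payload"

if has_snappy():
    from kafka.codec import snappy_encode, snappy_decode
    assert snappy_decode(snappy_encode("some payload")) == "some payload"
```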
-class TestMessage(unittest.TestCase): - @unittest.expectedFailure - def test_create(self): - msg = KafkaClient.create_message("testing") - self.assertEquals(msg.payload, "testing") - self.assertEquals(msg.magic, 1) - self.assertEquals(msg.attributes, 0) - self.assertEquals(msg.crc, -386704890) - - @unittest.expectedFailure + @unittest.skipUnless(has_snappy(), "Snappy not available") + def test_snappy_detect_xerial(self): + import kafka as kafka1 + _detect_xerial_stream = kafka1.codec._detect_xerial_stream + + header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes' + false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01' + random_snappy = snappy_encode('SNAPPY' * 50) + short_data = b'\x01\x02\x03\x04' + + self.assertTrue(_detect_xerial_stream(header)) + self.assertFalse(_detect_xerial_stream(b'')) + self.assertFalse(_detect_xerial_stream(b'\x00')) + self.assertFalse(_detect_xerial_stream(false_header)) + self.assertFalse(_detect_xerial_stream(random_snappy)) + self.assertFalse(_detect_xerial_stream(short_data)) + + @unittest.skipUnless(has_snappy(), "Snappy not available") + def test_snappy_decode_xerial(self): + header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + random_snappy = snappy_encode('SNAPPY' * 50) + block_len = len(random_snappy) + random_snappy2 = snappy_encode('XERIAL' * 50) + block_len2 = len(random_snappy2) + + to_test = header \ + + struct.pack('!i', block_len) + random_snappy \ + + struct.pack('!i', block_len2) + random_snappy2 \ + + self.assertEquals(snappy_decode(to_test), ('SNAPPY' * 50) + ('XERIAL' * 50)) + + @unittest.skipUnless(has_snappy(), "Snappy not available") + def test_snappy_encode_xerial(self): + to_ensure = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + \ + '\x00\x00\x00\x18' + \ + '\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' + \ + '\x00\x00\x00\x18' + \ + '\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' + + to_test = ('SNAPPY' * 50) + ('XERIAL' * 50) + + compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300) + self.assertEquals(compressed, to_ensure) + +class TestProtocol(unittest.TestCase): + + def test_create_message(self): + payload = "test" + key = "key" + msg = create_message(payload, key) + self.assertEqual(msg.magic, 0) + self.assertEqual(msg.attributes, 0) + self.assertEqual(msg.key, key) + self.assertEqual(msg.value, payload) + + @unittest.skipUnless(has_gzip(), "Snappy not available") def test_create_gzip(self): - msg = KafkaClient.create_gzip_message("testing") - self.assertEquals(msg.magic, 1) - self.assertEquals(msg.attributes, 1) - # Can't check the crc or payload for gzip since it's non-deterministic - (messages, _) = KafkaClient.read_message_set(gzip_decode(msg.payload)) - inner = messages[0] - self.assertEquals(inner.magic, 1) - self.assertEquals(inner.attributes, 0) - self.assertEquals(inner.payload, "testing") - self.assertEquals(inner.crc, -386704890) - - @unittest.expectedFailure + payloads = ["v1", "v2"] + msg = create_gzip_message(payloads) + self.assertEqual(msg.magic, 0) + self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK & + KafkaProtocol.CODEC_GZIP) + self.assertEqual(msg.key, None) + # Need to decode to check since gzipped payload is non-deterministic + decoded = gzip_decode(msg.value) + expect = ("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10L\x9f[\xc2" + "\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v1\x00\x00\x00\x00" + 
"\x00\x00\x00\x00\x00\x00\x00\x10\xd5\x96\nx\x00\x00\xff\xff" + "\xff\xff\x00\x00\x00\x02v2") + self.assertEqual(decoded, expect) + + @unittest.skipUnless(has_snappy(), "Snappy not available") def test_create_snappy(self): - msg = KafkaClient.create_snappy_message("testing") - self.assertEquals(msg.magic, 1) - self.assertEquals(msg.attributes, 2) - self.assertEquals(msg.crc, -62350868) - (messages, _) = KafkaClient.read_message_set(snappy_decode(msg.payload)) - inner = messages[0] - self.assertEquals(inner.magic, 1) - self.assertEquals(inner.attributes, 0) - self.assertEquals(inner.payload, "testing") - self.assertEquals(inner.crc, -386704890) - - @unittest.expectedFailure - def test_message_simple(self): - msg = KafkaClient.create_message("testing") - enc = KafkaClient.encode_message(msg) - expect = "\x00\x00\x00\r\x01\x00\xe8\xf3Z\x06testing" - self.assertEquals(enc, expect) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), 1) - self.assertEquals(messages[0], msg) - - @unittest.expectedFailure - def test_message_list(self): - msgs = [ - KafkaClient.create_message("one"), - KafkaClient.create_message("two"), - KafkaClient.create_message("three") - ] - enc = KafkaClient.encode_message_set(msgs) - expect = ("\x00\x00\x00\t\x01\x00zl\x86\xf1one\x00\x00\x00\t\x01\x00\x11" - "\xca\x8aftwo\x00\x00\x00\x0b\x01\x00F\xc5\xd8\xf5three") - self.assertEquals(enc, expect) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), 3) - self.assertEquals(messages[0].payload, "one") - self.assertEquals(messages[1].payload, "two") - self.assertEquals(messages[2].payload, "three") - - @unittest.expectedFailure - def test_message_gzip(self): - msg = KafkaClient.create_gzip_message("one", "two", "three") - enc = KafkaClient.encode_message(msg) - # Can't check the bytes directly since Gzip is non-deterministic - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), 3) - self.assertEquals(messages[0].payload, "one") - self.assertEquals(messages[1].payload, "two") - self.assertEquals(messages[2].payload, "three") - - @unittest.expectedFailure - def test_message_snappy(self): - msg = KafkaClient.create_snappy_message("one", "two", "three") - enc = KafkaClient.encode_message(msg) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), 3) - self.assertEquals(messages[0].payload, "one") - self.assertEquals(messages[1].payload, "two") - self.assertEquals(messages[2].payload, "three") - - @unittest.expectedFailure - def test_message_simple_random(self): - for i in xrange(ITERATIONS): - n = random.randint(0, 10) - msgs = [KafkaClient.create_message(random_string()) for j in range(n)] - enc = KafkaClient.encode_message_set(msgs) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), n) - for j in range(n): - self.assertEquals(messages[j], msgs[j]) - - @unittest.expectedFailure - def test_message_gzip_random(self): - for i in xrange(ITERATIONS): - n = random.randint(1, 10) - strings = [random_string() for j in range(n)] - msg = KafkaClient.create_gzip_message(*strings) - enc = KafkaClient.encode_message(msg) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), n) - for j in range(n): - self.assertEquals(messages[j].payload, strings[j]) - - @unittest.expectedFailure - def test_message_snappy_random(self): - for i in xrange(ITERATIONS): - n = random.randint(1, 10) - strings = [random_string() for j in range(n)] - 
msg = KafkaClient.create_snappy_message(*strings) - enc = KafkaClient.encode_message(msg) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), n) - for j in range(n): - self.assertEquals(messages[j].payload, strings[j]) - - -class TestRequests(unittest.TestCase): - @unittest.expectedFailure - def test_produce_request(self): - req = ProduceRequest("my-topic", 0, [KafkaClient.create_message("testing")]) - enc = KafkaClient.encode_produce_request(req) - expect = "\x00\x00\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00\x11\x00\x00\x00\r\x01\x00\xe8\xf3Z\x06testing" - self.assertEquals(enc, expect) - - @unittest.expectedFailure - def test_fetch_request(self): - req = FetchRequest("my-topic", 0, 0, 1024) - enc = KafkaClient.encode_fetch_request(req) - expect = "\x00\x01\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00" - self.assertEquals(enc, expect) + payloads = ["v1", "v2"] + msg = create_snappy_message(payloads) + self.assertEqual(msg.magic, 0) + self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK & + KafkaProtocol.CODEC_SNAPPY) + self.assertEqual(msg.key, None) + expect = ("8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff\xff\xff\xff" + "\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5\x96\nx\x00\x00\xff" + "\xff\xff\xff\x00\x00\x00\x02v2") + self.assertEqual(msg.value, expect) + + def test_encode_message_header(self): + expect = '\x00\n\x00\x00\x00\x00\x00\x04\x00\x07client3' + encoded = KafkaProtocol._encode_message_header("client3", 4, 10) + self.assertEqual(encoded, expect) + + def test_encode_message(self): + message = create_message("test", "key") + encoded = KafkaProtocol._encode_message(message) + expect = "\xaa\xf1\x8f[\x00\x00\x00\x00\x00\x03key\x00\x00\x00\x04test" + self.assertEqual(encoded, expect) + + def test_encode_message_failure(self): + self.assertRaises(Exception, KafkaProtocol._encode_message, + Message(1, 0, "key", "test")) + + def test_encode_message_set(self): + message_set = [create_message("v1", "k1"), create_message("v2", "k2")] + encoded = KafkaProtocol._encode_message_set(message_set) + expect = ("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x12W\xe7In\x00" + "\x00\x00\x00\x00\x02k1\x00\x00\x00\x02v1\x00\x00\x00\x00" + "\x00\x00\x00\x00\x00\x00\x00\x12\xff\x06\x02I\x00\x00\x00" + "\x00\x00\x02k2\x00\x00\x00\x02v2") + self.assertEqual(encoded, expect) + + def test_decode_message(self): + encoded = "\xaa\xf1\x8f[\x00\x00\x00\x00\x00\x03key\x00\x00\x00\x04test" + offset = 10 + (returned_offset, decoded_message) = \ + list(KafkaProtocol._decode_message(encoded, offset))[0] + self.assertEqual(returned_offset, offset) + self.assertEqual(decoded_message, create_message("test", "key")) + + def test_decode_message_set(self): + encoded = ('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10L\x9f[\xc2' + '\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v1\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x10\xd5\x96\nx\x00\x00\xff' + '\xff\xff\xff\x00\x00\x00\x02v2') + iter = KafkaProtocol._decode_message_set_iter(encoded) + decoded = list(iter) + self.assertEqual(len(decoded), 2) + (returned_offset1, decoded_message1) = decoded[0] + self.assertEqual(returned_offset1, 0) + self.assertEqual(decoded_message1, create_message("v1")) + (returned_offset2, decoded_message2) = decoded[1] + self.assertEqual(returned_offset2, 0) + self.assertEqual(decoded_message2, create_message("v2")) + + @unittest.skipUnless(has_gzip(), "Gzip not available") + def test_decode_message_gzip(self): + gzip_encoded = 
('\xc0\x11\xb2\xf0\x00\x01\xff\xff\xff\xff\x00\x00\x000' + '\x1f\x8b\x08\x00\xa1\xc1\xc5R\x02\xffc`\x80\x03\x01' + '\x9f\xf9\xd1\x87\x18\x18\xfe\x03\x01\x90\xc7Tf\xc8' + '\x80$wu\x1aW\x05\x92\x9c\x11\x00z\xc0h\x888\x00\x00' + '\x00') + offset = 11 + decoded = list(KafkaProtocol._decode_message(gzip_encoded, offset)) + self.assertEqual(len(decoded), 2) + (returned_offset1, decoded_message1) = decoded[0] + self.assertEqual(returned_offset1, 0) + self.assertEqual(decoded_message1, create_message("v1")) + (returned_offset2, decoded_message2) = decoded[1] + self.assertEqual(returned_offset2, 0) + self.assertEqual(decoded_message2, create_message("v2")) + + @unittest.skipUnless(has_snappy(), "Snappy not available") + def test_decode_message_snappy(self): + snappy_encoded = ('\xec\x80\xa1\x95\x00\x02\xff\xff\xff\xff\x00\x00' + '\x00,8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff' + '\xff\xff\xff\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5' + '\x96\nx\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v2') + offset = 11 + decoded = list(KafkaProtocol._decode_message(snappy_encoded, offset)) + self.assertEqual(len(decoded), 2) + (returned_offset1, decoded_message1) = decoded[0] + self.assertEqual(returned_offset1, 0) + self.assertEqual(decoded_message1, create_message("v1")) + (returned_offset2, decoded_message2) = decoded[1] + self.assertEqual(returned_offset2, 0) + self.assertEqual(decoded_message2, create_message("v2")) + + def test_decode_message_checksum_error(self): + invalid_encoded_message = "This is not a valid encoded message" + iter = KafkaProtocol._decode_message(invalid_encoded_message, 0) + self.assertRaises(ChecksumError, list, iter) + + # NOTE: The error handling in _decode_message_set_iter() is questionable. + # If it's modified, the next two tests might need to be fixed. 
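+    # The two tests below document the current behaviour: a buffer too small to
+    # hold even a message header raises ConsumerFetchSizeTooSmall, while trailing
+    # partial/garbage bytes after valid messages are silently dropped (iteration
+    # simply stops). A caller would typically recover from the first case roughly
+    # as follows (illustrative sketch only, not the library's exact retry logic;
+    # `fetch_size` is a hypothetical name):
+    #
+    #     try:
+    #         messages = list(KafkaProtocol._decode_message_set_iter(buf))
+    #     except ConsumerFetchSizeTooSmall:
+    #         fetch_size *= 2  # retry the fetch with a larger buffer
+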
+ def test_decode_message_set_fetch_size_too_small(self): + iter = KafkaProtocol._decode_message_set_iter('a') + self.assertRaises(ConsumerFetchSizeTooSmall, list, iter) + + def test_decode_message_set_stop_iteration(self): + encoded = ('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10L\x9f[\xc2' + '\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v1\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x10\xd5\x96\nx\x00\x00\xff' + '\xff\xff\xff\x00\x00\x00\x02v2') + iter = KafkaProtocol._decode_message_set_iter(encoded + "@#$%(Y!") + decoded = list(iter) + self.assertEqual(len(decoded), 2) + (returned_offset1, decoded_message1) = decoded[0] + self.assertEqual(returned_offset1, 0) + self.assertEqual(decoded_message1, create_message("v1")) + (returned_offset2, decoded_message2) = decoded[1] + self.assertEqual(returned_offset2, 0) + self.assertEqual(decoded_message2, create_message("v2")) + + def test_encode_produce_request(self): + requests = [ProduceRequest("topic1", 0, [create_message("a"), + create_message("b")]), + ProduceRequest("topic2", 1, [create_message("c")])] + expect = ('\x00\x00\x00\x94\x00\x00\x00\x00\x00\x00\x00\x02\x00\x07' + 'client1\x00\x02\x00\x00\x00d\x00\x00\x00\x02\x00\x06topic1' + '\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x0fQ\xdf:2\x00\x00\xff\xff' + '\xff\xff\x00\x00\x00\x01a\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x0f\xc8\xd6k\x88\x00\x00\xff\xff\xff\xff\x00' + '\x00\x00\x01b\x00\x06topic2\x00\x00\x00\x01\x00\x00\x00\x01' + '\x00\x00\x00\x1b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x0f\xbf\xd1[\x1e\x00\x00\xff\xff\xff\xff\x00\x00\x00' + '\x01c') + encoded = KafkaProtocol.encode_produce_request("client1", 2, requests, + 2, 100) + self.assertEqual(encoded, expect) + + def test_decode_produce_response(self): + t1 = "topic1" + t2 = "topic2" + encoded = struct.pack('>iih%dsiihqihqh%dsiihq' % (len(t1), len(t2)), + 2, 2, len(t1), t1, 2, 0, 0, 10L, 1, 1, 20L, + len(t2), t2, 1, 0, 0, 30L) + responses = list(KafkaProtocol.decode_produce_response(encoded)) + self.assertEqual(responses, + [ProduceResponse(t1, 0, 0, 10L), + ProduceResponse(t1, 1, 1, 20L), + ProduceResponse(t2, 0, 0, 30L)]) + + def test_encode_fetch_request(self): + requests = [FetchRequest("topic1", 0, 10, 1024), + FetchRequest("topic2", 1, 20, 100)] + expect = ('\x00\x00\x00Y\x00\x01\x00\x00\x00\x00\x00\x03\x00\x07' + 'client1\xff\xff\xff\xff\x00\x00\x00\x02\x00\x00\x00d\x00' + '\x00\x00\x02\x00\x06topic1\x00\x00\x00\x01\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\n\x00\x00\x04\x00\x00\x06' + 'topic2\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x00' + '\x00\x00\x14\x00\x00\x00d') + encoded = KafkaProtocol.encode_fetch_request("client1", 3, requests, 2, + 100) + self.assertEqual(encoded, expect) + + def test_decode_fetch_response(self): + t1 = "topic1" + t2 = "topic2" + msgs = map(create_message, ["message1", "hi", "boo", "foo", "so fun!"]) + ms1 = KafkaProtocol._encode_message_set([msgs[0], msgs[1]]) + ms2 = KafkaProtocol._encode_message_set([msgs[2]]) + ms3 = KafkaProtocol._encode_message_set([msgs[3], msgs[4]]) + + encoded = struct.pack('>iih%dsiihqi%dsihqi%dsh%dsiihqi%ds' % + (len(t1), len(ms1), len(ms2), len(t2), len(ms3)), + 4, 2, len(t1), t1, 2, 0, 0, 10, len(ms1), ms1, 1, + 1, 20, len(ms2), ms2, len(t2), t2, 1, 0, 0, 30, + len(ms3), ms3) + + responses = list(KafkaProtocol.decode_fetch_response(encoded)) + def expand_messages(response): + return FetchResponse(response.topic, response.partition, + response.error, response.highwaterMark, + 
list(response.messages)) + + expanded_responses = map(expand_messages, responses) + expect = [FetchResponse(t1, 0, 0, 10, [OffsetAndMessage(0, msgs[0]), + OffsetAndMessage(0, msgs[1])]), + FetchResponse(t1, 1, 1, 20, [OffsetAndMessage(0, msgs[2])]), + FetchResponse(t2, 0, 0, 30, [OffsetAndMessage(0, msgs[3]), + OffsetAndMessage(0, msgs[4])])] + self.assertEqual(expanded_responses, expect) + + def test_encode_metadata_request_no_topics(self): + encoded = KafkaProtocol.encode_metadata_request("cid", 4) + self.assertEqual(encoded, '\x00\x00\x00\x11\x00\x03\x00\x00\x00\x00' + '\x00\x04\x00\x03cid\x00\x00\x00\x00') + + def test_encode_metadata_request_with_topics(self): + encoded = KafkaProtocol.encode_metadata_request("cid", 4, ["t1", "t2"]) + self.assertEqual(encoded, '\x00\x00\x00\x19\x00\x03\x00\x00\x00\x00' + '\x00\x04\x00\x03cid\x00\x00\x00\x02\x00\x02' + 't1\x00\x02t2') + + def _create_encoded_metadata_response(self, broker_data, topic_data, + topic_errors, partition_errors): + encoded = struct.pack('>ii', 3, len(broker_data)) + for node_id, broker in broker_data.iteritems(): + encoded += struct.pack('>ih%dsi' % len(broker.host), node_id, + len(broker.host), broker.host, broker.port) + + encoded += struct.pack('>i', len(topic_data)) + for topic, partitions in topic_data.iteritems(): + encoded += struct.pack('>hh%dsi' % len(topic), topic_errors[topic], + len(topic), topic, len(partitions)) + for partition, metadata in partitions.iteritems(): + encoded += struct.pack('>hiii', + partition_errors[(topic, partition)], + partition, metadata.leader, + len(metadata.replicas)) + if len(metadata.replicas) > 0: + encoded += struct.pack('>%di' % len(metadata.replicas), + *metadata.replicas) + + encoded += struct.pack('>i', len(metadata.isr)) + if len(metadata.isr) > 0: + encoded += struct.pack('>%di' % len(metadata.isr), + *metadata.isr) + + return encoded + + def test_decode_metadata_response(self): + node_brokers = { + 0: BrokerMetadata(0, "brokers1.kafka.rdio.com", 1000), + 1: BrokerMetadata(1, "brokers1.kafka.rdio.com", 1001), + 3: BrokerMetadata(3, "brokers2.kafka.rdio.com", 1000) + } + topic_partitions = { + "topic1": { + 0: PartitionMetadata("topic1", 0, 1, (0, 2), (2,)), + 1: PartitionMetadata("topic1", 1, 3, (0, 1), (0, 1)) + }, + "topic2": { + 0: PartitionMetadata("topic2", 0, 0, (), ()) + } + } + topic_errors = {"topic1": 0, "topic2": 1} + partition_errors = { + ("topic1", 0): 0, + ("topic1", 1): 1, + ("topic2", 0): 0 + } + encoded = self._create_encoded_metadata_response(node_brokers, + topic_partitions, + topic_errors, + partition_errors) + decoded = KafkaProtocol.decode_metadata_response(encoded) + self.assertEqual(decoded, (node_brokers, topic_partitions)) + + @unittest.skip("Not Implemented") + def test_encode_offset_request(self): + pass + + @unittest.skip("Not Implemented") + def test_decode_offset_response(self): + pass + + + @unittest.skip("Not Implemented") + def test_encode_offset_commit_request(self): + pass + + @unittest.skip("Not Implemented") + def test_decode_offset_commit_response(self): + pass + + @unittest.skip("Not Implemented") + def test_encode_offset_fetch_request(self): + pass + + @unittest.skip("Not Implemented") + def test_decode_offset_fetch_response(self): + pass + + +class TestKafkaClient(unittest.TestCase): + + def test_init_with_list(self): + + with patch.object(KafkaClient, 'load_metadata_for_topics'): + client = KafkaClient( + hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092']) + + self.assertItemsEqual( + [('kafka01', 9092), ('kafka02', 9092), 
('kafka03', 9092)], + client.hosts) + + def test_init_with_csv(self): + + with patch.object(KafkaClient, 'load_metadata_for_topics'): + client = KafkaClient( + hosts='kafka01:9092,kafka02:9092,kafka03:9092') + + self.assertItemsEqual( + [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)], + client.hosts) + + def test_init_with_unicode_csv(self): + + with patch.object(KafkaClient, 'load_metadata_for_topics'): + client = KafkaClient( + hosts=u'kafka01:9092,kafka02:9092,kafka03:9092') + + self.assertItemsEqual( + [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)], + client.hosts) + + def test_send_broker_unaware_request_fail(self): + 'Tests that call fails when all hosts are unavailable' + + mocked_conns = { + ('kafka01', 9092): MagicMock(), + ('kafka02', 9092): MagicMock() + } + # inject KafkaConnection side effects + mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)") + mocked_conns[('kafka02', 9092)].send.side_effect = RuntimeError("Kafka02 went away (unittest)") + + def mock_get_conn(host, port): + return mocked_conns[(host, port)] + + # patch to avoid making requests before we want it + with patch.object(KafkaClient, 'load_metadata_for_topics'): + with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn): + client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092']) + + self.assertRaises( + KafkaUnavailableError, + client._send_broker_unaware_request, + 1, 'fake request') + + for key, conn in mocked_conns.iteritems(): + conn.send.assert_called_with(1, 'fake request') + + def test_send_broker_unaware_request(self): + 'Tests that call works when at least one of the host is available' + + mocked_conns = { + ('kafka01', 9092): MagicMock(), + ('kafka02', 9092): MagicMock(), + ('kafka03', 9092): MagicMock() + } + # inject KafkaConnection side effects + mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)") + mocked_conns[('kafka02', 9092)].recv.return_value = 'valid response' + mocked_conns[('kafka03', 9092)].send.side_effect = RuntimeError("kafka03 went away (unittest)") + def mock_get_conn(host, port): + return mocked_conns[(host, port)] + + # patch to avoid making requests before we want it + with patch.object(KafkaClient, 'load_metadata_for_topics'): + with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn): + client = KafkaClient(hosts='kafka01:9092,kafka02:9092') + + resp = client._send_broker_unaware_request(1, 'fake request') + + self.assertEqual('valid response', resp) + mocked_conns[('kafka02', 9092)].recv.assert_called_with(1) + + @patch('kafka.client.KafkaConnection') + @patch('kafka.client.KafkaProtocol') + def test_load_metadata(self, protocol, conn): + "Load metadata for all topics" + + conn.recv.return_value = 'response' # anything but None + + brokers = {} + brokers[0] = BrokerMetadata(1, 'broker_1', 4567) + brokers[1] = BrokerMetadata(2, 'broker_2', 5678) + + topics = {} + topics['topic_1'] = { + 0: PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2]) + } + topics['topic_noleader'] = { + 0: PartitionMetadata('topic_noleader', 0, -1, [], []), + 1: PartitionMetadata('topic_noleader', 1, -1, [], []) + } + topics['topic_no_partitions'] = {} + topics['topic_3'] = { + 0: PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1]), + 1: PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0]), + 2: PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1]) + } + protocol.decode_metadata_response.return_value = (brokers, topics) + + # client loads metadata at init + client = 
KafkaClient(hosts=['broker_1:4567'])
+        self.assertDictEqual({
+            TopicAndPartition('topic_1', 0): brokers[1],
+            TopicAndPartition('topic_noleader', 0): None,
+            TopicAndPartition('topic_noleader', 1): None,
+            TopicAndPartition('topic_3', 0): brokers[0],
+            TopicAndPartition('topic_3', 1): brokers[1],
+            TopicAndPartition('topic_3', 2): brokers[0]},
+            client.topics_to_brokers)
+
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_get_leader_for_partitions_reloads_metadata(self, protocol, conn):
+        "Get leader for partitions reloads metadata if it is not available"
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = {}
+        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
+        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+
+        topics = {'topic_no_partitions': {}}
+        protocol.decode_metadata_response.return_value = (brokers, topics)
+
+        client = KafkaClient(hosts=['broker_1:4567'])
+
+        # topic metadata is loaded but empty
+        self.assertDictEqual({}, client.topics_to_brokers)
+
+        topics['topic_no_partitions'] = {
+            0: PartitionMetadata('topic_no_partitions', 0, 0, [0, 1], [0, 1])
+        }
+        protocol.decode_metadata_response.return_value = (brokers, topics)
+
+        # calling _get_leader_for_partition (from any broker aware request)
+        # will try loading metadata again for the same topic
+        leader = client._get_leader_for_partition('topic_no_partitions', 0)
+
+        self.assertEqual(brokers[0], leader)
+        self.assertDictEqual({
+            TopicAndPartition('topic_no_partitions', 0): brokers[0]},
+            client.topics_to_brokers)
+
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_get_leader_for_unassigned_partitions(self, protocol, conn):
+        "Get leader raises if no partitions are defined for a topic"
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = {}
+        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
+        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+
+        topics = {'topic_no_partitions': {}}
+        protocol.decode_metadata_response.return_value = (brokers, topics)
+
+        client = KafkaClient(hosts=['broker_1:4567'])
+
+        self.assertDictEqual({}, client.topics_to_brokers)
+        self.assertRaises(
+            PartitionUnavailableError,
+            client._get_leader_for_partition,
+            'topic_no_partitions', 0)
+
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_get_leader_returns_none_when_noleader(self, protocol, conn):
+        "Getting leader for partitions returns None when the partition has no leader"
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = {}
+        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
+        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+
+        topics = {}
+        topics['topic_noleader'] = {
+            0: PartitionMetadata('topic_noleader', 0, -1, [], []),
+            1: PartitionMetadata('topic_noleader', 1, -1, [], [])
+        }
+        protocol.decode_metadata_response.return_value = (brokers, topics)
+
+        client = KafkaClient(hosts=['broker_1:4567'])
+        self.assertDictEqual(
+            {
+                TopicAndPartition('topic_noleader', 0): None,
+                TopicAndPartition('topic_noleader', 1): None
+            },
+            client.topics_to_brokers)
+        self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0))
+        self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1))
+
+        topics['topic_noleader'] = {
+            0: PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1]),
+            1: PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0])
+        }
+        protocol.decode_metadata_response.return_value = (brokers, topics)
+        self.assertEqual(brokers[0], 
client._get_leader_for_partition('topic_noleader', 0)) + self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1)) + + @patch('kafka.client.KafkaConnection') + @patch('kafka.client.KafkaProtocol') + def test_send_produce_request_raises_when_noleader(self, protocol, conn): + "Send producer request raises LeaderUnavailableError if leader is not available" + + conn.recv.return_value = 'response' # anything but None + + brokers = {} + brokers[0] = BrokerMetadata(0, 'broker_1', 4567) + brokers[1] = BrokerMetadata(1, 'broker_2', 5678) + + topics = {} + topics['topic_noleader'] = { + 0: PartitionMetadata('topic_noleader', 0, -1, [], []), + 1: PartitionMetadata('topic_noleader', 1, -1, [], []) + } + protocol.decode_metadata_response.return_value = (brokers, topics) + + client = KafkaClient(hosts=['broker_1:4567']) + + requests = [ProduceRequest( + "topic_noleader", 0, + [create_message("a"), create_message("b")])] + + self.assertRaises( + LeaderUnavailableError, + client.send_produce_request, requests) if __name__ == '__main__': unittest.main() diff --git a/tox.ini b/tox.ini index f41911c62..0077c4d87 100644 --- a/tox.ini +++ b/tox.ini @@ -1,7 +1,9 @@ [tox] envlist = py26, py27 [testenv] -deps = pytest +deps = + pytest + mock commands = py.test --basetemp={envtmpdir} [] setenv = PROJECT_ROOT = {toxinidir}