diff --git a/kafka/client.py b/kafka/client.py
index 965cbc5f5..c0a3cdb5c 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -2,9 +2,11 @@
 from functools import partial
 from itertools import count
 import logging
+import socket
 import time

 from kafka.common import ErrorMapping, TopicAndPartition
+from kafka.common import ConnectionError, FailedPayloadsException
 from kafka.conn import KafkaConnection
 from kafka.protocol import KafkaProtocol

@@ -70,7 +72,7 @@ def _load_metadata_for_topics(self, *topics):
         log.debug("Broker metadata: %s", brokers)
         log.debug("Topic metadata: %s", topics)

-        self.brokers.update(brokers)
+        self.brokers = brokers
         self.topics_to_brokers = {}

         for topic, partitions in topics.items():
@@ -146,13 +148,15 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
         for payload in payloads:
             leader = self._get_leader_for_partition(payload.topic,
                                                     payload.partition)
-
             payloads_by_broker[leader].append(payload)
             original_keys.append((payload.topic, payload.partition))

         # Accumulate the responses in a dictionary
         acc = {}

+        # keep a list of payloads that failed to be sent to brokers
+        failed_payloads = []
+
         # For each broker, send the list of request payloads
         for broker, payloads in payloads_by_broker.items():
             conn = self._get_conn_for_broker(broker)
@@ -161,15 +165,23 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
                                  correlation_id=requestId, payloads=payloads)

             # Send the request, recv the response
-            conn.send(requestId, request)
-
-            if decoder_fn is None:
+            try:
+                conn.send(requestId, request)
+                if decoder_fn is None:
+                    continue
+                response = conn.recv(requestId)
+            except ConnectionError as e:  # ignore BufferUnderflow for now
+                log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
+                failed_payloads += payloads
+                self.topics_to_brokers = {}  # reset metadata
                 continue

-            response = conn.recv(requestId)
             for response in decoder_fn(response):
                 acc[(response.topic, response.partition)] = response

+        if failed_payloads:
+            raise FailedPayloadsException(failed_payloads)
+
         # Order the accumulated responses by the original key order
         return (acc[k] for k in original_keys) if acc else ()

diff --git a/kafka/common.py b/kafka/common.py
index 8f3154cc7..6f0dd322b 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -69,6 +69,11 @@ class ErrorMapping(object):
 #   Exceptions  #
 #################

+class FailedPayloadsException(Exception):
+    pass
+
+class ConnectionError(Exception):
+    pass

 class BufferUnderflowError(Exception):
     pass
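Taken together, the client changes collect the payloads it could not deliver, clear the topics_to_brokers cache, and raise FailedPayloadsException once the broker loop finishes. Below is a minimal sketch of how calling code might use this, assuming a connected KafkaClient; the helper name, the acks/timeout values and the single-retry policy are illustrative, while ProduceRequest, create_message and send_produce_request are the APIs already used elsewhere in this diff.

    import logging

    from kafka.common import FailedPayloadsException, ProduceRequest
    from kafka.protocol import create_message

    log = logging.getLogger("kafka")

    def produce_with_one_retry(client, topic, partition, messages):
        # Hypothetical helper, not part of this patch: `client` is assumed to
        # be a connected KafkaClient instance.
        req = ProduceRequest(topic, partition, [create_message(m) for m in messages])
        try:
            return client.send_produce_request([req], acks=1, timeout=1000)
        except FailedPayloadsException as e:
            undelivered = e.args[0]  # the payload list handed to the exception
            log.warning("Retrying %d undelivered payloads", len(undelivered))
            # topics_to_brokers was cleared by the failure, so this retry will
            # re-fetch metadata and talk to whichever broker is the leader now.
            return client.send_produce_request([req], acks=1, timeout=1000)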
diff --git a/kafka/conn.py b/kafka/conn.py
index e85fd110f..9356731a2 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -4,7 +4,7 @@
 from threading import local

 from kafka.common import BufferUnderflowError
-
+from kafka.common import ConnectionError

 log = logging.getLogger("kafka")

@@ -26,6 +26,7 @@ def __init__(self, host, port, bufsize=4096):
         self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self._sock.connect((host, port))
         self._sock.settimeout(10)
+        self._dirty = False

     def __str__(self):
         return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
@@ -53,7 +54,7 @@ def _consume_response_iter(self):
         # Read the size off of the header
         resp = self._sock.recv(4)
         if resp == "":
-            raise Exception("Got no response from Kafka")
+            self._raise_connection_error()

         (size,) = struct.unpack('>i', resp)
         messagesize = size - 4
@@ -71,6 +72,10 @@ def _consume_response_iter(self):
             total += len(resp)
             yield resp

+    def _raise_connection_error(self):
+        self._dirty = True
+        raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port))
+
     ##################
     #   Public API   #
     ##################
@@ -79,14 +84,16 @@ def _consume_response_iter(self):

     def send(self, request_id, payload):
         "Send a request to Kafka"
-
-        log.debug(
-            "About to send %d bytes to Kafka, request %d" %
-            (len(payload), request_id))
-
-        sent = self._sock.sendall(payload)
-        if sent is not None:
-            raise RuntimeError("Kafka went away")
+        log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id))
+        try:
+            if self._dirty:
+                self.reinit()
+            sent = self._sock.sendall(payload)
+            if sent is not None:
+                self._raise_connection_error()
+        except socket.error:
+            log.exception('Unable to send payload to Kafka')
+            self._raise_connection_error()

     def recv(self, request_id):
         """
@@ -110,3 +117,4 @@ def reinit(self):
         self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self._sock.connect((self.host, self.port))
         self._sock.settimeout(10)
+        self._dirty = False

diff --git a/kafka/producer.py b/kafka/producer.py
index 5f23285ab..cceb58421 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -7,6 +7,7 @@
 import sys

 from kafka.common import ProduceRequest
+from kafka.common import FailedPayloadsException
 from kafka.protocol import create_message
 from kafka.partitioner import HashedPartitioner

@@ -113,7 +114,7 @@ def _send_upstream(self, queue):
                 self.client.send_produce_request(reqs, acks=self.req_acks,
                                                  timeout=self.ack_timeout)
             except Exception:
-                log.error("Error sending message", exc_info=sys.exc_info())
+                log.exception("Unable to send message")

     def send_messages(self, partition, *msg):
         """
@@ -126,8 +127,12 @@ def send_messages(self, partition, *msg):
         else:
             messages = [create_message(m) for m in msg]
             req = ProduceRequest(self.topic, partition, messages)
-            resp = self.client.send_produce_request([req], acks=self.req_acks,
-                                                    timeout=self.ack_timeout)
+            try:
+                resp = self.client.send_produce_request([req], acks=self.req_acks,
+                                                        timeout=self.ack_timeout)
+            except Exception:
+                log.exception("Unable to send messages")
+                raise
         return resp

     def stop(self, timeout=1):
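On the connection side, a failed send now marks the socket dirty so the next send() reconnects via reinit(); on the producer side, a synchronous send_messages call re-raises the failure instead of swallowing it. A rough sketch of the application-side retry loop this enables, assuming a broker at localhost:9092; the topic name, retry count and sleep are illustrative, not part of this change.

    import time

    from kafka.client import KafkaClient
    from kafka.common import FailedPayloadsException
    from kafka.producer import SimpleProducer

    client = KafkaClient("localhost", 9092)
    producer = SimpleProducer(client, "my-topic")

    for attempt in range(3):
        try:
            producer.send_messages("hello kafka")
            break
        except FailedPayloadsException:
            # The partition leader went away; give the cluster a moment to
            # elect a new one, then let the client re-fetch metadata and retry.
            time.sleep(1)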
diff --git a/test/fixtures.py b/test/fixtures.py
index 00c1afd00..946c64ffc 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -242,7 +242,7 @@ def close(self):

 class KafkaFixture(object):
     @staticmethod
-    def instance(broker_id, zk_host, zk_port, zk_chroot=None):
+    def instance(broker_id, zk_host, zk_port, zk_chroot=None, replicas=1, partitions=2):
         if zk_chroot is None:
             zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
         if "KAFKA_URI" in os.environ:
@@ -251,11 +251,11 @@ def instance(broker_id, zk_host, zk_port, zk_chroot=None):
             fixture = ExternalService(host, port)
         else:
             (host, port) = ("127.0.0.1", get_open_port())
-            fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot)
+            fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot, replicas, partitions)
         fixture.open()
         return fixture

-    def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot):
+    def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas=1, partitions=2):
         self.host = host
         self.port = port

@@ -265,19 +265,24 @@ def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot):
         self.zk_port = zk_port
         self.zk_chroot = zk_chroot

+        self.replicas = replicas
+        self.partitions = partitions
+
         self.tmp_dir = None
         self.child = None

     def open(self):
         self.tmp_dir = tempfile.mkdtemp()
         print("*** Running local Kafka instance")
-        print("  host      = %s" % self.host)
-        print("  port      = %s" % self.port)
-        print("  broker_id = %s" % self.broker_id)
-        print("  zk_host   = %s" % self.zk_host)
-        print("  zk_port   = %s" % self.zk_port)
-        print("  zk_chroot = %s" % self.zk_chroot)
-        print("  tmp_dir   = %s" % self.tmp_dir)
+        print("  host       = %s" % self.host)
+        print("  port       = %s" % self.port)
+        print("  broker_id  = %s" % self.broker_id)
+        print("  zk_host    = %s" % self.zk_host)
+        print("  zk_port    = %s" % self.zk_port)
+        print("  zk_chroot  = %s" % self.zk_chroot)
+        print("  replicas   = %s" % self.replicas)
+        print("  partitions = %s" % self.partitions)
+        print("  tmp_dir    = %s" % self.tmp_dir)

         # Create directories
         os.mkdir(os.path.join(self.tmp_dir, "logs"))

diff --git a/test/resources/kafka.properties b/test/resources/kafka.properties
index d42c0971e..f8732fb46 100644
--- a/test/resources/kafka.properties
+++ b/test/resources/kafka.properties
@@ -32,7 +32,8 @@ socket.request.max.bytes=104857600
 ############################# Log Basics #############################

 log.dir={tmp_dir}/data
-num.partitions=2
+num.partitions={partitions}
+default.replication.factor={replicas}

 ############################# Log Flush Policy #############################
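The widened fixture signature makes it possible to spin up a small replicated cluster from test code, which the failover tests below rely on (see TestFailover.setUpClass). A sketch under the assumption that the fixtures module is importable from the test environment; the chroot string and the broker/partition counts are illustrative.

    from fixtures import ZookeeperFixture, KafkaFixture
    from kafka.client import KafkaClient

    replicas, partitions = 2, 2

    # One local zookeeper, `replicas` local brokers sharing its chroot.
    zk = ZookeeperFixture.instance()
    brokers = [
        KafkaFixture.instance(i, zk.host, zk.port, "failover_sketch",
                              replicas=replicas, partitions=partitions)
        for i in range(replicas)
    ]

    client = KafkaClient(brokers[0].host, brokers[0].port)
    # ... exercise the replicated cluster here ...

    client.close()
    for broker in brokers:
        broker.close()
    zk.close()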
diff --git a/test/test_integration.py b/test/test_integration.py
index d8ead594a..a10dae243 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -770,6 +770,113 @@ def test_large_messages(self):
         self.assertEquals(all_messages[i], message.message)
         self.assertEquals(i, 19)


+class TestFailover(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+
+        zk_chroot = random_string(10)
+        replicas = 2
+        partitions = 2
+
+        # mini zookeeper, 2 kafka brokers
+        cls.zk = ZookeeperFixture.instance()
+        kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
+        cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
+        cls.client = KafkaClient(cls.brokers[0].host, cls.brokers[0].port)
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.client.close()
+        for broker in cls.brokers:
+            broker.close()
+        cls.zk.close()
+
+    def test_switch_leader(self):
+
+        key, topic, partition = random_string(5), 'test_switch_leader', 0
+        producer = SimpleProducer(self.client, topic)
+
+        for i in range(1, 4):
+
+            # XXX unfortunately, the conns dict needs to be warmed for this to work
+            # XXX unfortunately, for warming to work, we need at least as many partitions as brokers
+            self._send_random_messages(producer, 10)
+
+            # kill leader for partition 0
+            broker = self._kill_leader(topic, partition)
+
+            # expect failure, reload metadata
+            with self.assertRaises(FailedPayloadsException):
+                producer.send_messages('part 1')
+                producer.send_messages('part 2')
+            time.sleep(1)
+
+            # send to new leader
+            self._send_random_messages(producer, 10)
+
+            broker.open()
+            time.sleep(3)
+
+            # count number of messages
+            count = self._count_messages('test_switch_leader group %s' % i, topic)
+            self.assertIn(count, range(20 * i, 22 * i + 1))
+
+        producer.stop()
+
+    def test_switch_leader_async(self):
+
+        key, topic, partition = random_string(5), 'test_switch_leader_async', 0
+        producer = SimpleProducer(self.client, topic, async=True)
+
+        for i in range(1, 4):
+
+            self._send_random_messages(producer, 10)
+
+            # kill leader for partition 0
+            broker = self._kill_leader(topic, partition)
+
+            # expect failure, reload metadata
+            producer.send_messages('part 1')
+            producer.send_messages('part 2')
+            time.sleep(1)
+
+            # send to new leader
+            self._send_random_messages(producer, 10)
+
+            broker.open()
+            time.sleep(3)
+
+            # count number of messages
+            count = self._count_messages('test_switch_leader_async group %s' % i, topic)
+            self.assertIn(count, range(20 * i, 22 * i + 1))
+
+        producer.stop()
+
+    def _send_random_messages(self, producer, n):
+        for j in range(n):
+            resp = producer.send_messages(random_string(10))
+            if len(resp) > 0:
+                self.assertEquals(resp[0].error, 0)
+        time.sleep(1)  # give it some time
+
+    def _kill_leader(self, topic, partition):
+        leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
+        broker = self.brokers[leader.nodeId]
+        broker.close()
+        time.sleep(1)  # give it some time
+        return broker
+
+    def _count_messages(self, group, topic):
+        client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
+        consumer = SimpleConsumer(client, group, topic, auto_commit=False)
+        all_messages = []
+        for message in consumer:
+            all_messages.append(message)
+        consumer.stop()
+        client.close()
+        return len(all_messages)
+
 def random_string(l):
     s = "".join(random.choice(string.letters) for i in xrange(l))