Improve fault tolerance by handling leadership election and other metadata changes #55

Merged: 1 commit, Oct 7, 2013
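In short: when a request to a broker fails (for example because the partition leader just moved), the client now resets its cached topic-to-broker mapping and raises FailedPayloadsException with the undelivered payloads, instead of dying on a raw socket error. Callers can catch that exception and retry once the cluster has elected a new leader. A minimal caller-side sketch, assuming a broker at localhost:9092 (the host, port, topic name, and retry helper are illustrative, not part of this PR):

import time

from kafka.client import KafkaClient
from kafka.common import FailedPayloadsException
from kafka.producer import SimpleProducer

def send_with_retry(producer, message, retries=3, backoff=1):
    # Hypothetical helper: after a failure the client has already reset
    # its metadata cache, so the next attempt re-fetches the leaders.
    for attempt in range(retries):
        try:
            return producer.send_messages(message)
        except FailedPayloadsException:
            time.sleep(backoff)  # give the cluster time to elect a new leader
    raise FailedPayloadsException("giving up after %d attempts" % retries)

client = KafkaClient("localhost", 9092)
producer = SimpleProducer(client, "my-topic")
send_with_retry(producer, "hello kafka")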
24 changes: 18 additions & 6 deletions kafka/client.py
@@ -2,9 +2,11 @@
 from functools import partial
 from itertools import count
 import logging
+import socket
 import time

 from kafka.common import ErrorMapping, TopicAndPartition
+from kafka.common import ConnectionError, FailedPayloadsException
 from kafka.conn import KafkaConnection
 from kafka.protocol import KafkaProtocol

@@ -70,7 +72,7 @@ def _load_metadata_for_topics(self, *topics):
         log.debug("Broker metadata: %s", brokers)
         log.debug("Topic metadata: %s", topics)

-        self.brokers.update(brokers)
+        self.brokers = brokers
         self.topics_to_brokers = {}

         for topic, partitions in topics.items():
@@ -146,13 +148,15 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
         for payload in payloads:
             leader = self._get_leader_for_partition(payload.topic,
                                                     payload.partition)
-
             payloads_by_broker[leader].append(payload)
             original_keys.append((payload.topic, payload.partition))

         # Accumulate the responses in a dictionary
         acc = {}

+        # keep a list of payloads that failed to be sent to brokers
+        failed_payloads = []
+
         # For each broker, send the list of request payloads
         for broker, payloads in payloads_by_broker.items():
             conn = self._get_conn_for_broker(broker)
@@ -161,15 +165,23 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
                                  correlation_id=requestId, payloads=payloads)

             # Send the request, recv the response
-            conn.send(requestId, request)
-
-            if decoder_fn is None:
-                continue
-
-            response = conn.recv(requestId)
+            try:
+                conn.send(requestId, request)
+                if decoder_fn is None:
+                    continue
+                response = conn.recv(requestId)
+            except ConnectionError, e:  # ignore BufferUnderflow for now
+                log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
+                failed_payloads += payloads
+                self.topics_to_brokers = {}  # reset metadata
+                continue

             for response in decoder_fn(response):
                 acc[(response.topic, response.partition)] = response

+        if failed_payloads:
+            raise FailedPayloadsException(failed_payloads)
+
         # Order the accumulated responses by the original key order
         return (acc[k] for k in original_keys) if acc else ()

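Note that failures are accumulated across brokers and surfaced as a single exception after the whole loop, and the exception carries the failed payloads, so a caller can resend exactly those. A sketch of that recovery path (host, port, topic, and the handling policy are illustrative):

from kafka.client import KafkaClient
from kafka.common import FailedPayloadsException, ProduceRequest
from kafka.protocol import create_message

client = KafkaClient("localhost", 9092)
reqs = [ProduceRequest("my-topic", 0, [create_message("hello")])]

try:
    client.send_produce_request(reqs)
except FailedPayloadsException as e:
    failed = e.args[0]  # the payload list handed to the constructor above
    # resend `failed` later; the client will reload metadata on the next
    # request because topics_to_brokers was reset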
5 changes: 5 additions & 0 deletions kafka/common.py
@@ -69,6 +69,11 @@ class ErrorMapping(object):
 #  Exceptions   #
 #################

+class FailedPayloadsException(Exception):
+    pass
+
+class ConnectionError(Exception):
+    pass

 class BufferUnderflowError(Exception):
     pass

28 changes: 18 additions & 10 deletions kafka/conn.py
@@ -4,7 +4,7 @@
 from threading import local

 from kafka.common import BufferUnderflowError
-
+from kafka.common import ConnectionError

 log = logging.getLogger("kafka")

@@ -26,6 +26,7 @@ def __init__(self, host, port, bufsize=4096):
         self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self._sock.connect((host, port))
         self._sock.settimeout(10)
+        self._dirty = False

     def __str__(self):
         return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
@@ -53,7 +54,7 @@ def _consume_response_iter(self):
         # Read the size off of the header
         resp = self._sock.recv(4)
         if resp == "":
-            raise Exception("Got no response from Kafka")
+            self._raise_connection_error()
         (size,) = struct.unpack('>i', resp)

         messagesize = size - 4
@@ -71,6 +72,10 @@ def _consume_response_iter(self):
             total += len(resp)
             yield resp

+    def _raise_connection_error(self):
+        self._dirty = True
+        raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port))
+
     ##################
     #   Public API   #
     ##################
@@ -79,14 +84,16 @@ def _consume_response_iter(self):

     def send(self, request_id, payload):
         "Send a request to Kafka"

-        log.debug(
-            "About to send %d bytes to Kafka, request %d" %
-            (len(payload), request_id))
-
-        sent = self._sock.sendall(payload)
-        if sent is not None:
-            raise RuntimeError("Kafka went away")
+        log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id))
+        try:
+            if self._dirty:
+                self.reinit()
+            sent = self._sock.sendall(payload)
+            if sent is not None:
+                self._raise_connection_error()
+        except socket.error:
+            log.exception('Unable to send payload to Kafka')
+            self._raise_connection_error()

     def recv(self, request_id):
         """
@@ -110,3 +117,4 @@ def reinit(self):
         self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self._sock.connect((self.host, self.port))
         self._sock.settimeout(10)
+        self._dirty = False

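The conn.py changes implement a lazy-reconnect pattern: a failed socket marks the connection dirty, and the next send() reinitializes it rather than reusing a dead socket. A distilled, self-contained sketch of the same idea (generic names, not the library's API):

import socket

class ReconnectingConnection(object):
    """Toy connection that reconnects lazily after a failure."""

    def __init__(self, host, port):
        self.host, self.port = host, port
        self._sock = None
        self._dirty = True  # no socket yet; the first send() connects

    def _reinit(self):
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.connect((self.host, self.port))
        self._dirty = False

    def send(self, payload):
        if self._dirty:
            self._reinit()
        try:
            self._sock.sendall(payload)
        except socket.error:
            self._dirty = True  # poison the socket; the next send reconnects
            raise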
11 changes: 8 additions & 3 deletions kafka/producer.py
@@ -7,6 +7,7 @@
 import sys

 from kafka.common import ProduceRequest
+from kafka.common import FailedPayloadsException
 from kafka.protocol import create_message
 from kafka.partitioner import HashedPartitioner

@@ -113,7 +114,7 @@ def _send_upstream(self, queue):
                 self.client.send_produce_request(reqs, acks=self.req_acks,
                                                  timeout=self.ack_timeout)
             except Exception:
-                log.error("Error sending message", exc_info=sys.exc_info())
+                log.exception("Unable to send message")

     def send_messages(self, partition, *msg):
         """
@@ -126,8 +127,12 @@ def send_messages(self, partition, *msg):
         else:
             messages = [create_message(m) for m in msg]
         req = ProduceRequest(self.topic, partition, messages)
-        resp = self.client.send_produce_request([req], acks=self.req_acks,
-                                                timeout=self.ack_timeout)
+        try:
+            resp = self.client.send_produce_request([req], acks=self.req_acks,
+                                                    timeout=self.ack_timeout)
+        except Exception as e:
+            log.exception("Unable to send messages")
+            raise e
         return resp

     def stop(self, timeout=1):
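One asymmetry this leaves in place: the synchronous path logs and re-raises, while the async background thread (_send_upstream) only logs, so async callers never see the failure. A short caller-side sketch (host, port, and topic are illustrative; note that async was a valid keyword argument in this Python 2-era API):

from kafka.client import KafkaClient
from kafka.common import FailedPayloadsException
from kafka.producer import SimpleProducer

client = KafkaClient("localhost", 9092)

# Synchronous producer: failures propagate to the caller.
producer = SimpleProducer(client, "test-topic")
try:
    producer.send_messages("payload")
except FailedPayloadsException:
    pass  # the caller decides: retry, buffer, or drop

# Async producer: send_messages() only enqueues; a failure in the
# background thread is logged by _send_upstream, not raised here.
async_producer = SimpleProducer(client, "test-topic", async=True)
async_producer.send_messages("payload")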
25 changes: 15 additions & 10 deletions test/fixtures.py
@@ -242,7 +242,7 @@ def close(self):

 class KafkaFixture(object):
     @staticmethod
-    def instance(broker_id, zk_host, zk_port, zk_chroot=None):
+    def instance(broker_id, zk_host, zk_port, zk_chroot=None, replicas=1, partitions=2):
         if zk_chroot is None:
             zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
         if "KAFKA_URI" in os.environ:
@@ -251,11 +251,11 @@ def instance(broker_id, zk_host, zk_port, zk_chroot=None):
             fixture = ExternalService(host, port)
         else:
             (host, port) = ("127.0.0.1", get_open_port())
-            fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot)
+            fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot, replicas, partitions)
         fixture.open()
         return fixture

-    def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot):
+    def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas=1, partitions=2):
         self.host = host
         self.port = port

@@ -265,19 +265,24 @@ def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot):
         self.zk_port = zk_port
         self.zk_chroot = zk_chroot

+        self.replicas = replicas
+        self.partitions = partitions
+
         self.tmp_dir = None
         self.child = None

     def open(self):
         self.tmp_dir = tempfile.mkdtemp()
         print("*** Running local Kafka instance")
-        print("  host      = %s" % self.host)
-        print("  port      = %s" % self.port)
-        print("  broker_id = %s" % self.broker_id)
-        print("  zk_host   = %s" % self.zk_host)
-        print("  zk_port   = %s" % self.zk_port)
-        print("  zk_chroot = %s" % self.zk_chroot)
-        print("  tmp_dir   = %s" % self.tmp_dir)
+        print("  host       = %s" % self.host)
+        print("  port       = %s" % self.port)
+        print("  broker_id  = %s" % self.broker_id)
+        print("  zk_host    = %s" % self.zk_host)
+        print("  zk_port    = %s" % self.zk_port)
+        print("  zk_chroot  = %s" % self.zk_chroot)
+        print("  replicas   = %s" % self.replicas)
+        print("  partitions = %s" % self.partitions)
+        print("  tmp_dir    = %s" % self.tmp_dir)

         # Create directories
         os.mkdir(os.path.join(self.tmp_dir, "logs"))
3 changes: 2 additions & 1 deletion test/resources/kafka.properties
@@ -32,7 +32,8 @@ socket.request.max.bytes=104857600
 ############################# Log Basics #############################

 log.dir={tmp_dir}/data
-num.partitions=2
+num.partitions={partitions}
+default.replication.factor={replicas}

 ############################# Log Flush Policy #############################

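The {partitions} and {replicas} placeholders suggest the fixture renders this file with Python's str.format before starting the broker. A minimal sketch of that assumed rendering step (the inline template keeps only the keys from this hunk; the real file has more placeholders, and the values below are illustrative):

# Hypothetical rendering; not the fixture's actual code.
template = (
    "log.dir={tmp_dir}/data\n"
    "num.partitions={partitions}\n"
    "default.replication.factor={replicas}\n"
)
print(template.format(tmp_dir="/tmp/kafka-test", partitions=2, replicas=2))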
107 changes: 107 additions & 0 deletions test/test_integration.py
@@ -770,6 +770,113 @@ def test_large_messages(self):
             self.assertEquals(all_messages[i], message.message)
         self.assertEquals(i, 19)

+class TestFailover(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+
+        zk_chroot = random_string(10)
+        replicas = 2
+        partitions = 2
+
+        # mini zookeeper, 2 kafka brokers
+        cls.zk = ZookeeperFixture.instance()
+        kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
+        cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
+        cls.client = KafkaClient(cls.brokers[0].host, cls.brokers[0].port)
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.client.close()
+        for broker in cls.brokers:
+            broker.close()
+        cls.zk.close()
+
+    def test_switch_leader(self):
+
+        key, topic, partition = random_string(5), 'test_switch_leader', 0
+        producer = SimpleProducer(self.client, topic)
+
+        for i in range(1, 4):
+
+            # XXX unfortunately, the conns dict needs to be warmed for this to work
+            # XXX unfortunately, for warming to work, we need at least as many partitions as brokers
+            self._send_random_messages(producer, 10)
+
+            # kill the leader for partition 0
+            broker = self._kill_leader(topic, partition)
+
+            # expect failure, reload metadata
+            with self.assertRaises(FailedPayloadsException):
+                producer.send_messages('part 1')
+                producer.send_messages('part 2')
+            time.sleep(1)
+
+            # send to the new leader
+            self._send_random_messages(producer, 10)
+
+            broker.open()
+            time.sleep(3)
+
+            # count the number of messages
+            count = self._count_messages('test_switch_leader group %s' % i, topic)
+            self.assertIn(count, range(20 * i, 22 * i + 1))
+
+        producer.stop()
+
+    def test_switch_leader_async(self):
+
+        key, topic, partition = random_string(5), 'test_switch_leader_async', 0
+        producer = SimpleProducer(self.client, topic, async=True)
+
+        for i in range(1, 4):
+
+            self._send_random_messages(producer, 10)
+
+            # kill the leader for partition 0
+            broker = self._kill_leader(topic, partition)
+
+            # expect failure, reload metadata
+            producer.send_messages('part 1')
+            producer.send_messages('part 2')
+            time.sleep(1)
+
+            # send to the new leader
+            self._send_random_messages(producer, 10)
+
+            broker.open()
+            time.sleep(3)
+
+            # count the number of messages
+            count = self._count_messages('test_switch_leader_async group %s' % i, topic)
+            self.assertIn(count, range(20 * i, 22 * i + 1))
+
+        producer.stop()
+
+    def _send_random_messages(self, producer, n):
+        for j in range(n):
+            resp = producer.send_messages(random_string(10))
+            if len(resp) > 0:
+                self.assertEquals(resp[0].error, 0)
+        time.sleep(1)  # give it some time
+
+    def _kill_leader(self, topic, partition):
+        leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
+        broker = self.brokers[leader.nodeId]
+        broker.close()
+        time.sleep(1)  # give it some time
+        return broker
+
+    def _count_messages(self, group, topic):
+        client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
+        consumer = SimpleConsumer(client, group, topic, auto_commit=False)
+        all_messages = []
+        for message in consumer:
+            all_messages.append(message)
+        consumer.stop()
+        client.close()
+        return len(all_messages)
+
+
 def random_string(l):
     s = "".join(random.choice(string.letters) for i in xrange(l))