Skip to content

Support for multiple hosts on KafkaClient bootstrap (improves on #70) #122

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 27, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer, KeyedProducer

kafka = KafkaClient("localhost", 9092)
kafka = KafkaClient("localhost:9092")

# To send messages synchronously
producer = SimpleProducer(kafka)
Expand Down Expand Up @@ -80,7 +80,7 @@ from kafka.client import KafkaClient
from kafka.producer import KeyedProducer
from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner

kafka = KafkaClient("localhost", 9092)
kafka = KafkaClient("localhost:9092")

# HashedPartitioner is default
producer = KeyedProducer(kafka)
Expand All @@ -95,7 +95,7 @@ producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
from kafka.client import KafkaClient
from kafka.consumer import MultiProcessConsumer

kafka = KafkaClient("localhost", 9092)
kafka = KafkaClient("localhost:9092")

# This will split the number of partitions among two processes
consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)
Expand All @@ -115,7 +115,7 @@ for message in consumer.get_messages(count=5, block=True, timeout=4):

```python
from kafka.client import KafkaClient
kafka = KafkaClient("localhost", 9092)
kafka = KafkaClient("localhost:9092")
req = ProduceRequest(topic="my-topic", partition=1,
messages=[KafkaProtocol.encode_message("some message")])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
Expand Down
2 changes: 1 addition & 1 deletion example.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def consume_example(client):
print(message)

def main():
client = KafkaClient("localhost", 9092)
client = KafkaClient("localhost:9092")
produce_example(client)
consume_example(client)

Expand Down
2 changes: 1 addition & 1 deletion kafka/NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ There are a few levels of abstraction:

# Possible API

client = KafkaClient("localhost", 9092)
client = KafkaClient("localhost:9092")

producer = KafkaProducer(client, "topic")
producer.send_string("hello")
Expand Down
25 changes: 18 additions & 7 deletions kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
BrokerResponseError, PartitionUnavailableError,
KafkaUnavailableError, KafkaRequestError)

from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol

log = logging.getLogger("kafka")
Expand All @@ -24,14 +24,15 @@ class KafkaClient(object):
# NOTE: The timeout given to the client should always be greater than the
# one passed to SimpleConsumer.get_message(), otherwise you can get a
# socket timeout.
def __init__(self, host, port, client_id=CLIENT_ID,
def __init__(self, hosts, client_id=CLIENT_ID,
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
# We need one connection to bootstrap
self.client_id = client_id
self.timeout = timeout
self.conns = { # (host, port) -> KafkaConnection
(host, port): KafkaConnection(host, port, timeout=timeout)
}
self.hosts = collect_hosts(hosts)

# create connections only when we need them
self.conns = {}
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
self.topic_partitions = {} # topic_id -> [0, 1, 2, ...]
Expand All @@ -41,6 +42,15 @@ def __init__(self, host, port, client_id=CLIENT_ID,
# Private API #
##################

def _get_conn(self, host, port):
    """
    Get or create a connection to a broker using host and port.

    Connections are cached in self.conns, keyed by the (host, port) tuple.
    A connection is only created the first time a given pair is requested.

    Returns the KafkaConnection for (host, port).
    """
    host_key = (host, port)
    if host_key not in self.conns:
        # Honour the timeout the client was constructed with, instead of
        # silently falling back to KafkaConnection's default.
        self.conns[host_key] = KafkaConnection(
            host, port, timeout=self.timeout)

    return self.conns[host_key]

def _get_conn_for_broker(self, broker):
"""
Get or create a connection to a broker
Expand All @@ -49,7 +59,7 @@ def _get_conn_for_broker(self, broker):
self.conns[(broker.host, broker.port)] = \
KafkaConnection(broker.host, broker.port, timeout=self.timeout)

return self.conns[(broker.host, broker.port)]
return self._get_conn(broker.host, broker.port)

def _get_leader_for_partition(self, topic, partition):
key = TopicAndPartition(topic, partition)
Expand All @@ -72,7 +82,8 @@ def _send_broker_unaware_request(self, requestId, request):
Attempt to send a broker-agnostic request to one of the available
brokers. Keep trying until you succeed.
"""
for conn in self.conns.values():
for (host, port) in self.hosts:
conn = self._get_conn(host, port)
try:
conn.send(requestId, request)
response = conn.recv(requestId)
Expand Down
28 changes: 27 additions & 1 deletion kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,39 @@
import logging
import socket
import struct
from random import shuffle
from threading import local

from kafka.common import ConnectionError

log = logging.getLogger("kafka")

DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
DEFAULT_KAFKA_PORT = 9092


def collect_hosts(hosts, randomize=True):
    """
    Collect a set of hosts into a list of (host, port) tuples.

    hosts is either a comma-separated string ("host1:9092,host2:9092") or
    an iterable of "host[:port]" strings. Whitespace around each entry is
    ignored, so spaces around the commas are allowed. Entries without a
    port get DEFAULT_KAFKA_PORT. Blank entries (for example the one left
    by a trailing comma) are skipped.

    If randomize is true the list is shuffled before being returned.

    Raises ValueError if a port is present but is not an integer.
    """
    if isinstance(hosts, str):
        hosts = hosts.strip().split(',')

    result = []
    for host_port in hosts:
        host_port = host_port.strip()
        if not host_port:
            # "a:1," or "a:1,,b:2" would otherwise add ('', default port)
            continue

        res = host_port.split(':')
        host = res[0]
        port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT
        result.append((host.strip(), port))

    if randomize:
        shuffle(result)

    return result


class KafkaConnection(local):
"""
Expand Down Expand Up @@ -81,7 +107,7 @@ def send(self, request_id, payload):
sent = self._sock.sendall(payload)
if sent is not None:
self._raise_connection_error()
except socket.error, e:
except socket.error:
log.exception('Unable to send payload to Kafka')
self._raise_connection_error()

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import os.path
import sys

from setuptools import setup, Command


class Tox(Command):

user_options = []

def initialize_options(self):
Expand Down
34 changes: 19 additions & 15 deletions test/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def ensure_topic_creation(client, topic_name):

class KafkaTestCase(unittest.TestCase):
def setUp(self):
self.topic = "%s-%s" % (self.id()[self.id().rindex(".")+1:], random_string(10))
self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))
ensure_topic_creation(self.client, self.topic)


Expand All @@ -42,7 +42,7 @@ class TestKafkaClient(KafkaTestCase):
def setUpClass(cls): # noqa
cls.zk = ZookeeperFixture.instance()
cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
cls.client = KafkaClient(cls.server.host, cls.server.port)
cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port))

@classmethod
def tearDownClass(cls): # noqa
Expand Down Expand Up @@ -578,7 +578,7 @@ def setUpClass(cls):
cls.zk = ZookeeperFixture.instance()
cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
cls.client = KafkaClient(cls.server2.host, cls.server2.port)
cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port))

@classmethod
def tearDownClass(cls): # noqa
Expand Down Expand Up @@ -826,23 +826,26 @@ def test_large_messages(self):

class TestFailover(KafkaTestCase):

def setUp(self):
@classmethod
def setUpClass(cls): # noqa
zk_chroot = random_string(10)
replicas = 2
partitions = 2

# mini zookeeper, 2 kafka brokers
self.zk = ZookeeperFixture.instance()
kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
super(TestFailover, self).setUp()

def tearDown(self):
self.client.close()
for broker in self.brokers:
cls.zk = ZookeeperFixture.instance()
kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]

hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers]
cls.client = KafkaClient(hosts)

@classmethod
def tearDownClass(cls):
cls.client.close()
for broker in cls.brokers:
broker.close()
self.zk.close()
cls.zk.close()

def test_switch_leader(self):
key, topic, partition = random_string(5), self.topic, 0
Expand Down Expand Up @@ -918,7 +921,8 @@ def _kill_leader(self, topic, partition):
return broker

def _count_messages(self, group, topic):
client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port)
client = KafkaClient(hosts)
consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0)
all_messages = []
for message in consumer:
Expand Down
84 changes: 83 additions & 1 deletion test/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@
import struct
import unittest

from mock import MagicMock, patch


from kafka import KafkaClient
from kafka.common import (
ProduceRequest, FetchRequest, Message, ChecksumError,
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
OffsetAndMessage, BrokerMetadata, PartitionMetadata
)
from kafka.common import KafkaUnavailableError
from kafka.codec import (
has_gzip, has_snappy, gzip_encode, gzip_decode,
snappy_encode, snappy_decode
Expand Down Expand Up @@ -362,7 +367,6 @@ def test_encode_offset_request(self):
def test_decode_offset_response(self):
pass


@unittest.skip("Not Implemented")
def test_encode_offset_commit_request(self):
pass
Expand All @@ -380,5 +384,83 @@ def test_decode_offset_fetch_response(self):
pass


class TestKafkaClient(unittest.TestCase):
    """Unit tests for KafkaClient host handling and broker-unaware requests."""

    @staticmethod
    def _make_get_conn(mocked_conns):
        """
        Build a replacement for KafkaClient._get_conn.

        The returned callable looks up the connection for (host, port) in
        mocked_conns, so each test only has to supply its own mapping.
        """
        def mock_get_conn(host, port):
            return mocked_conns[(host, port)]

        return mock_get_conn

    def test_init_with_list(self):
        """Hosts given as a list are parsed into (host, port) tuples."""
        with patch.object(KafkaClient, 'load_metadata_for_topics'):
            client = KafkaClient(
                hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])

        self.assertItemsEqual(
            [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
            client.hosts)

    def test_init_with_csv(self):
        """Hosts given as a comma-separated string are parsed the same way."""
        with patch.object(KafkaClient, 'load_metadata_for_topics'):
            client = KafkaClient(
                hosts='kafka01:9092,kafka02:9092,kafka03:9092')

        self.assertItemsEqual(
            [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
            client.hosts)

    def test_send_broker_unaware_request_fail(self):
        """Tests that call fails when all hosts are unavailable"""
        mocked_conns = {
            ('kafka01', 9092): MagicMock(),
            ('kafka02', 9092): MagicMock()
        }
        # inject KafkaConnection side effects
        mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
        mocked_conns[('kafka02', 9092)].send.side_effect = RuntimeError("Kafka02 went away (unittest)")

        # patch to avoid making requests before we want it
        with patch.object(KafkaClient, 'load_metadata_for_topics'), \
                patch.object(KafkaClient, '_get_conn',
                             side_effect=self._make_get_conn(mocked_conns)):

            client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092'])

            self.assertRaises(
                KafkaUnavailableError,
                client._send_broker_unaware_request,
                1, 'fake request')

            # every configured host must have been tried before giving up
            for conn in mocked_conns.values():
                conn.send.assert_called_with(1, 'fake request')

    def test_send_broker_unaware_request(self):
        """Tests that call works when at least one of the host is available"""
        mocked_conns = {
            ('kafka01', 9092): MagicMock(),
            ('kafka02', 9092): MagicMock(),
            ('kafka03', 9092): MagicMock()
        }
        # inject KafkaConnection side effects
        mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
        mocked_conns[('kafka02', 9092)].recv.return_value = 'valid response'
        mocked_conns[('kafka03', 9092)].send.side_effect = RuntimeError("kafka03 went away (unittest)")

        # patch to avoid making requests before we want it
        with patch.object(KafkaClient, 'load_metadata_for_topics'), \
                patch.object(KafkaClient, '_get_conn',
                             side_effect=self._make_get_conn(mocked_conns)):

            client = KafkaClient(hosts='kafka01:9092,kafka02:9092')

            resp = client._send_broker_unaware_request(1, 'fake request')

            self.assertEqual('valid response', resp)
            mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)


if __name__ == '__main__':
unittest.main()