
Support for multiple hosts in KafkaClient #69


Closed · wants to merge 6 commits
8 changes: 4 additions & 4 deletions README.md
@@ -30,7 +30,7 @@ from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer, KeyedProducer

kafka = KafkaClient("localhost", 9092)
kafka = KafkaClient("localhost:9092")

# To send messages synchronously
producer = SimpleProducer(kafka, "my-topic")
@@ -81,7 +81,7 @@ from kafka.client import KafkaClient
from kafka.producer import KeyedProducer
from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner

kafka = KafkaClient("localhost", 9092)
kafka = KafkaClient("localhost:9092")

# HashedPartitioner is default
producer = KeyedProducer(kafka, "my-topic")
@@ -96,7 +96,7 @@ producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
from kafka.client import KafkaClient
from kafka.consumer import MultiProcessConsumer

kafka = KafkaClient("localhost", 9092)
kafka = KafkaClient("localhost:9092")

# This will split the number of partitions among two processes
consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)
@@ -116,7 +116,7 @@ for message in consumer.get_messages(count=5, block=True, timeout=4):

```python
from kafka.client import KafkaClient
kafka = KafkaClient("localhost", 9092)
kafka = KafkaClient("localhost:9092")
req = ProduceRequest(topic="my-topic", partition=1,
messages=[KafkaProtocol.encode_message("some message")])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
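Beyond the single-host snippets above, the new constructor also accepts a comma-separated list of bootstrap brokers (parsed by the `collect_hosts` helper added in kafka/conn.py below). A minimal sketch of that usage, with made-up host names:

```python
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

# Any of the listed brokers can answer the initial metadata request;
# a host given without a port falls back to 9092.
kafka = KafkaClient("kafka1.example.com:9092,kafka2.example.com:9092")

producer = SimpleProducer(kafka, "my-topic")
producer.send_messages("hello from a multi-host client")
```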
2 changes: 1 addition & 1 deletion example.py
@@ -14,7 +14,7 @@ def consume_example(client):
print(message)

def main():
client = KafkaClient("localhost", 9092)
client = KafkaClient("localhost:9092")
produce_example(client)
consume_example(client)

2 changes: 1 addition & 1 deletion kafka/NOTES.md
@@ -18,7 +18,7 @@ There are a few levels of abstraction:

# Possible API

client = KafkaClient("localhost", 9092)
client = KafkaClient("localhost:9092")

producer = KafkaProducer(client, "topic")
producer.send_string("hello")
45 changes: 23 additions & 22 deletions kafka/client.py
@@ -3,12 +3,10 @@
from functools import partial
from itertools import count
import logging
import socket
import time

from kafka.common import ErrorMapping, TopicAndPartition
from kafka.common import ConnectionError, FailedPayloadsException
from kafka.conn import KafkaConnection
from kafka.conn import collect_hosts, KafkaConnection
from kafka.protocol import KafkaProtocol

log = logging.getLogger("kafka")
@@ -19,13 +17,15 @@ class KafkaClient(object):
CLIENT_ID = "kafka-python"
ID_GEN = count()

def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID):
def __init__(self, hosts, bufsize=4096, client_id=CLIENT_ID):
# We need one connection to bootstrap
self.bufsize = bufsize
self.client_id = client_id
self.conns = { # (host, port) -> KafkaConnection
(host, port): KafkaConnection(host, port, bufsize)
}

self.hosts = collect_hosts(hosts)

# create connections only when we need them
self.conns = {}
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...]
@@ -35,15 +35,19 @@ def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID):
# Private API #
##################

def _get_conn(self, host, port):
"Get or create a connection to a broker using host and port"

host_key = (host, port)
if host_key not in self.conns:
self.conns[host_key] = KafkaConnection(host, port, self.bufsize)

return self.conns[host_key]

def _get_conn_for_broker(self, broker):
"""
Get or create a connection to a broker
"""
if (broker.host, broker.port) not in self.conns:
self.conns[(broker.host, broker.port)] = \
KafkaConnection(broker.host, broker.port, self.bufsize)
"Get or create a connection to a broker"

return self.conns[(broker.host, broker.port)]
return self._get_conn(broker.host, broker.port)

def _get_leader_for_partition(self, topic, partition):
key = TopicAndPartition(topic, partition)
@@ -82,16 +86,12 @@ def _load_metadata_for_topics(self, *topics):
self.topic_partitions.pop(topic, None)

if not partitions:
log.info("Partition is unassigned, delay for 1s and retry")
time.sleep(1)
self._load_metadata_for_topics(topic)
log.info("%s has no partition", topic)
break

for partition, meta in partitions.items():
if meta.leader == -1:
log.info("Partition is unassigned, delay for 1s and retry")
time.sleep(1)
self._load_metadata_for_topics(topic)
log.info("%s partition %s is unassigned", topic, str(partition))
else:
topic_part = TopicAndPartition(topic, partition)
self.topics_to_brokers[topic_part] = brokers[meta.leader]
@@ -108,7 +108,8 @@ def _send_broker_unaware_request(self, requestId, request):
Attempt to send a broker-agnostic request to one of the available
brokers. Keep trying until you succeed.
"""
for conn in self.conns.values():
for (host, port) in self.hosts:
conn = self._get_conn(host, port)
try:
conn.send(requestId, request)
response = conn.recv(requestId)
@@ -174,7 +175,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
except ConnectionError, e: # ignore BufferUnderflow for now
log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
failed_payloads += payloads
self.topics_to_brokers = {} # reset metadata
self.topics_to_brokers = {} # reset metadata
continue

for response in decoder_fn(response):
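To summarize the control flow after the client.py changes: connections are created lazily through `_get_conn`, and `_send_broker_unaware_request` now walks the parsed host list instead of a pre-built connection dict. A rough standalone sketch of that retry-over-hosts idea (illustrative only, not the PR's exact code; `request_id` and `request` are assumed to come from `KafkaProtocol` as elsewhere in the client):

```python
import logging

from kafka.conn import collect_hosts, KafkaConnection

log = logging.getLogger("kafka")


def bootstrap_request(hosts, request_id, request, bufsize=4096):
    """Try each bootstrap broker in turn and return the first response.

    Mirrors what KafkaClient._send_broker_unaware_request does after this
    change, minus the connection caching done by _get_conn.
    """
    for host, port in collect_hosts(hosts):
        try:
            conn = KafkaConnection(host, port, bufsize)
            conn.send(request_id, request)
            return conn.recv(request_id)
        except Exception as e:
            # The client logs the failure and moves on to the next broker.
            log.warning("Could not send request to %s:%d: %s", host, port, e)
    raise Exception("All servers failed to process request")
```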
33 changes: 26 additions & 7 deletions kafka/conn.py
@@ -2,6 +2,7 @@
import logging
import socket
import struct
from random import shuffle
from threading import local

from kafka.common import BufferUnderflowError
@@ -10,6 +11,26 @@
log = logging.getLogger("kafka")


def collect_hosts(hosts, randomize=True):
"""
Collects a comma-separated set of hosts (host:port) and optionally
randomizes the returned list.
"""

result = []
for host_port in hosts.split(","):

res = host_port.split(':')
host = res[0]
port = int(res[1]) if len(res) > 1 else 9092
result.append((host.strip(), port))

if randomize:
shuffle(result)

return result


class KafkaConnection(local):
"""
A socket connection to a single Kafka broker
@@ -19,14 +40,14 @@ class KafkaConnection(local):
we can do something in here to facilitate multiplexed requests/responses
since the Kafka API includes a correlation id.
"""
def __init__(self, host, port, bufsize=4096):
def __init__(self, host, port, bufsize=4096, timeout=10):
super(KafkaConnection, self).__init__()
self.host = host
self.port = port
self.bufsize = bufsize
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.connect((host, port))
self._sock.settimeout(10)
self.timeout = timeout

self._sock = socket.create_connection((host, port), timeout=timeout)
self._dirty = False

def __str__(self):
@@ -125,7 +146,5 @@ def reinit(self):
Re-initialize the socket connection
"""
self.close()
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.connect((self.host, self.port))
self._sock.settimeout(10)
self._sock = socket.create_connection((self.host, self.port), timeout=self.timeout)
self._dirty = False
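For reference, a quick check of what the new `collect_hosts` helper produces for a mixed input (host names are made up; pass `randomize=False` to keep the input order, since the default shuffles it):

```python
from kafka.conn import collect_hosts

# A host given without an explicit port falls back to 9092.
print(collect_hosts("kafka1:9093, kafka2", randomize=False))
# -> [('kafka1', 9093), ('kafka2', 9092)]
```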
5 changes: 3 additions & 2 deletions setup.py
@@ -1,11 +1,12 @@
import os.path
import sys

from setuptools import setup, Command


class Tox(Command):

user_options = []

def initialize_options(self):
pass

@@ -21,7 +22,7 @@ def run(self):
name="kafka-python",
version="0.8.1-1",

install_requires=["distribute", "tox"],
install_requires=["distribute", "tox", "mock"],
tests_require=["tox"],
cmdclass={"test": Tox},

23 changes: 14 additions & 9 deletions test/test_integration.py
@@ -16,7 +16,7 @@ class TestKafkaClient(unittest.TestCase):
def setUpClass(cls): # noqa
cls.zk = ZookeeperFixture.instance()
cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
cls.client = KafkaClient(cls.server.host, cls.server.port)
cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port))

@classmethod
def tearDownClass(cls): # noqa
@@ -554,7 +554,7 @@ def setUpClass(cls):
cls.zk = ZookeeperFixture.instance()
cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
cls.client = KafkaClient(cls.server2.host, cls.server2.port, bufsize=8192)
cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port), bufsize=8192)

@classmethod
def tearDownClass(cls): # noqa
@@ -770,20 +770,23 @@ def test_large_messages(self):
self.assertEquals(all_messages[i], message.message)
self.assertEquals(i, 19)


class TestFailover(unittest.TestCase):

@classmethod
def setUpClass(cls):

zk_chroot = random_string(10)
replicas = 2
replicas = 2
partitions = 2

# mini zookeeper, 2 kafka brokers
cls.zk = ZookeeperFixture.instance()
kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
cls.zk = ZookeeperFixture.instance()
kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
cls.client = KafkaClient(cls.brokers[0].host, cls.brokers[0].port)

hosts = ','.join(['%s:%d' % (b.host, b.port) for b in cls.brokers])
cls.client = KafkaClient(hosts)

@classmethod
def tearDownClass(cls):
@@ -858,17 +861,19 @@ def _send_random_messages(self, producer, n):
resp = producer.send_messages(random_string(10))
if len(resp) > 0:
self.assertEquals(resp[0].error, 0)
time.sleep(1) # give it some time
time.sleep(1) # give it some time

def _kill_leader(self, topic, partition):
leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
broker = self.brokers[leader.nodeId]
broker.close()
time.sleep(1) # give it some time
time.sleep(1) # give it some time
return broker

def _count_messages(self, group, topic):
client = KafkaClient(self.brokers[0].host, self.brokers[0].port)

hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port)
client = KafkaClient(hosts)
consumer = SimpleConsumer(client, group, topic, auto_commit=False)
all_messages = []
for message in consumer: