From 176eef64fd56fbd26e9ea12f910fc08cfdd5ca40 Mon Sep 17 00:00:00 2001
From: Mahendra M
Date: Mon, 1 Jul 2013 13:03:04 +0530
Subject: [PATCH 1/6] Add support for gevent

This commit adds support for pluggable drivers. While initializing a
consumer, you can specify the driver to use:

* process - (default) uses separate processes for commit, MultiConsumer etc.
* thread  - uses separate threads for commit, MultiConsumer etc.
* gevent  - uses gevent greenlets for the same

This was made possible by relying on APIs/features common to the
multiprocessing, threading and gevent libraries; the implementation
needed almost no hacks.
---
 README.md                |  31 ++++++++--
 kafka/__init__.py        |  10 +++-
 kafka/client.py          |  31 ++++++++--
 kafka/common.py          |  44 ++++++++++++++
 kafka/conn.py            |  19 ++----
 kafka/consumer.py        | 124 ++++++++++++++++++++++++---------------
 setup.py                 |   4 +-
 test/test_integration.py |  77 ++++++++++++++++++------
 8 files changed, 247 insertions(+), 93 deletions(-)

diff --git a/README.md b/README.md
index 83ec44748..a9ecc9412 100644
--- a/README.md
+++ b/README.md
@@ -38,6 +38,16 @@ consumer = SimpleConsumer(kafka, "my-group", "my-topic")
 for message in consumer:
     print(message)
 
+# Gevent based consumer
+from kafka import KAFKA_GEVENT_DRIVER
+consumer = SimpleConsumer(kafka, "my-group", "my-topic",
+                          driver_type=KAFKA_GEVENT_DRIVER)
+
+# Threaded consumer
+from kafka import KAFKA_THREAD_DRIVER
+consumer = SimpleConsumer(kafka, "my-group", "my-topic",
+                          driver_type=KAFKA_THREAD_DRIVER)
+
 kafka.close()
 ```
 
@@ -59,14 +69,14 @@ producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
 ## Multiprocess consumer
 
 ```python
-from kafka.consume import MultiProcessConsumer
+from kafka.consumer import MultiConsumer
 
-# This will split the number of partitions among two processes
-consumer = MultiProcessConsumer(kafka, "my-topic", "my-group", num_procs=2)
+# This will split the number of partitions among two processes (drivers)
+consumer = MultiConsumer(kafka, "my-group", "my-topic", num_drivers=2)
 
 # This will spawn processes such that each handles 2 partitions max
-consumer = MultiProcessConsumer(kafka, "my-topic", "my-group",
-                                partitions_per_proc=2)
+consumer = MultiConsumer(kafka, "my-group", "my-topic",
+                         partitions_per_driver=2)
 
 for message in consumer:
     print(message)
@@ -75,6 +85,17 @@ for message in consumer:
 for message in consumer.get_messages(count=5, block=True, timeout=4):
     print(message)
 ```
 
+# Gevent based consumer
+from kafka import KAFKA_GEVENT_DRIVER
+consumer = MultiConsumer(kafka, "my-group", "my-topic", num_drivers=2,
+                         driver_type=KAFKA_GEVENT_DRIVER)
+
+# Threaded consumer
+from kafka import KAFKA_THREAD_DRIVER
+consumer = MultiConsumer(kafka, "my-group", "my-topic",
+                         partitions_per_driver=2,
+                         driver_type=KAFKA_THREAD_DRIVER)
+
 ## Low level
 
 ```python
diff --git a/kafka/__init__.py b/kafka/__init__.py
index 13af699d2..49ea8d5f6 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -5,15 +5,21 @@
 __copyright__ = 'Copyright 2012, David Arthur under Apache License, v2.0'
 
 from kafka.client import KafkaClient
+from kafka.common import KAFKA_THREAD_DRIVER, \
+                         KAFKA_GEVENT_DRIVER, \
+                         KAFKA_PROCESS_DRIVER
+
 from kafka.conn import KafkaConnection
 from kafka.protocol import (
     create_message, create_gzip_message, create_snappy_message
 )
 from kafka.producer import SimpleProducer
-from kafka.consumer import SimpleConsumer, MultiProcessConsumer
+from kafka.consumer import SimpleConsumer, MultiConsumer
 
 __all__ = [
+    'KAFKA_THREAD_DRIVER', 'KAFKA_GEVENT_DRIVER', 'KAFKA_PROCESS_DRIVER',
     'KafkaClient', 'KafkaConnection',
    'SimpleProducer',
-    'SimpleConsumer', 'MultiProcessConsumer',
+    'SimpleConsumer', 'MultiConsumer',
     'create_message', 'create_gzip_message', 'create_snappy_message'
 ]
+

diff --git a/kafka/client.py b/kafka/client.py
index 525551ef3..6eff6e101 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -4,6 +4,8 @@ from itertools import count, cycle
 import logging
 from operator import attrgetter
+import os
+import socket
 import struct
 import time
 import zlib
@@ -20,11 +22,15 @@ class KafkaClient(object):
     CLIENT_ID = "kafka-python"
     ID_GEN = count()
 
-    def __init__(self, host, port, bufsize=4096):
+    def __init__(self, host, port, bufsize=4096, module=socket):
         # We need one connection to bootstrap
+        self.host = host
+        self.port = port
         self.bufsize = bufsize
+        self.module = module
+        self.pid = os.getpid()
         self.conns = {               # (host, port) -> KafkaConnection
-            (host, port): KafkaConnection(host, port, bufsize)
+            (host, port): KafkaConnection(host, port, bufsize, module=module)
         }
         self.brokers = {}            # broker_id -> BrokerMetadata
         self.topics_to_brokers = {}  # topic_id -> broker_id
@@ -39,7 +45,8 @@ def _get_conn_for_broker(self, broker):
         "Get or create a connection to a broker"
         if (broker.host, broker.port) not in self.conns:
             self.conns[(broker.host, broker.port)] = \
-                KafkaConnection(broker.host, broker.port, self.bufsize)
+                KafkaConnection(broker.host, broker.port, self.bufsize,
+                                module=self.module)
 
         return self.conns[(broker.host, broker.port)]
 
@@ -176,9 +183,24 @@ def close(self):
         for conn in self.conns.values():
             conn.close()
 
-    def reinit(self):
-        for conn in self.conns.values():
-            conn.reinit()
+    def dup(self, module=None, check_and_close_original=True):
+        """
+        Create a duplicate of the client instance with re-initialized
+        connections. If the dup is being done from within a child
+        process, optionally close the original client's connections.
+
+        module - The socket module to use for the new connections
+                 (defaults to this client's module)
+        check_and_close_original - Indicates if the original client must
+                                   be closed before dup
+        """
+        if check_and_close_original and os.getpid() != self.pid:
+            self.close()
+
+        if module is None:
+            module = self.module
+
+        return KafkaClient(self.host, self.port, self.bufsize,
+                           module=module)
 
     def send_produce_request(self, payloads=[], acks=1, timeout=1000,
                              fail_on_error=True, callback=None):
diff --git a/kafka/common.py b/kafka/common.py
index 9aab8fc17..33f757328 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -1,5 +1,16 @@
 from collections import namedtuple
 
+import Queue
+import multiprocessing
+import threading
+import socket
+import gevent
+import gevent.event
+import gevent.queue
+import gevent.pool
+import gevent.socket
+import time
+
 ###############
 #   Structs   #
 ###############
@@ -48,6 +59,39 @@
 TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
 
+KAFKA_THREAD_DRIVER = 'thread'
+KAFKA_PROCESS_DRIVER = 'process'
+KAFKA_GEVENT_DRIVER = 'gevent'
+
+
+class KafkaDriver(object):
+    def __init__(self, driver_type):
+        self.socket = socket
+        self.sleep = time.sleep
+        self.driver_type = driver_type
+
+        if driver_type == KAFKA_THREAD_DRIVER:
+            self.Queue = Queue.Queue
+            self.Event = threading.Event
+            self.Proc = threading.Thread
+
+        elif driver_type == KAFKA_PROCESS_DRIVER:
+            self.Queue = multiprocessing.Queue
+            self.Event = multiprocessing.Event
+            self.Proc = multiprocessing.Process
+
+        elif driver_type == KAFKA_GEVENT_DRIVER:
+            self.Queue = gevent.queue.Queue
+            self.Event = gevent.event.Event
+            self.socket = gevent.socket
+            self.Proc = self.gevent_proc
+            self.sleep = gevent.sleep
+
+    def gevent_proc(self, target=None, args=(),
kwargs=None): + kwargs = {} if kwargs is None else kwargs + return gevent.Greenlet(target, *args, **kwargs) + + class ErrorMapping(object): # Many of these are not actually used by the client UNKNOWN = -1 diff --git a/kafka/conn.py b/kafka/conn.py index 4c2f240d9..9f6fd8f76 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -13,11 +13,11 @@ class KafkaConnection(object): we can do something in here to facilitate multiplexed requests/responses since the Kafka API includes a correlation id. """ - def __init__(self, host, port, bufsize=4096): + def __init__(self, host, port, bufsize=4096, module=socket): self.host = host self.port = port self.bufsize = bufsize - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sock = module.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((host, port)) self._sock.settimeout(10) @@ -59,7 +59,8 @@ def _consume_response_iter(self): resp = self._sock.recv(self.bufsize) log.debug("Read %d bytes from Kafka", len(resp)) if resp == "": - raise BufferUnderflowError("Not enough data to read this response") + raise BufferUnderflowError("Not enough data to read " + "this response") total += len(resp) yield resp @@ -71,7 +72,8 @@ def _consume_response_iter(self): def send(self, requestId, payload): "Send a request to Kafka" - log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), requestId)) + log.debug("About to send %d bytes to Kafka, request %d" % + (len(payload), requestId)) sent = self._sock.sendall(payload) if sent != None: raise RuntimeError("Kafka went away") @@ -85,12 +87,3 @@ def recv(self, requestId): def close(self): "Close this connection" self._sock.close() - - def reinit(self): - """ - Re-initialize the socket connection - """ - self._sock.close() - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.connect((self.host, self.port)) - self._sock.settimeout(10) diff --git a/kafka/consumer.py b/kafka/consumer.py index caeb11464..263766558 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -3,12 +3,14 @@ import logging import time from Queue import Empty -from multiprocessing import Process, Queue, Event, Value, Array, \ - current_process +from multiprocessing import Array, Value +from collections import defaultdict from kafka.common import ( ErrorMapping, FetchRequest, - OffsetRequest, OffsetFetchRequest, OffsetCommitRequest + OffsetRequest, OffsetFetchRequest, OffsetCommitRequest, + KAFKA_PROCESS_DRIVER, KAFKA_THREAD_DRIVER, KAFKA_GEVENT_DRIVER, + KafkaDriver ) log = logging.getLogger("kafka") @@ -91,11 +93,17 @@ class Consumer(object): * Auto-commit logic * APIs for fetching pending message count """ - def __init__(self, client, group, topic, partitions=None, auto_commit=True, + def __init__(self, client, group, topic, partitions=None, + driver_type=KAFKA_PROCESS_DRIVER, + auto_commit=True, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, - auto_commit_every_t=AUTO_COMMIT_INTERVAL): + auto_commit_every_t=AUTO_COMMIT_INTERVAL, + slave=False): - self.client = client + # Find out the driver that we are supposed to use and prepare the + # connection socket accordingly + self.driver = KafkaDriver(driver_type) + self.client = client.dup(module=self.driver.socket) self.topic = topic self.group = group self.client._load_metadata_for_topics(topic) @@ -105,8 +113,8 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True, partitions = self.client.topic_partitions[topic] # Variables for handling offset commits - self.commit_queue = Queue() - self.commit_event = Event() + 
self.commit_queue = self.driver.Queue() + self.commit_event = self.driver.Event() self.commit_timer = None self.count_since_commit = Value('i', 0) self.auto_commit = auto_commit @@ -139,9 +147,9 @@ def get_or_init_offset_callback(resp): self.offsets = Offsets(offsets) # Start committer only in the master/controller - if not current_process().daemon: - self.commit_timer = Process(target=self._committer, - args=(self.offsets,)) + if not slave: + self.commit_timer = self.driver.Proc(target=self._committer, + args=(self.offsets,)) self.commit_timer.daemon = True self.commit_timer.start() @@ -149,7 +157,9 @@ def _committer(self, offsets): """ The process thread which takes care of committing """ - self.client.reinit() + + # Prepare an alternate connection socket for use + client = self.client.dup() self.offsets = offsets timeout = self.auto_commit_every_t @@ -167,11 +177,14 @@ def _committer(self, offsets): partitions = None notify = False - self._commit(partitions) + self._commit(partitions, client) if notify: self.commit_event.set() + # Cleanup the client instance + client.close() + def commit(self, partitions=None, block=True, timeout=None): """ Commit offsets for this consumer @@ -187,7 +200,7 @@ def commit(self, partitions=None, block=True, timeout=None): if block: self.commit_event.wait(timeout) - def _commit(self, partitions=None): + def _commit(self, partitions=None, client=None): """ Commit offsets for this consumer @@ -208,7 +221,7 @@ def _commit(self, partitions=None): reqs.append(OffsetCommitRequest(self.topic, partition, offset, None)) - resps = self.client.send_offset_commit_request(self.group, reqs) + resps = client.send_offset_commit_request(self.group, reqs) for resp in resps: assert resp.error == 0 @@ -232,6 +245,8 @@ def stop(self): self.commit_queue.put(-1) self.commit_timer.join() + self.client.close() + def pending(self, partitions=None): """ Gets the pending message count @@ -266,6 +281,7 @@ class SimpleConsumer(Consumer): group: a name for this consumer, used for offset storage and must be unique topic: the topic to consume partitions: An optional list of partitions to consume the data from + driver_type: The driver type to use for the consumer auto_commit: default True. Whether or not to auto commit the offsets auto_commit_every_n: default 100. How many messages to consume @@ -279,9 +295,12 @@ class SimpleConsumer(Consumer): commit method on this class. 
A manual call to commit will also reset these triggers """ - def __init__(self, client, group, topic, auto_commit=True, partitions=None, + def __init__(self, client, group, topic, partitions=None, + driver_type=KAFKA_PROCESS_DRIVER, + auto_commit=True, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, - auto_commit_every_t=AUTO_COMMIT_INTERVAL): + auto_commit_every_t=AUTO_COMMIT_INTERVAL, + slave=False): self.partition_info = False # Do not return partition info in msgs self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME @@ -290,6 +309,7 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None, super(SimpleConsumer, self).__init__(client, group, topic, partitions=partitions, + driver_type=driver_type, slave=slave, auto_commit=auto_commit, auto_commit_every_n=auto_commit_every_n, auto_commit_every_t=auto_commit_every_t) @@ -450,24 +470,25 @@ def __iter_partition__(self, partition, offset): offset = next_offset + 1 -class MultiProcessConsumer(Consumer): +class MultiConsumer(Consumer): """ A consumer implementation that consumes partitions for a topic in - parallel using multiple processes + parallel using multiple driver instances (process, threads or gevent) client: a connected KafkaClient group: a name for this consumer, used for offset storage and must be unique topic: the topic to consume + driver_type: The driver type to use for the consumer auto_commit: default True. Whether or not to auto commit the offsets auto_commit_every_n: default 100. How many messages to consume before a commit auto_commit_every_t: default 5000. How much time (in milliseconds) to wait before commit - num_procs: Number of processes to start for consuming messages. - The available partitions will be divided among these processes - partitions_per_proc: Number of partitions to be allocated per process - (overrides num_procs) + num_drivers: Number of driver instances to start for consuming messages. + The available partitions will be divided among these instances + partitions_per_driver: Number of partitions to be allocated per driver + (overrides num_drivers) Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will @@ -475,45 +496,57 @@ class MultiProcessConsumer(Consumer): commit method on this class. 
A manual call to commit will also reset these triggers """ - def __init__(self, client, group, topic, auto_commit=True, + def __init__(self, client, group, topic, + driver_type=KAFKA_PROCESS_DRIVER, + auto_commit=True, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL, - num_procs=1, partitions_per_proc=0): + num_drivers=1, partitions_per_driver=0): # Initiate the base consumer class - super(MultiProcessConsumer, self).__init__(client, group, topic, - partitions=None, + super(MultiConsumer, self).__init__(client, group, topic, + partitions=None, driver_type=driver_type, auto_commit=auto_commit, auto_commit_every_n=auto_commit_every_n, auto_commit_every_t=auto_commit_every_t) # Variables for managing and controlling the data flow from # consumer child process to master - self.queue = Queue(1024) # Child consumers dump messages into this - self.start = Event() # Indicates the consumers to start fetch - self.exit = Event() # Requests the consumers to shutdown - self.pause = Event() # Requests the consumers to pause fetch - self.size = Value('i', 0) # Indicator of number of messages to fetch + + # Child consumers dump messages into this + self.queue = self.driver.Queue(1024) + + # Indicates the consumers to start fetch + self.start = self.driver.Event() + + # Requests the consumers to shutdown + self.exit = self.driver.Event() + + # Requests the consumers to pause fetch + self.pause = self.driver.Event() + + # Indicator of number of messages to fetch + self.size = Value('i', 0) partitions = self.offsets.keys() # If unspecified, start one consumer per partition # The logic below ensures that - # * we do not cross the num_procs limit + # * we do not cross the num_drivers limit # * we have an even distribution of partitions among processes - if not partitions_per_proc: - partitions_per_proc = round(len(partitions) * 1.0 / num_procs) - if partitions_per_proc < num_procs * 0.5: - partitions_per_proc += 1 + if not partitions_per_driver: + partitions_per_driver = round(len(partitions) * 1.0 / num_drivers) + if partitions_per_driver < num_drivers * 0.5: + partitions_per_driver += 1 # The final set of chunks chunker = lambda *x: [] + list(x) - chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc)) + chunks = map(chunker, *[iter(partitions)] * int(partitions_per_driver)) self.procs = [] for chunk in chunks: chunk = filter(lambda x: x is not None, chunk) - proc = Process(target=self._consume, args=(chunk,)) + proc = self.driver.Proc(target=self._consume, args=(chunk,)) proc.daemon = True proc.start() self.procs.append(proc) @@ -524,16 +557,15 @@ def _consume(self, partitions): notifications given by the controller process """ - # Make the child processes open separate socket connections - self.client.reinit() - # We will start consumers without auto-commit. Auto-commit will be # done by the master controller process. 
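        # Note: slave=True below keeps each child consumer from spawning
        # its own commit timer; per the Consumer base class earlier in
        # this patch, the committer is started only when slave is False,
        # i.e. only in the master/controller.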
consumer = SimpleConsumer(self.client, self.group, self.topic, partitions=partitions, auto_commit=False, auto_commit_every_n=None, - auto_commit_every_t=None) + auto_commit_every_t=None, + driver_type=self.driver.driver_type, + slave=True) # Ensure that the consumer provides the partition information consumer.provide_partition_info() @@ -566,7 +598,7 @@ def _consume(self, partitions): # In case we did not receive any message, give up the CPU for # a while before we try again if count == 0: - time.sleep(0.1) + self.driver.sleep(0.1) consumer.stop() @@ -578,9 +610,8 @@ def stop(self): for proc in self.procs: proc.join() - proc.terminate() - super(MultiProcessConsumer, self).stop() + super(MultiConsumer, self).stop() def __iter__(self): """ @@ -598,6 +629,7 @@ def __iter__(self): # a chance to run and put some messages in the queue # TODO: This is a hack and will make the consumer block for # at least one second. Need to find a better way of doing this + self.driver.sleep(0) partition, message = self.queue.get(block=True, timeout=1) except Empty: break diff --git a/setup.py b/setup.py index a24691407..e383e81f2 100644 --- a/setup.py +++ b/setup.py @@ -21,8 +21,8 @@ def run(self): name="kafka-python", version="0.8.1-1", - install_requires=["distribute", "tox"], - tests_require=["tox"], + install_requires=["distribute", "tox", "gevent"], + tests_require=["tox", "gevent"], cmdclass={"test": Tox}, packages=["kafka"], diff --git a/test/test_integration.py b/test/test_integration.py index 7908a34a3..1fe7cd60f 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -289,9 +289,11 @@ def tearDownClass(cls): # noqa cls.server2.close() cls.zk.close() - def test_simple_consumer(self): + def test_simple_consumer(self, driver_type=KAFKA_PROCESS_DRIVER): + queue = "test_simple_consumer_%s" % (driver_type) + # Produce 100 messages to partition 0 - produce1 = ProduceRequest("test_simple_consumer", 0, messages=[ + produce1 = ProduceRequest(queue, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(100) ]) @@ -300,7 +302,7 @@ def test_simple_consumer(self): self.assertEquals(resp.offset, 0) # Produce 100 messages to partition 1 - produce2 = ProduceRequest("test_simple_consumer", 1, messages=[ + produce2 = ProduceRequest(queue, 1, messages=[ create_message("Test message 1 %d" % i) for i in range(100) ]) @@ -309,7 +311,8 @@ def test_simple_consumer(self): self.assertEquals(resp.offset, 0) # Start a consumer - consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer") + consumer = SimpleConsumer(self.client, "group1", queue, + driver_type=driver_type) all_messages = [] for message in consumer: all_messages.append(message) @@ -340,7 +343,7 @@ def test_simple_consumer(self): self.assertEqual(len(messages), 0) # Send 10 messages - produce = ProduceRequest("test_simple_consumer", 0, messages=[ + produce = ProduceRequest(queue, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) @@ -361,32 +364,51 @@ def test_simple_consumer(self): consumer.stop() - def test_simple_consumer_pending(self): - # Produce 10 messages to partition 0 and 1 + def test_simple_consumer_gevent(self): + return self.test_simple_consumer(driver_type=KAFKA_GEVENT_DRIVER) + + def test_simple_consumer_thread(self): + return self.test_simple_consumer(driver_type=KAFKA_THREAD_DRIVER) + + def test_simple_consumer_pending(self, driver_type=KAFKA_PROCESS_DRIVER): + queue = "test_simple_pending_%s" % (driver_type) - produce1 = ProduceRequest("test_simple_pending", 0, messages=[ + # 
Produce 10 messages to partition 0 and 1 + produce1 = ProduceRequest(queue, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) for resp in self.client.send_produce_request([produce1]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - produce2 = ProduceRequest("test_simple_pending", 1, messages=[ + produce2 = ProduceRequest(queue, 1, messages=[ create_message("Test message 1 %d" % i) for i in range(10) ]) for resp in self.client.send_produce_request([produce2]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - consumer = SimpleConsumer(self.client, "group1", "test_simple_pending") + consumer = SimpleConsumer(self.client, "group1", queue, + driver_type=driver_type) + self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10) consumer.stop() - def test_multi_process_consumer(self): + def test_simple_consumer_pending_gevent(self): + return self.test_simple_consumer_pending( + driver_type=KAFKA_GEVENT_DRIVER) + + def test_simple_consumer_pending_thread(self): + return self.test_simple_consumer_pending( + driver_type=KAFKA_THREAD_DRIVER) + + def test_multi_consumer(self, driver_type=KAFKA_PROCESS_DRIVER): + queue = "test_mpconsumer_%s" % (driver_type) + # Produce 100 messages to partition 0 - produce1 = ProduceRequest("test_mpconsumer", 0, messages=[ + produce1 = ProduceRequest(queue, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(100) ]) @@ -395,7 +417,7 @@ def test_multi_process_consumer(self): self.assertEquals(resp.offset, 0) # Produce 100 messages to partition 1 - produce2 = ProduceRequest("test_mpconsumer", 1, messages=[ + produce2 = ProduceRequest(queue, 1, messages=[ create_message("Test message 1 %d" % i) for i in range(100) ]) @@ -404,7 +426,9 @@ def test_multi_process_consumer(self): self.assertEquals(resp.offset, 0) # Start a consumer - consumer = MultiProcessConsumer(self.client, "grp1", "test_mpconsumer") + consumer = MultiConsumer(self.client, "grp1", queue, + driver_type=driver_type) + all_messages = [] for message in consumer: all_messages.append(message) @@ -421,7 +445,7 @@ def test_multi_process_consumer(self): self.assertEqual(len(messages), 0) # Send 10 messages - produce = ProduceRequest("test_mpconsumer", 0, messages=[ + produce = ProduceRequest(queue, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) @@ -442,9 +466,17 @@ def test_multi_process_consumer(self): consumer.stop() - def test_multi_proc_pending(self): + def test_multi_consumer_gevent(self): + return self.test_multi_consumer(driver_type=KAFKA_GEVENT_DRIVER) + + def test_multi_consumer_thread(self): + return self.test_multi_consumer(driver_type=KAFKA_THREAD_DRIVER) + + def test_multi_proc_pending(self, driver_type=KAFKA_PROCESS_DRIVER): + queue = "test_mppending_%s" % (driver_type) + # Produce 10 messages to partition 0 and 1 - produce1 = ProduceRequest("test_mppending", 0, messages=[ + produce1 = ProduceRequest(queue, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) @@ -452,7 +484,7 @@ def test_multi_proc_pending(self): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - produce2 = ProduceRequest("test_mppending", 1, messages=[ + produce2 = ProduceRequest(queue, 1, messages=[ create_message("Test message 1 %d" % i) for i in range(10) ]) @@ -460,13 +492,20 @@ def test_multi_proc_pending(self): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - 
consumer = MultiProcessConsumer(self.client, "group1", "test_mppending") + consumer = MultiConsumer(self.client, "group1", queue, + driver_type=driver_type) self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10) consumer.stop() + def test_multi_proc_pending_gevent(self): + return self.test_multi_proc_pending(driver_type=KAFKA_GEVENT_DRIVER) + + def test_multi_proc_pending_thread(self): + return self.test_multi_proc_pending(driver_type=KAFKA_THREAD_DRIVER) + if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG) From 4cf0192240e1d7edf7927c555fc92ed41fc66943 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Tue, 2 Jul 2013 13:52:11 +0530 Subject: [PATCH 2/6] Markup correction --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index a9ecc9412..62befbd04 100644 --- a/README.md +++ b/README.md @@ -83,7 +83,6 @@ for message in consumer: for message in consumer.get_messages(count=5, block=True, timeout=4): print(message) -``` # Gevent based consumer from kafka import KAFKA_GEVENT_DRIVER @@ -95,6 +94,7 @@ from kafka import KAFKA_THREAD_DRIVER consumer = MultiConsumer(kafka, "my-group", "my-topic", partitions_per_driver=2, driver_type=KAFKA_THREAD_DRIVER) +``` ## Low level From 159d62f094cceafb2e263c120d9abb2286d48419 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Tue, 30 Jul 2013 08:32:40 +0530 Subject: [PATCH 3/6] Removed trailing whitespace --- kafka/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/kafka/__init__.py b/kafka/__init__.py index 8b7e153ea..104df0249 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -26,4 +26,3 @@ 'SimpleConsumer', 'MultiProcessConsumer', 'create_message', 'create_gzip_message', 'create_snappy_message' ] - From dd2b9ddf2787d37470a5e7727c56eeea379a3dc0 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Wed, 9 Oct 2013 14:37:25 +0530 Subject: [PATCH 4/6] pep-ify --- kafka/consumer.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 226161b7a..6f16b05f1 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -66,7 +66,7 @@ def __syncup(self): i = 0 for k, v in self.items(): self.array[i] = k - self.array[i+1] = v + self.array[i + 1] = v i += 2 def __setitem__(self, key, value): @@ -326,12 +326,13 @@ def __init__(self, client, group, topic, partitions=None, self.fetch_min_bytes = fetch_size_bytes self.fetch_started = defaultdict(bool) # defaults to false - super(SimpleConsumer, self).__init__(client, group, topic, - partitions=partitions, - driver_type=driver_type, slave=slave, - auto_commit=auto_commit, - auto_commit_every_n=auto_commit_every_n, - auto_commit_every_t=auto_commit_every_t) + super(SimpleConsumer, self).__init__( + client, group, topic, + partitions=partitions, + driver_type=driver_type, slave=slave, + auto_commit=auto_commit, + auto_commit_every_n=auto_commit_every_n, + auto_commit_every_t=auto_commit_every_t) def provide_partition_info(self): """ @@ -600,11 +601,12 @@ def __init__(self, client, group, topic, num_drivers=1, partitions_per_driver=0): # Initiate the base consumer class - super(MultiConsumer, self).__init__(client, group, topic, - partitions=None, driver_type=driver_type, - auto_commit=auto_commit, - auto_commit_every_n=auto_commit_every_n, - auto_commit_every_t=auto_commit_every_t) + super(MultiConsumer, self).__init__( + client, group, topic, + partitions=None, 
driver_type=driver_type, + auto_commit=auto_commit, + auto_commit_every_n=auto_commit_every_n, + auto_commit_every_t=auto_commit_every_t) # Variables for managing and controlling the data flow from # consumer child process to master From 33c6c0124a01409e1e29fbb66fbd93588a0b558b Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Wed, 9 Oct 2013 14:54:19 +0530 Subject: [PATCH 5/6] ensure that connection reinit is proper in windows --- kafka/client.py | 9 +++++++-- kafka/conn.py | 7 ++++++- kafka/consumer.py | 13 +++++++++---- kafka/producer.py | 3 ++- 4 files changed, 24 insertions(+), 8 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index fa963bf2b..7f519c310 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -209,11 +209,16 @@ def copy(self): c = copy.deepcopy(self) for k, v in c.conns.items(): c.conns[k] = v.copy() + + c.module = None return c - def reinit(self): + def reinit(self, module=None): + if module is not None and self.module != module: + self.module = module + for conn in self.conns.values(): - conn.reinit() + conn.reinit(module=module) def send_produce_request(self, payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None): diff --git a/kafka/conn.py b/kafka/conn.py index c9a12959e..d1c001da9 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -115,6 +115,7 @@ def copy(self): """ c = copy.deepcopy(self) c._sock = None + c.module = None return c def close(self): @@ -124,9 +125,13 @@ def close(self): if self._sock: self._sock.close() - def reinit(self): + def reinit(self, module=None): """ Re-initialize the socket connection """ self.close() + + if module is not None and self.module != module: + self.module = module + self._init_sock() diff --git a/kafka/consumer.py b/kafka/consumer.py index 6f16b05f1..c8dc733bd 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -112,7 +112,8 @@ def _commit(client, group, topic, count, offsets, partitions=None): count.value = 0 -def _committer(client, group, topic, timeout, queue, event, count, offsets): +def _committer(client, group, topic, driver_type, timeout, + queue, event, count, offsets): """ The process thread which takes care of committing @@ -120,7 +121,8 @@ def _committer(client, group, topic, timeout, queue, event, count, offsets): class. However, multiprocessing module has issues in windows. The functionality breaks unless this function is kept outside of a class """ - client.reinit() + driver = KafkaDriver(driver_type) + client.reinit(module=driver.socket) if timeout is not None: timeout /= 1000.0 @@ -162,7 +164,8 @@ def __init__(self, client, group, topic, partitions=None, # Find out the driver that we are supposed to use and prepare the # connection socket accordingly self.driver = KafkaDriver(driver_type) - self.client = client.dup(module=self.driver.socket) + self.client = client.copy() + self.client.reinit(module=self.driver.socket) self.topic = topic self.group = group self.client._load_metadata_for_topics(topic) @@ -213,6 +216,7 @@ def get_or_init_offset_callback(resp): # Start committer only in the master/controller if not slave: args = (client.copy(), group, topic, + driver_type, self.auto_commit_every_t, self.commit_queue, self.commit_event, @@ -520,7 +524,8 @@ def _mp_consume(client, group, topic, driver_type, chunk, """ # Make the child processes open separate socket connections - client.reinit() + driver = KafkaDriver(driver_type) + client.reinit(module=driver.socket) # We will start consumers without auto-commit. Auto-commit will be # done by the master controller process. 
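The pattern this commit settles on is worth spelling out: a connection is copied in an *inactive* state (its live socket and its socket-module reference are stripped, since neither reliably survives pickling or deep-copying) so the copy can cross a process boundary on Windows, and the receiving side then re-binds it via reinit(module=...) with whatever socket module its driver needs. Below is a minimal, self-contained sketch of that copy()/reinit() lifecycle; DemoConnection is a hypothetical stand-in, not the library's KafkaConnection:

```python
# Sketch only: mirrors the copy()/reinit(module=...) split from the
# patch above; DemoConnection and its defaults are hypothetical.
import copy
import socket


class DemoConnection(object):
    def __init__(self, host, port, module=socket):
        self.host = host
        self.port = port
        self.module = module
        self._sock = None

    def connect(self):
        # The module is plugged in here: plain socket, or e.g. gevent.socket
        self._sock = self.module.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.connect((self.host, self.port))

    def copy(self):
        # Strip the socket and the module reference before deep-copying,
        # so the copy is inactive but picklable (cf. KafkaConnection.copy)
        c = copy.copy(self)
        c._sock = None
        c.module = None
        return copy.deepcopy(c)

    def reinit(self, module=None):
        # Re-create the socket, optionally switching socket modules
        # (cf. KafkaConnection.reinit(module=...) in this patch)
        if self._sock is not None:
            self._sock.close()
        if module is not None:
            self.module = module
        if self.module is None:
            self.module = socket    # sketch-only default for stripped copies
        self.connect()

# Intended flow on the child side (assuming a reachable broker):
#   conn = DemoConnection("localhost", 9092).copy()  # inactive, picklable
#   conn.reinit(module=gevent.socket)                # live again, gevent-ified
```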
diff --git a/kafka/producer.py b/kafka/producer.py index 7ef789684..044d9dc1b 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -4,6 +4,7 @@ from multiprocessing import Queue, Process from Queue import Empty import logging +import socket import sys from kafka.common import ProduceRequest @@ -31,7 +32,7 @@ def _send_upstream(topic, queue, client, batch_time, batch_size, functionality breaks unless this function is kept outside of a class """ stop = False - client.reinit() + client.reinit(module=socket) while not stop: timeout = batch_time From 8a26fa1ab774603ce7a7d3dcd8fa992d224514b2 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Wed, 9 Oct 2013 16:55:03 +0530 Subject: [PATCH 6/6] get gevent and multithreaded modules to work again --- kafka/client.py | 11 +++++++---- kafka/conn.py | 4 ++-- kafka/consumer.py | 3 ++- test/test_integration.py | 15 +++++++++------ 4 files changed, 20 insertions(+), 13 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 7f519c310..7a167f488 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -206,12 +206,15 @@ def copy(self): Create an inactive copy of the client object A reinit() has to be done on the copy before it can be used again """ - c = copy.deepcopy(self) - for k, v in c.conns.items(): + c = copy.copy(self) + c.module = None + c.conns = {} + c = copy.deepcopy(c) + + for k, v in self.conns.items(): c.conns[k] = v.copy() - c.module = None - return c + return copy.deepcopy(c) def reinit(self, module=None): if module is not None and self.module != module: diff --git a/kafka/conn.py b/kafka/conn.py index d1c001da9..bf440b2f7 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -113,10 +113,10 @@ def copy(self): Create an inactive copy of the connection object A reinit() has to be done on the copy before it can be used again """ - c = copy.deepcopy(self) + c = copy.copy(self) c._sock = None c.module = None - return c + return copy.deepcopy(c) def close(self): """ diff --git a/kafka/consumer.py b/kafka/consumer.py index c8dc733bd..620674dcb 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -534,7 +534,8 @@ def _mp_consume(client, group, topic, driver_type, chunk, driver_type=driver_type, auto_commit=False, auto_commit_every_n=None, - auto_commit_every_t=None) + auto_commit_every_t=None, + slave=True) # Ensure that the consumer provides the partition information consumer.provide_partition_info() diff --git a/test/test_integration.py b/test/test_integration.py index 4e9f119e6..324b648f9 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -613,8 +613,11 @@ def test_simple_consumer(self, driver_type=KAFKA_PROCESS_DRIVER): consumer.stop() - def test_simple_consumer_blocking(self): - consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer_blocking", auto_commit=False) + def test_simple_consumer_blocking(self, driver_type=KAFKA_PROCESS_DRIVER): + queue = "test_simple_consumer_blocking_%s" % (driver_type) + + consumer = SimpleConsumer(self.client, "group1", queue, + auto_commit=False) # Blocking API start = datetime.now() @@ -645,11 +648,11 @@ def test_simple_consumer_blocking(self): consumer.stop() - def test_simple_consumer_gevent(self): - return self.test_simple_consumer(driver_type=KAFKA_GEVENT_DRIVER) + def test_simple_consumer_blocking_gevent(self): + return self.test_simple_consumer_blocking(driver_type=KAFKA_GEVENT_DRIVER) - def test_simple_consumer_thread(self): - return self.test_simple_consumer(driver_type=KAFKA_THREAD_DRIVER) + def test_simple_consumer_blocking_thread(self): + return 
self.test_simple_consumer_blocking(driver_type=KAFKA_THREAD_DRIVER) def test_simple_consumer_pending(self, driver_type=KAFKA_PROCESS_DRIVER): queue = "test_simple_pending_%s" % (driver_type)
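Taken as a whole, the series reduces to one idea: select the concurrency primitives (Queue, Event, Proc, sleep, socket) once, in KafkaDriver, and write every worker against those names. A condensed, standalone sketch of that idea follows; DemoDriver and worker are hypothetical stand-ins, and gevent is optional here (the patch itself imports it unconditionally):

```python
import multiprocessing
import threading
import time

try:
    import Queue as queue_mod       # Python 2, as in the patch
except ImportError:
    import queue as queue_mod       # Python 3 fallback for this sketch


class DemoDriver(object):
    """Hypothetical, condensed stand-in for kafka.common.KafkaDriver."""

    def __init__(self, driver_type):
        self.sleep = time.sleep
        if driver_type == 'thread':
            self.Queue = queue_mod.Queue
            self.Event = threading.Event
            self.Proc = threading.Thread
        elif driver_type == 'process':
            self.Queue = multiprocessing.Queue
            self.Event = multiprocessing.Event
            self.Proc = multiprocessing.Process
        elif driver_type == 'gevent':
            import gevent
            import gevent.event
            import gevent.queue
            self.Queue = gevent.queue.Queue
            self.Event = gevent.event.Event
            self.sleep = gevent.sleep
            self.Proc = lambda target, args: gevent.Greenlet(target, *args)


def worker(queue, stop, sleep):
    # Module-level (not a bound method) so multiprocessing can pickle it
    # on Windows -- the constraint PATCH 5 solves for _committer/_mp_consume
    while not stop.is_set():
        queue.put("message")
        sleep(0.05)                 # yield, like driver.sleep(0.1) on idle


if __name__ == '__main__':
    for driver_type in ('thread', 'process'):   # add 'gevent' if installed
        driver = DemoDriver(driver_type)
        queue, stop = driver.Queue(), driver.Event()
        proc = driver.Proc(target=worker, args=(queue, stop, driver.sleep))
        proc.daemon = True
        proc.start()
        print("%s %s" % (driver_type, queue.get()))  # same code path each time
        stop.set()
        proc.join()
```

The same selection trick is what lets the integration tests above run a single test body under all three drivers by simply forwarding driver_type.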