diff --git a/README.md b/README.md
index c9f782d12..b18cfe5d9 100644
--- a/README.md
+++ b/README.md
@@ -72,6 +72,16 @@ consumer = SimpleConsumer(kafka, "my-group", "my-topic")
 for message in consumer:
     print(message)
 
+# Gevent-based consumer
+from kafka import KAFKA_GEVENT_DRIVER
+consumer = SimpleConsumer(kafka, "my-group", "my-topic",
+                          driver_type=KAFKA_GEVENT_DRIVER)
+
+# Threaded consumer
+from kafka import KAFKA_THREAD_DRIVER
+consumer = SimpleConsumer(kafka, "my-group", "my-topic",
+                          driver_type=KAFKA_THREAD_DRIVER)
+
 kafka.close()
 ```
@@ -94,22 +104,33 @@ producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
 ## Multiprocess consumer
 ```python
 from kafka.client import KafkaClient
-from kafka.consumer import MultiProcessConsumer
+from kafka.consumer import MultiConsumer
 
 kafka = KafkaClient("localhost", 9092)
 
-# This will split the number of partitions among two processes
-consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)
+# This will split the partitions among two driver instances (processes
+# by default)
+consumer = MultiConsumer(kafka, "my-group", "my-topic", num_drivers=2)
 
 # This will spawn processes such that each handles 2 partitions max
-consumer = MultiProcessConsumer(kafka, "my-group", "my-topic",
-                                partitions_per_proc=2)
+consumer = MultiConsumer(kafka, "my-group", "my-topic",
+                         partitions_per_driver=2)
 
 for message in consumer:
     print(message)
 
 for message in consumer.get_messages(count=5, block=True, timeout=4):
     print(message)
+
+# Gevent-based consumer
+from kafka import KAFKA_GEVENT_DRIVER
+consumer = MultiConsumer(kafka, "my-group", "my-topic", num_drivers=2,
+                         driver_type=KAFKA_GEVENT_DRIVER)
+
+# Threaded consumer
+from kafka import KAFKA_THREAD_DRIVER
+consumer = MultiConsumer(kafka, "my-group", "my-topic",
+                         partitions_per_driver=2,
+                         driver_type=KAFKA_THREAD_DRIVER)
 ```
 
 ## Low level
diff --git a/kafka/__init__.py b/kafka/__init__.py
index 73aa7603c..e43edebd1 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -5,17 +5,24 @@ __copyright__ = 'Copyright 2012, David Arthur under Apache License, v2.0'
 
 from kafka.client import KafkaClient
+from kafka.common import KAFKA_THREAD_DRIVER, \
+    KAFKA_GEVENT_DRIVER, \
+    KAFKA_PROCESS_DRIVER
+
 from kafka.conn import KafkaConnection
 from kafka.protocol import (
     create_message, create_gzip_message, create_snappy_message
 )
+
 from kafka.producer import SimpleProducer, KeyedProducer
 from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner
-from kafka.consumer import SimpleConsumer, MultiProcessConsumer
+from kafka.consumer import SimpleConsumer, MultiConsumer
 
 __all__ = [
-    'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer',
-    'RoundRobinPartitioner', 'HashedPartitioner', 'SimpleConsumer',
-    'MultiProcessConsumer', 'create_message', 'create_gzip_message',
-    'create_snappy_message'
+    'KAFKA_THREAD_DRIVER', 'KAFKA_GEVENT_DRIVER', 'KAFKA_PROCESS_DRIVER',
+    'KafkaClient', 'KafkaConnection',
+    'SimpleProducer', 'KeyedProducer',
+    'RoundRobinPartitioner', 'HashedPartitioner',
+    'SimpleConsumer', 'MultiConsumer',
+    'create_message', 'create_gzip_message', 'create_snappy_message'
 ]
diff --git a/kafka/client.py b/kafka/client.py
index 71ededaa0..7a167f488 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -4,6 +4,7 @@ from itertools import count
 import logging
 import socket
+import struct
 import time
 
 from kafka.common import ErrorMapping, TopicAndPartition
@@ -19,12 +20,17 @@ class KafkaClient(object):
     CLIENT_ID = "kafka-python"
     ID_GEN = count()
 
-    def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID):
+    def __init__(self, host, port, bufsize=4096,
+                 client_id=CLIENT_ID, module=socket):
         # We need one connection to bootstrap
+        self.host = host
+        self.port = port
         self.bufsize = bufsize
         self.client_id = client_id
+        self.module = module
+
         self.conns = {  # (host, port) -> KafkaConnection
-            (host, port): KafkaConnection(host, port, bufsize)
+            (host, port): KafkaConnection(host, port, bufsize, module=module)
         }
         self.brokers = {}            # broker_id -> BrokerMetadata
         self.topics_to_brokers = {}  # topic_id -> broker_id
@@ -41,7 +47,8 @@ def _get_conn_for_broker(self, broker):
         """
         if (broker.host, broker.port) not in self.conns:
             self.conns[(broker.host, broker.port)] = \
-                KafkaConnection(broker.host, broker.port, self.bufsize)
+                KafkaConnection(broker.host, broker.port, self.bufsize,
+                                module=self.module)
 
         return self.conns[(broker.host, broker.port)]
@@ -199,14 +206,22 @@ def copy(self):
         Create an inactive copy of the client object
         A reinit() has to be done on the copy before it can be used again
         """
-        c = copy.deepcopy(self)
-        for k, v in c.conns.items():
+        c = copy.copy(self)
+        c.module = None
+        c.conns = {}
+        c = copy.deepcopy(c)
+
+        for k, v in self.conns.items():
             c.conns[k] = v.copy()
-        return c
 
-    def reinit(self):
+        return copy.deepcopy(c)
+
+    def reinit(self, module=None):
+        if module is not None and self.module != module:
+            self.module = module
+
         for conn in self.conns.values():
-            conn.reinit()
+            conn.reinit(module=module)
 
     def send_produce_request(self, payloads=[], acks=1, timeout=1000,
                              fail_on_error=True, callback=None):
diff --git a/kafka/common.py b/kafka/common.py
index 6f0dd322b..9536260cd 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -1,5 +1,16 @@
 from collections import namedtuple
 
+import Queue
+import multiprocessing
+import threading
+import socket
+import gevent
+import gevent.event
+import gevent.queue
+import gevent.pool
+import gevent.socket
+import time
+
 ###############
 #   Structs   #
 ###############
@@ -48,6 +59,39 @@
 TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
 
 
+KAFKA_THREAD_DRIVER = 'thread'
+KAFKA_PROCESS_DRIVER = 'process'
+KAFKA_GEVENT_DRIVER = 'gevent'
+
+
+class KafkaDriver(object):
+    def __init__(self, driver_type):
+        self.socket = socket
+        self.sleep = time.sleep
+        self.driver_type = driver_type
+
+        if driver_type == KAFKA_THREAD_DRIVER:
+            self.Queue = Queue.Queue
+            self.Event = threading.Event
+            self.Proc = threading.Thread
+
+        elif driver_type == KAFKA_PROCESS_DRIVER:
+            self.Queue = multiprocessing.Queue
+            self.Event = multiprocessing.Event
+            self.Proc = multiprocessing.Process
+
+        elif driver_type == KAFKA_GEVENT_DRIVER:
+            self.Queue = gevent.queue.Queue
+            self.Event = gevent.event.Event
+            self.socket = gevent.socket
+            self.Proc = self.gevent_proc
+            self.sleep = gevent.sleep
+
+    def gevent_proc(self, target=None, args=(), kwargs=None):
+        kwargs = {} if kwargs is None else kwargs
+        return gevent.Greenlet(target, *args, **kwargs)
+
+
 class ErrorMapping(object):
     # Many of these are not actually used by the client
     UNKNOWN = -1
diff --git a/kafka/conn.py b/kafka/conn.py
index 0b1eacf51..bf440b2f7 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -18,13 +18,18 @@ class KafkaConnection(object):
     we can do something in here to facilitate multiplexed requests/responses
     since the Kafka API includes a correlation id.
""" - def __init__(self, host, port, bufsize=4096): + def __init__(self, host, port, bufsize=4096, module=socket): super(KafkaConnection, self).__init__() self.host = host self.port = port self.bufsize = bufsize - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.connect((host, port)) + self.module = module + + self._init_sock() + + def _init_sock(self): + self._sock = self.module.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sock.connect((self.host, self.port)) self._sock.settimeout(10) self._dirty = False @@ -108,9 +113,10 @@ def copy(self): Create an inactive copy of the connection object A reinit() has to be done on the copy before it can be used again """ - c = copy.deepcopy(self) + c = copy.copy(self) c._sock = None - return c + c.module = None + return copy.deepcopy(c) def close(self): """ @@ -119,12 +125,13 @@ def close(self): if self._sock: self._sock.close() - def reinit(self): + def reinit(self, module=None): """ Re-initialize the socket connection """ self.close() - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.connect((self.host, self.port)) - self._sock.settimeout(10) - self._dirty = False + + if module is not None and self.module != module: + self.module = module + + self._init_sock() diff --git a/kafka/consumer.py b/kafka/consumer.py index 439f3f07c..620674dcb 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -3,13 +3,15 @@ import logging import time from Queue import Empty -from multiprocessing import Process, Queue, Event, Value, Array, \ - current_process +from multiprocessing import Array, Value +from collections import defaultdict from kafka.common import ( ErrorMapping, FetchRequest, - OffsetRequest, OffsetCommitRequest, - ConsumerFetchSizeTooSmall, ConsumerNoMoreData + OffsetRequest, OffsetFetchRequest, OffsetCommitRequest, + ConsumerFetchSizeTooSmall, ConsumerNoMoreData, + KAFKA_PROCESS_DRIVER, KAFKA_THREAD_DRIVER, KAFKA_GEVENT_DRIVER, + KafkaDriver ) log = logging.getLogger("kafka") @@ -64,7 +66,7 @@ def __syncup(self): i = 0 for k, v in self.items(): self.array[i] = k - self.array[i+1] = v + self.array[i + 1] = v i += 2 def __setitem__(self, key, value): @@ -110,7 +112,8 @@ def _commit(client, group, topic, count, offsets, partitions=None): count.value = 0 -def _committer(client, group, topic, timeout, queue, event, count, offsets): +def _committer(client, group, topic, driver_type, timeout, + queue, event, count, offsets): """ The process thread which takes care of committing @@ -118,7 +121,8 @@ def _committer(client, group, topic, timeout, queue, event, count, offsets): class. However, multiprocessing module has issues in windows. 
     The functionality breaks unless this function is kept outside of a class
     """
-    client.reinit()
+    driver = KafkaDriver(driver_type)
+    client.reinit(module=driver.socket)
 
     if timeout is not None:
         timeout /= 1000.0
@@ -150,11 +154,18 @@ class Consumer(object):
     * Auto-commit logic
     * APIs for fetching pending message count
     """
-    def __init__(self, client, group, topic, partitions=None, auto_commit=True,
+    def __init__(self, client, group, topic, partitions=None,
+                 driver_type=KAFKA_PROCESS_DRIVER,
+                 auto_commit=True,
                  auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
-                 auto_commit_every_t=AUTO_COMMIT_INTERVAL):
+                 auto_commit_every_t=AUTO_COMMIT_INTERVAL,
+                 slave=False):
 
-        self.client = client
+        # Find out the driver that we are supposed to use and prepare the
+        # connection socket accordingly
+        self.driver = KafkaDriver(driver_type)
+        self.client = client.copy()
+        self.client.reinit(module=self.driver.socket)
         self.topic = topic
         self.group = group
         self.client._load_metadata_for_topics(topic)
@@ -164,8 +175,8 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True,
             partitions = self.client.topic_partitions[topic]
 
         # Variables for handling offset commits
-        self.commit_queue = Queue()
-        self.commit_event = Event()
+        self.commit_queue = self.driver.Queue()
+        self.commit_event = self.driver.Event()
         self.commit_timer = None
         self.count_since_commit = Value('i', 0)
         self.auto_commit = auto_commit
@@ -203,15 +214,16 @@ def get_or_init_offset_callback(resp):
         self.offsets = Offsets(offsets)
 
         # Start committer only in the master/controller
-        if not current_process().daemon:
+        if not slave:
             args = (client.copy(),
                     group,
                     topic,
+                    driver_type,
                     self.auto_commit_every_t,
                     self.commit_queue,
                     self.commit_event,
                     self.count_since_commit,
                     self.offsets)
-            self.commit_timer = Process(target=_committer, args=args)
+            self.commit_timer = self.driver.Proc(target=_committer, args=args)
             self.commit_timer.daemon = True
             self.commit_timer.start()
@@ -253,6 +265,8 @@ def stop(self):
             self.commit_queue.put(-1)
             self.commit_timer.join()
 
+        self.client.close()
+
     def pending(self, partitions=None):
         """
         Gets the pending message count
@@ -287,6 +301,7 @@ class SimpleConsumer(Consumer):
     group: a name for this consumer, used for offset storage and must be unique
     topic: the topic to consume
     partitions: An optional list of partitions to consume the data from
+    driver_type: The driver type to use for the consumer
 
     auto_commit: default True. Whether or not to auto commit the offsets
     auto_commit_every_n: default 100. How many messages to consume
@@ -302,10 +317,13 @@ class SimpleConsumer(Consumer):
     commit method on this class. A manual call to commit will also reset
     these triggers
     """
-    def __init__(self, client, group, topic, auto_commit=True, partitions=None,
+    def __init__(self, client, group, topic, partitions=None,
+                 driver_type=KAFKA_PROCESS_DRIVER,
+                 auto_commit=True,
                  auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                  auto_commit_every_t=AUTO_COMMIT_INTERVAL,
-                 fetch_size_bytes=FETCH_MIN_BYTES):
+                 fetch_size_bytes=FETCH_MIN_BYTES,
+                 slave=False):
 
         self.partition_info = False     # Do not return partition info in msgs
         self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
@@ -315,6 +333,7 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
         super(SimpleConsumer, self).__init__(
             client, group, topic,
             partitions=partitions,
+            driver_type=driver_type, slave=slave,
            auto_commit=auto_commit,
             auto_commit_every_n=auto_commit_every_n,
             auto_commit_every_t=auto_commit_every_t)
@@ -493,7 +512,8 @@ def __iter_partition__(self, partition, offset):
             offset = next_offset + 1
 
 
-def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
+def _mp_consume(client, group, topic, driver_type, chunk,
+                queue, start, exit, pause, size):
     """
     A child process worker which consumes messages based on the
     notifications given by the controller process
@@ -504,15 +524,18 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
     """
 
     # Make the child processes open separate socket connections
-    client.reinit()
+    driver = KafkaDriver(driver_type)
+    client.reinit(module=driver.socket)
 
     # We will start consumers without auto-commit. Auto-commit will be
     # done by the master controller process.
     consumer = SimpleConsumer(client, group, topic,
                               partitions=chunk,
+                              driver_type=driver_type,
                               auto_commit=False,
                               auto_commit_every_n=None,
-                              auto_commit_every_t=None)
+                              auto_commit_every_t=None,
+                              slave=True)
 
     # Ensure that the consumer provides the partition information
     consumer.provide_partition_info()
@@ -550,24 +573,25 @@
     consumer.stop()
 
 
-class MultiProcessConsumer(Consumer):
+class MultiConsumer(Consumer):
     """
     A consumer implementation that consumes partitions for a topic in
-    parallel using multiple processes
+    parallel using multiple driver instances (processes, threads or gevent)
 
     client: a connected KafkaClient
     group: a name for this consumer, used for offset storage and must be unique
     topic: the topic to consume
+    driver_type: The driver type to use for the consumer
 
     auto_commit: default True. Whether or not to auto commit the offsets
     auto_commit_every_n: default 100. How many messages to consume
                          before a commit
     auto_commit_every_t: default 5000. How much time (in milliseconds) to
                          wait before commit
-    num_procs: Number of processes to start for consuming messages.
-               The available partitions will be divided among these processes
-    partitions_per_proc: Number of partitions to be allocated per process
-                         (overrides num_procs)
+    num_drivers: Number of driver instances to start for consuming messages.
+                 The available partitions will be divided among these instances
+    partitions_per_driver: Number of partitions to be allocated per driver
+                           (overrides num_drivers)
 
     Auto commit details:
     If both auto_commit_every_n and auto_commit_every_t are set, they will
@@ -575,51 +599,65 @@ class MultiConsumer(Consumer):
     commit method on this class. A manual call to commit will also reset
     these triggers
     """
-    def __init__(self, client, group, topic, auto_commit=True,
+    def __init__(self, client, group, topic,
+                 driver_type=KAFKA_PROCESS_DRIVER,
+                 auto_commit=True,
                  auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                  auto_commit_every_t=AUTO_COMMIT_INTERVAL,
-                 num_procs=1, partitions_per_proc=0):
+                 num_drivers=1, partitions_per_driver=0):
 
         # Initiate the base consumer class
-        super(MultiProcessConsumer, self).__init__(
+        super(MultiConsumer, self).__init__(
             client, group, topic,
-            partitions=None,
+            partitions=None, driver_type=driver_type,
            auto_commit=auto_commit,
             auto_commit_every_n=auto_commit_every_n,
             auto_commit_every_t=auto_commit_every_t)
 
         # Variables for managing and controlling the data flow from
         # consumer child process to master
-        self.queue = Queue(1024)    # Child consumers dump messages into this
-        self.start = Event()        # Indicates the consumers to start fetch
-        self.exit = Event()         # Requests the consumers to shutdown
-        self.pause = Event()        # Requests the consumers to pause fetch
-        self.size = Value('i', 0)   # Indicator of number of messages to fetch
+
+        # Child consumers dump messages into this
+        self.queue = self.driver.Queue(1024)
+
+        # Signals the consumers to start fetching
+        self.start = self.driver.Event()
+
+        # Requests the consumers to shut down
+        self.exit = self.driver.Event()
+
+        # Requests the consumers to pause fetching
+        self.pause = self.driver.Event()
+
+        # Number of messages to fetch
+        self.size = Value('i', 0)
 
         partitions = self.offsets.keys()
 
         # If unspecified, start one consumer per partition
         # The logic below ensures that
-        # * we do not cross the num_procs limit
+        # * we do not cross the num_drivers limit
         # * we have an even distribution of partitions among processes
-        if not partitions_per_proc:
-            partitions_per_proc = round(len(partitions) * 1.0 / num_procs)
-            if partitions_per_proc < num_procs * 0.5:
-                partitions_per_proc += 1
+        if not partitions_per_driver:
+            partitions_per_driver = round(len(partitions) * 1.0 / num_drivers)
+            if partitions_per_driver < num_drivers * 0.5:
+                partitions_per_driver += 1
 
         # The final set of chunks
         chunker = lambda *x: [] + list(x)
-        chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc))
+        chunks = map(chunker, *[iter(partitions)] * int(partitions_per_driver))
 
         self.procs = []
+
         for chunk in chunks:
             chunk = filter(lambda x: x is not None, chunk)
             args = (client.copy(),
-                    group, topic, chunk,
+                    group, topic,
+                    driver_type, chunk,
                     self.queue, self.start, self.exit,
                     self.pause, self.size)
 
-            proc = Process(target=_mp_consume, args=args)
+            proc = self.driver.Proc(target=_mp_consume, args=args)
             proc.daemon = True
             proc.start()
             self.procs.append(proc)
@@ -632,9 +670,8 @@ def stop(self):
 
         for proc in self.procs:
             proc.join()
-            proc.terminate()
 
-        super(MultiProcessConsumer, self).stop()
+        super(MultiConsumer, self).stop()
 
     def __iter__(self):
         """
@@ -652,6 +689,7 @@ def __iter__(self):
                 # a chance to run and put some messages in the queue
                 # TODO: This is a hack and will make the consumer block for
                 # at least one second. Need to find a better way of doing this
+                self.driver.sleep(0)
                 partition, message = self.queue.get(block=True, timeout=1)
             except Empty:
                 break
diff --git a/kafka/producer.py b/kafka/producer.py
index 7ef789684..044d9dc1b 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -4,6 +4,7 @@ from multiprocessing import Queue, Process
 from Queue import Empty
 import logging
+import socket
 import sys
 
 from kafka.common import ProduceRequest
@@ -31,7 +32,7 @@ def _send_upstream(topic, queue, client, batch_time, batch_size,
     functionality breaks unless this function is kept outside of a class
     """
     stop = False
-    client.reinit()
+    client.reinit(module=socket)
 
     while not stop:
         timeout = batch_time
diff --git a/setup.py b/setup.py
index a24691407..e383e81f2 100644
--- a/setup.py
+++ b/setup.py
@@ -21,8 +21,8 @@ def run(self):
     name="kafka-python",
     version="0.8.1-1",
 
-    install_requires=["distribute", "tox"],
-    tests_require=["tox"],
+    install_requires=["distribute", "tox", "gevent"],
+    tests_require=["tox", "gevent"],
     cmdclass={"test": Tox},
     packages=["kafka"],
diff --git a/test/test_integration.py b/test/test_integration.py
index a10dae243..324b648f9 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -563,9 +563,11 @@ def tearDownClass(cls):  # noqa
         cls.server2.close()
         cls.zk.close()
 
-    def test_simple_consumer(self):
+    def test_simple_consumer(self, driver_type=KAFKA_PROCESS_DRIVER):
+        queue = "test_simple_consumer_%s" % (driver_type)
+
         # Produce 100 messages to partition 0
-        produce1 = ProduceRequest("test_simple_consumer", 0, messages=[
+        produce1 = ProduceRequest(queue, 0, messages=[
             create_message("Test message 0 %d" % i) for i in range(100)
         ])
@@ -574,7 +576,7 @@ def test_simple_consumer(self):
             self.assertEquals(resp.offset, 0)
 
         # Produce 100 messages to partition 1
-        produce2 = ProduceRequest("test_simple_consumer", 1, messages=[
+        produce2 = ProduceRequest(queue, 1, messages=[
             create_message("Test message 1 %d" % i) for i in range(100)
         ])
@@ -583,7 +585,10 @@ def test_simple_consumer(self):
             self.assertEquals(resp.offset, 0)
 
         # Start a consumer
-        consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer", auto_commit=False)
+        consumer = SimpleConsumer(self.client, "group1", queue,
+                                  auto_commit=False,
+                                  driver_type=driver_type)
+
         all_messages = []
         for message in consumer:
             all_messages.append(message)
@@ -608,8 +613,11 @@ def test_simple_consumer(self):
 
         consumer.stop()
 
-    def test_simple_consumer_blocking(self):
-        consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer_blocking", auto_commit=False)
+    def test_simple_consumer_blocking(self, driver_type=KAFKA_PROCESS_DRIVER):
+        queue = "test_simple_consumer_blocking_%s" % (driver_type)
+
+        consumer = SimpleConsumer(self.client, "group1", queue,
+                                  auto_commit=False, driver_type=driver_type)
 
         # Blocking API
         start = datetime.now()
@@ -619,7 +627,7 @@ def test_simple_consumer_blocking(self):
         self.assertEqual(len(messages), 0)
 
         # Send 10 messages
-        produce = ProduceRequest("test_simple_consumer_blocking", 0, messages=[
+        produce = ProduceRequest(queue, 0, messages=[
             create_message("Test message 0 %d" % i) for i in range(10)
         ])
@@ -640,32 +648,52 @@ def test_simple_consumer_blocking(self):
 
         consumer.stop()
 
-    def test_simple_consumer_pending(self):
-        # Produce 10 messages to partition 0 and 1
+    def test_simple_consumer_blocking_gevent(self):
+        return self.test_simple_consumer_blocking(driver_type=KAFKA_GEVENT_DRIVER)
+
+    def test_simple_consumer_blocking_thread(self):
+        return self.test_simple_consumer_blocking(driver_type=KAFKA_THREAD_DRIVER)
 
-        produce1 = ProduceRequest("test_simple_pending", 0, messages=[
+    def test_simple_consumer_pending(self, driver_type=KAFKA_PROCESS_DRIVER):
+        queue = "test_simple_pending_%s" % (driver_type)
+
+        # Produce 10 messages to partition 0 and 1
+        produce1 = ProduceRequest(queue, 0, messages=[
             create_message("Test message 0 %d" % i) for i in range(10)
         ])
 
         for resp in self.client.send_produce_request([produce1]):
             self.assertEquals(resp.error, 0)
             self.assertEquals(resp.offset, 0)
 
-        produce2 = ProduceRequest("test_simple_pending", 1, messages=[
+        produce2 = ProduceRequest(queue, 1, messages=[
             create_message("Test message 1 %d" % i) for i in range(10)
         ])
 
         for resp in self.client.send_produce_request([produce2]):
             self.assertEquals(resp.error, 0)
             self.assertEquals(resp.offset, 0)
 
-        consumer = SimpleConsumer(self.client, "group1", "test_simple_pending", auto_commit=False)
+        consumer = SimpleConsumer(self.client, "group1", queue,
+                                  auto_commit=False,
+                                  driver_type=driver_type)
+
         self.assertEquals(consumer.pending(), 20)
         self.assertEquals(consumer.pending(partitions=[0]), 10)
         self.assertEquals(consumer.pending(partitions=[1]), 10)
 
         consumer.stop()
 
-    def test_multi_process_consumer(self):
+    def test_simple_consumer_pending_gevent(self):
+        return self.test_simple_consumer_pending(
+            driver_type=KAFKA_GEVENT_DRIVER)
+
+    def test_simple_consumer_pending_thread(self):
+        return self.test_simple_consumer_pending(
+            driver_type=KAFKA_THREAD_DRIVER)
+
+    def test_multi_consumer(self, driver_type=KAFKA_PROCESS_DRIVER):
+        queue = "test_mpconsumer_%s" % (driver_type)
+
         # Produce 100 messages to partition 0
-        produce1 = ProduceRequest("test_mpconsumer", 0, messages=[
+        produce1 = ProduceRequest(queue, 0, messages=[
             create_message("Test message 0 %d" % i) for i in range(100)
         ])
@@ -674,7 +702,7 @@ def test_multi_process_consumer(self):
             self.assertEquals(resp.offset, 0)
 
         # Produce 100 messages to partition 1
-        produce2 = ProduceRequest("test_mpconsumer", 1, messages=[
+        produce2 = ProduceRequest(queue, 1, messages=[
             create_message("Test message 1 %d" % i) for i in range(100)
         ])
@@ -683,7 +711,10 @@ def test_multi_process_consumer(self):
             self.assertEquals(resp.offset, 0)
 
         # Start a consumer
-        consumer = MultiProcessConsumer(self.client, "grp1", "test_mpconsumer", auto_commit=False)
+        consumer = MultiConsumer(self.client, "grp1", queue,
+                                 auto_commit=False,
+                                 driver_type=driver_type)
+
         all_messages = []
         for message in consumer:
             all_messages.append(message)
@@ -700,7 +731,7 @@ def test_multi_process_consumer(self):
         self.assertEqual(len(messages), 0)
 
         # Send 10 messages
-        produce = ProduceRequest("test_mpconsumer", 0, messages=[
+        produce = ProduceRequest(queue, 0, messages=[
             create_message("Test message 0 %d" % i) for i in range(10)
         ])
@@ -721,9 +752,17 @@ def test_multi_process_consumer(self):
 
         consumer.stop()
 
-    def test_multi_proc_pending(self):
+    def test_multi_consumer_gevent(self):
+        return self.test_multi_consumer(driver_type=KAFKA_GEVENT_DRIVER)
+
+    def test_multi_consumer_thread(self):
+        return self.test_multi_consumer(driver_type=KAFKA_THREAD_DRIVER)
+
+    def test_multi_proc_pending(self, driver_type=KAFKA_PROCESS_DRIVER):
+        queue = "test_mppending_%s" % (driver_type)
+
         # Produce 10 messages to partition 0 and 1
-        produce1 = ProduceRequest("test_mppending", 0, messages=[
+        produce1 = ProduceRequest(queue, 0, messages=[
             create_message("Test message 0 %d" % i) for i in range(10)
         ])
@@ -731,7 +770,7 @@
         for resp in self.client.send_produce_request([produce1]):
             self.assertEquals(resp.error, 0)
             self.assertEquals(resp.offset, 0)
 
-        produce2 = ProduceRequest("test_mppending", 1, messages=[
+        produce2 = ProduceRequest(queue, 1, messages=[
             create_message("Test message 1 %d" % i) for i in range(10)
         ])
@@ -739,13 +778,22 @@
             self.assertEquals(resp.error, 0)
             self.assertEquals(resp.offset, 0)
 
-        consumer = MultiProcessConsumer(self.client, "group1", "test_mppending", auto_commit=False)
+        consumer = MultiConsumer(self.client, "group1", queue,
+                                 auto_commit=False,
+                                 driver_type=driver_type)
+
         self.assertEquals(consumer.pending(), 20)
         self.assertEquals(consumer.pending(partitions=[0]), 10)
         self.assertEquals(consumer.pending(partitions=[1]), 10)
 
         consumer.stop()
 
+    def test_multi_proc_pending_gevent(self):
+        return self.test_multi_proc_pending(driver_type=KAFKA_GEVENT_DRIVER)
+
+    def test_multi_proc_pending_thread(self):
+        return self.test_multi_proc_pending(driver_type=KAFKA_THREAD_DRIVER)
+
     def test_large_messages(self):
        # Produce 10 "normal" size messages
        messages1 = [create_message(random_string(1024)) for i in range(10)]
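
---

Reviewer notes (commentary on the patch above, not part of the diff):

The crux of the patch is the `KafkaDriver` factory added in `kafka/common.py`: consumers no longer hard-code `multiprocessing` primitives but ask their driver for a queue class, an event class, a worker constructor, a socket module and a sleep function. Below is a minimal sketch of that mapping, assuming the patched `kafka.common` is importable; the loop and print formatting are illustrative only, not part of the patched API.

```python
from kafka.common import (KafkaDriver, KAFKA_PROCESS_DRIVER,
                          KAFKA_THREAD_DRIVER, KAFKA_GEVENT_DRIVER)

for driver_type in (KAFKA_PROCESS_DRIVER, KAFKA_THREAD_DRIVER,
                    KAFKA_GEVENT_DRIVER):
    driver = KafkaDriver(driver_type)
    # Queue/Event/Proc are the concurrency primitives for this driver;
    # driver.socket is either the stdlib socket module or gevent.socket;
    # driver.sleep is time.sleep or gevent.sleep accordingly.
    print("%s: Proc=%r socket=%r" % (driver_type, driver.Proc, driver.socket))
```

One design point worth noting: the gevent driver never monkey-patches globally. Only the Kafka connections switch to `gevent.socket`, via the explicit `module=` parameter threaded through `KafkaClient` and `KafkaConnection`.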
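Both `_mp_consume` and `_committer` follow the same handshake: the controller hands each worker an inactive `client.copy()`, and the worker's first act is to rebind that copy's sockets to its own driver's socket module with `reinit(module=...)`. A sketch of that handshake under the gevent driver; the `worker` function and the bootstrap address are hypothetical, not part of the patch.

```python
from kafka.client import KafkaClient
from kafka.common import KafkaDriver, KAFKA_GEVENT_DRIVER

def worker(client, driver_type):
    # Hypothetical worker body mirroring _mp_consume's first steps
    driver = KafkaDriver(driver_type)
    client.reinit(module=driver.socket)  # open fresh gevent.socket sockets
    # ... fetch/commit using this client ...
    client.close()

client = KafkaClient("localhost", 9092)
driver = KafkaDriver(KAFKA_GEVENT_DRIVER)

# copy() returns an inactive client (no live sockets); it must be reinit()ed
# before use, which is exactly what worker() does first.
proc = driver.Proc(target=worker, args=(client.copy(), KAFKA_GEVENT_DRIVER))
proc.start()
proc.join()
```

For the gevent driver, `driver.Proc` builds a `gevent.Greenlet`, which supports the same `start()`/`join()` calls the consumer code relies on; `daemon` is meaningless for greenlets but setting it is harmless.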
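The partition-distribution arithmetic in `MultiConsumer.__init__` is compact enough to deserve a worked example. The sketch below reproduces it standalone for a hypothetical 5-partition topic and 2 drivers; it leans on two Python 2 behaviours (the codebase's target): `round(2.5)` rounds away from zero, and `map` over several iterables pads the last group with `None`, which is why each chunk is filtered afterwards.

```python
partitions = list(range(5))   # a hypothetical topic with 5 partitions
num_drivers = 2

# round(5 * 1.0 / 2) == 3.0 under Python 2, so each driver gets up to 3
partitions_per_driver = round(len(partitions) * 1.0 / num_drivers)
if partitions_per_driver < num_drivers * 0.5:
    partitions_per_driver += 1

# The grouper idiom: zipping 3 references to one iterator yields groups of 3
chunker = lambda *x: [] + list(x)
chunks = map(chunker, *[iter(partitions)] * int(partitions_per_driver))

# Python 2 map() pads the short final group with None; strip the padding
chunks = [filter(lambda x: x is not None, chunk) for chunk in chunks]
print(chunks)   # [[0, 1, 2], [3, 4]]: one driver gets 3 partitions, one 2
```

Note that under Python 3, `map` over several iterables truncates at the shortest instead of padding, which would silently drop the trailing partitions; worth keeping in mind if this module is ever ported.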