diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 4bd3de49a..dd059b6ee 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -5,9 +5,9 @@ import time try: - from queue import Empty, Queue + from queue import Empty, Queue, Full except ImportError: - from Queue import Empty, Queue + from Queue import Empty, Queue, Full from collections import defaultdict from threading import Thread, Event @@ -27,6 +27,8 @@ STOP_ASYNC_PRODUCER = -1 + +ASYNC_QUEUE_MAXSIZE = 65536 + def _send_upstream(queue, client, codec, batch_time, batch_size, req_acks, ack_timeout, stop_event): @@ -64,7 +66,9 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, # Send collected requests upstream reqs = [] + topics = set() for topic_partition, msg in msgset.items(): + topics.add(topic_partition.topic) messages = create_message_set(msg, codec, key) req = ProduceRequest(topic_partition.topic, topic_partition.partition, @@ -97,6 +101,7 @@ class Producer(object): batch_send: If True, messages are send in batches batch_send_every_n: If set, messages are send in batches of this size batch_send_every_t: If set, messages are send after this timeout + async_queue_maxsize: sets the upper-bound limit on the number of items that can be placed in the queue """ ACK_NOT_REQUIRED = 0 # No ack is required @@ -111,12 +116,14 @@ def __init__(self, client, async=False, codec=None, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, - batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): + batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, + async_queue_maxsize=None): if batch_send: async = True assert batch_send_every_n > 0 assert batch_send_every_t > 0 + assert async_queue_maxsize is None or async_queue_maxsize >= 0 else: batch_send_every_n = 1 batch_send_every_t = 3600 @@ -127,6 +134,9 @@ def __init__(self, client, async=False, self.ack_timeout = ack_timeout self.stopped = False + if async_queue_maxsize is None: + async_queue_maxsize = ASYNC_QUEUE_MAXSIZE + if codec is None: codec = CODEC_NONE elif codec
not in ALL_CODECS: @@ -138,7 +148,7 @@ def __init__(self, client, async=False, log.warning("async producer does not guarantee message delivery!") log.warning("Current implementation does not retry Failed messages") log.warning("Use at your own risk! (or help improve with a PR!)") - self.queue = Queue() # Messages are sent through this queue + self.queue = Queue(async_queue_maxsize) # Messages are sent through this queue self.thread_stop_event = Event() self.thread = Thread(target=_send_upstream, args=(self.queue, @@ -200,7 +210,12 @@ def _send_messages(self, topic, partition, *msg, **kwargs): if self.async: for m in msg: - self.queue.put((TopicAndPartition(topic, partition), m, key)) + try: + item = (TopicAndPartition(topic, partition), m, key) + self.queue.put_nowait(item) + except Full: + log.exception('Async queue is full') + raise resp = [] else: messages = create_message_set([(m, key) for m in msg], self.codec, key) diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index 2699cf2b6..6fb3b8992 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -24,6 +24,7 @@ class SimpleProducer(Producer): client: The Kafka client instance to use Keyword Arguments: + async_queue_maxsize: sets the upper-bound limit on the number of items that can be placed in the queue async: If True, the messages are sent asynchronously via another thread (process). 
We will not wait for a response to these req_acks: A value indicating the acknowledgements that the server must @@ -45,13 +46,15 @@ def __init__(self, client, async=False, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, - random_start=True): + random_start=True, + async_queue_maxsize=None): self.partition_cycles = {} self.random_start = random_start super(SimpleProducer, self).__init__(client, async, req_acks, ack_timeout, codec, batch_send, batch_send_every_n, - batch_send_every_t) + batch_send_every_t, + async_queue_maxsize) def _next_partition(self, topic): if topic not in self.partition_cycles: diff --git a/test/test_producer.py b/test/test_producer.py index f6b3d6a1b..5414b8d64 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -2,9 +2,13 @@ import logging -from mock import MagicMock +try: + from queue import Full +except ImportError: + from Queue import Full +from mock import MagicMock, patch from . import unittest - +from six.moves import xrange from kafka.producer.base import Producer @@ -40,3 +44,52 @@ def partitions(topic): topic = b"test-topic" producer.send_messages(topic, b'hi') assert client.send_produce_request.called + + @patch('kafka.producer.base._send_upstream') + def test_producer_async_queue_overfilled_batch_send(self, mock): + queue_size = 2 + producer = Producer(MagicMock(), batch_send=True, async_queue_maxsize=queue_size) + + topic = b'test-topic' + partition = 0 + message = b'test-message' + + with self.assertRaises(Full): + message_list = [message] * (queue_size + 1) + producer.send_messages(topic, partition, *message_list) + self.assertEqual(producer.queue.qsize(), queue_size) + for _ in xrange(producer.queue.qsize()): + producer.queue.get() + + @patch('kafka.producer.base._send_upstream') + def test_producer_async_queue_overfilled(self, mock): + queue_size = 2 + producer = Producer(MagicMock(), async=True, async_queue_maxsize=queue_size) + + topic = b'test-topic' + 
partition = 0 + message = b'test-message' + + with self.assertRaises(Full): + message_list = [message] * (queue_size + 1) + producer.send_messages(topic, partition, *message_list) + self.assertEqual(producer.queue.qsize(), queue_size) + for _ in xrange(producer.queue.qsize()): + producer.queue.get() + + @patch('kafka.producer.base._send_upstream') + def test_producer_async_queue_normal(self, mock): + queue_size = 4 + producer = Producer(MagicMock(), async=True, async_queue_maxsize=queue_size) + + topic = b'test-topic' + partition = 0 + message = b'test-message' + + acceptable_size = (queue_size // 2 + 1) + + message_list = [message] * acceptable_size + resp = producer.send_messages(topic, partition, *message_list) + self.assertEqual(type(resp), list) + self.assertEqual(producer.queue.qsize(), acceptable_size) + self.assertFalse(producer.queue.full()) diff --git a/tox.ini b/tox.ini index fba7d8e28..d431e3adc 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = lint, py26, py27, pypy, py33, py34, docs +envlist = lint, py26, py27, pypy, py33, py34, docs [testenv] deps =