
Added upper-bound size limit to the kafka producer #375

Closed · wants to merge 4 commits
25 changes: 20 additions & 5 deletions kafka/producer/base.py
@@ -5,9 +5,9 @@
import time

try:
from queue import Empty, Queue
from queue import Empty, Queue, Full
except ImportError:
from Queue import Empty, Queue
from Queue import Empty, Queue, Full
from collections import defaultdict

from threading import Thread, Event
@@ -27,6 +27,8 @@

STOP_ASYNC_PRODUCER = -1

ASYNC_QUEUE_MAXSIZE = 65536


def _send_upstream(queue, client, codec, batch_time, batch_size,
req_acks, ack_timeout, stop_event):
@@ -64,7 +66,9 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,

# Send collected requests upstream
reqs = []
topics = set()
for topic_partition, msg in msgset.items():
topics.add(topic_partition.topic)
messages = create_message_set(msg, codec, key)
req = ProduceRequest(topic_partition.topic,
topic_partition.partition,
@@ -97,6 +101,7 @@ class Producer(object):
batch_send: If True, messages are send in batches
batch_send_every_n: If set, messages are send in batches of this size
batch_send_every_t: If set, messages are send after this timeout
maxsize: sets the upper-bound limit on the number of items that can be placed in the queue
"""

ACK_NOT_REQUIRED = 0 # No ack is required
@@ -111,12 +116,14 @@ def __init__(self, client, async=False,
codec=None,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
maxsize=None):

if batch_send:
async = True
assert batch_send_every_n > 0
assert batch_send_every_t > 0
assert maxsize >= 0
else:
batch_send_every_n = 1
batch_send_every_t = 3600
@@ -127,6 +134,9 @@
self.ack_timeout = ack_timeout
self.stopped = False

if maxsize is None:
maxsize = ASYNC_QUEUE_MAXSIZE

if codec is None:
codec = CODEC_NONE
elif codec not in ALL_CODECS:
@@ -138,7 +148,7 @@
log.warning("async producer does not guarantee message delivery!")
log.warning("Current implementation does not retry Failed messages")
log.warning("Use at your own risk! (or help improve with a PR!)")
self.queue = Queue() # Messages are sent through this queue
self.queue = Queue(maxsize) # Messages are sent through this queue
self.thread_stop_event = Event()
self.thread = Thread(target=_send_upstream,
args=(self.queue,
@@ -200,7 +210,12 @@ def _send_messages(self, topic, partition, *msg, **kwargs):

if self.async:
for m in msg:
self.queue.put((TopicAndPartition(topic, partition), m, key))
try:
item = (TopicAndPartition(topic, partition), m, key)
self.queue.put_nowait(item)
except Full:
log.exception('Async queue is full')
raise
resp = []
else:
messages = create_message_set([(m, key) for m in msg], self.codec, key)
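The overflow behaviour this change relies on is plain standard-library semantics: a Queue constructed with a positive maxsize holds at most that many pending items, and put_nowait() raises Full once the bound is reached. A minimal standalone sketch, independent of the Kafka code above:

```python
try:
    from queue import Queue, Full   # Python 3
except ImportError:
    from Queue import Queue, Full   # Python 2

q = Queue(maxsize=2)       # upper bound of two pending items
q.put_nowait('msg-1')
q.put_nowait('msg-2')

try:
    q.put_nowait('msg-3')  # a third put exceeds maxsize
except Full:
    print('queue is full, message rejected')
```

Note that Queue(0) means unbounded in the standard library, which is why the patch substitutes ASYNC_QUEUE_MAXSIZE when no maxsize is supplied instead of passing the default through unchanged.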
7 changes: 5 additions & 2 deletions kafka/producer/simple.py
@@ -24,6 +24,7 @@ class SimpleProducer(Producer):
client: The Kafka client instance to use

Keyword Arguments:
maxsize: sets the upper-bound limit on the number of items that can be placed in the queue
async: If True, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
req_acks: A value indicating the acknowledgements that the server must
@@ -45,13 +46,15 @@ def __init__(self, client, async=False,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
random_start=True):
random_start=True,
maxsize=None):
self.partition_cycles = {}
self.random_start = random_start
super(SimpleProducer, self).__init__(client, async, req_acks,
ack_timeout, codec, batch_send,
batch_send_every_n,
batch_send_every_t)
batch_send_every_t,
maxsize)

def _next_partition(self, topic):
if topic not in self.partition_cycles:
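With the keyword threaded through SimpleProducer, a caller can bound the async queue at construction time and decide what to do when send_messages re-raises Full. A hedged usage sketch against this branch; the broker address, topic name, and limit are placeholders, and the top-level imports assume the kafka-python layout of this era:

```python
from kafka import KafkaClient, SimpleProducer

try:
    from queue import Full   # Python 3
except ImportError:
    from Queue import Full   # Python 2

client = KafkaClient('localhost:9092')        # placeholder broker address
producer = SimpleProducer(client, async=True,
                          maxsize=1000)       # new kwarg from this PR

try:
    producer.send_messages(b'my-topic', b'payload')
except Full:
    # The bounded async queue is full; drop the message, back off, or retry.
    pass
```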
56 changes: 54 additions & 2 deletions test/test_producer.py
@@ -2,9 +2,13 @@

import logging

from mock import MagicMock
try:
from queue import Full
except ImportError:
from Queue import Full
from mock import MagicMock, patch
from . import unittest

from six.moves import xrange
from kafka.producer.base import Producer


@@ -40,3 +44,51 @@ def partitions(topic):
topic = b"test-topic"
producer.send_messages(topic, b'hi')
assert client.send_produce_request.called

@patch('kafka.producer.base._send_upstream')
def test_producer_async_queue_overfilled_batch_send(self, mock):
queue_size = 2
producer = Producer(MagicMock(), batch_send=True, maxsize=queue_size)

topic = b'test-topic'
partition = 0
message = b'test-message'

with self.assertRaises(Full):
message_list = [message] * (queue_size + 1)
producer.send_messages(topic, partition, *message_list)
self.assertEqual(producer.queue.qsize(), queue_size)
for _ in xrange(producer.queue.qsize()):
producer.queue.get()

@patch('kafka.producer.base._send_upstream')
def test_producer_async_queue_overfilled(self, mock):
queue_size = 2
producer = Producer(MagicMock(), async=True, maxsize=queue_size)

topic = b'test-topic'
partition = 0
message = b'test-message'

with self.assertRaises(Full):
message_list = [message] * (queue_size + 1)
producer.send_messages(topic, partition, *message_list)
self.assertEqual(producer.queue.qsize(), queue_size)
for _ in xrange(producer.queue.qsize()):
producer.queue.get()

def test_producer_async_queue_normal(self):
queue_size = 4
producer = Producer(MagicMock(), async=True, maxsize=queue_size)

topic = b'test-topic'
partition = 0
message = b'test-message'

acceptable_size = (queue_size / 2 + 1)

message_list = [message] * acceptable_size
resp = producer.send_messages(topic, partition, *message_list)
self.assertEqual(type(resp), list)
self.assertEqual(producer.queue.qsize(), acceptable_size)
self.assertFalse(producer.queue.full())
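
Both overflow tests patch kafka.producer.base._send_upstream, so the background thread started by the async producer is a MagicMock no-op and never drains the queue; that is what makes the qsize() assertions deterministic. The same mock.patch mechanics shown in isolation, with json.dumps as an illustrative target rather than anything from the Kafka code:

```python
from mock import patch   # the `mock` backport used by these tests
import json

# Inside the block, json.dumps is replaced by a MagicMock that records calls.
with patch('json.dumps') as mock_dumps:
    mock_dumps.return_value = '{}'
    assert json.dumps({'a': 1}) == '{}'
    mock_dumps.assert_called_once_with({'a': 1})

# Outside the block the real function is restored.
assert json.dumps({'a': 1}) == '{"a": 1}'
```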
2 changes: 1 addition & 1 deletion tox.ini
@@ -1,5 +1,5 @@
[tox]
envlist = lint, py26, py27, pypy, py33, py34, docs
envlist = lint, py27, docs

[testenv]
deps =