diff --git a/.gitignore b/.gitignore index 8cf9c4e72..64fba3203 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ env servers/*/kafka-bin .coverage .noseids +.idea \ No newline at end of file diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 6e19b92c9..84e7a478b 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -4,9 +4,9 @@ import time try: - from queue import Empty + from queue import Empty, Full except ImportError: - from Queue import Empty + from Queue import Empty, Full from collections import defaultdict from multiprocessing import Queue, Process @@ -22,11 +22,17 @@ BATCH_SEND_DEFAULT_INTERVAL = 20 BATCH_SEND_MSG_COUNT = 20 +BATCH_SEND_QUEUE_BUFFERING_MAX_MESSAGES = 0 +BATCH_SEND_QUEUE_MAX_WAIT = -1 + +BATCH_SEND_MAX_RETRY = 3 +BATCH_SEND_RETRY_BACKOFF_MS = 300 + STOP_ASYNC_PRODUCER = -1 def _send_upstream(queue, client, codec, batch_time, batch_size, - req_acks, ack_timeout): + req_acks, ack_timeout, batch_send_max_retry, batch_send_retry_backoff_ms): """ Listen on the queue for a specified number of messages or till a specified timeout and send them upstream to the brokers in one @@ -73,12 +79,16 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, messages) reqs.append(req) - try: - client.send_produce_request(reqs, - acks=req_acks, - timeout=ack_timeout) - except Exception: - log.exception("Unable to send message") + for i in range(batch_send_max_retry): + try: + client.send_produce_request(reqs, + acks=req_acks, + timeout=ack_timeout) + except Exception: + log.exception("Unable to send message - retry {0}".format(i)) + time.sleep(float(batch_send_retry_backoff_ms) / float(1000)) + continue + break class Producer(object): @@ -99,11 +109,17 @@ class Producer(object): batch_send - If True, messages are send in batches batch_send_every_n - If set, messages are send in batches of this size batch_send_every_t - If set, messages are send after this timeout + batch_send_queue_buffering_max_messages - If 
set, maximum number of messages + allowed on the async queue + batch_send_queue_max_wait - If set, wait to put messages in the async queue + until free space or this timeout (in seconds) + batch_send_max_retry - Number of retries for async send, default: 3 + batch_send_retry_backoff_ms - Sleep between retries, default: 300ms """ - ACK_NOT_REQUIRED = 0 # No ack is required - ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log - ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed + ACK_NOT_REQUIRED = 0 # No ack is required + ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log + ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed DEFAULT_ACK_TIMEOUT = 1000 @@ -113,7 +129,11 @@ def __init__(self, client, async=False, codec=None, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, - batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): + batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, + batch_send_queue_buffering_max_messages=BATCH_SEND_QUEUE_BUFFERING_MAX_MESSAGES, + batch_send_queue_max_wait=BATCH_SEND_QUEUE_MAX_WAIT, + batch_send_max_retry=BATCH_SEND_MAX_RETRY, + batch_send_retry_backoff_ms=BATCH_SEND_RETRY_BACKOFF_MS): if batch_send: async = True @@ -127,6 +147,7 @@ def __init__(self, client, async=False, self.async = async self.req_acks = req_acks self.ack_timeout = ack_timeout + self.batch_send_queue_max_wait = batch_send_queue_max_wait if codec is None: codec = CODEC_NONE @@ -139,7 +160,7 @@ def __init__(self, client, async=False, log.warning("async producer does not guarantee message delivery!") log.warning("Current implementation does not retry Failed messages") log.warning("Use at your own risk! 
(or help improve with a PR!)") - self.queue = Queue() # Messages are sent through this queue + self.queue = Queue(maxsize=batch_send_queue_buffering_max_messages) # Messages are sent through this queue self.proc = Process(target=_send_upstream, args=(self.queue, self.client.copy(), @@ -147,7 +168,9 @@ def __init__(self, client, async=False, batch_send_every_t, batch_send_every_n, self.req_acks, - self.ack_timeout)) + self.ack_timeout, + batch_send_max_retry, + batch_send_retry_backoff_ms)) # Process will die if main thread exits self.proc.daemon = True @@ -188,7 +211,12 @@ def _send_messages(self, topic, partition, *msg, **kwargs): if self.async: for m in msg: - self.queue.put((TopicAndPartition(topic, partition), m, key)) + try: + self.queue.put((TopicAndPartition(topic, partition), m, key), block=True, + timeout=self.batch_send_queue_max_wait) + except Full: + log.exception('Queue full, failed to put message "{0}" in the async queue after {1} ms'.format( + m, self.batch_send_queue_max_wait)) resp = [] else: messages = create_message_set(msg, self.codec, key) diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index 68c70d970..e2894be13 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -5,7 +5,8 @@ from kafka.partitioner import HashedPartitioner from .base import ( Producer, BATCH_SEND_DEFAULT_INTERVAL, - BATCH_SEND_MSG_COUNT + BATCH_SEND_MSG_COUNT, BATCH_SEND_QUEUE_BUFFERING_MAX_MESSAGES, + BATCH_SEND_QUEUE_MAX_WAIT, BATCH_SEND_MAX_RETRY, BATCH_SEND_RETRY_BACKOFF_MS ) log = logging.getLogger("kafka") @@ -26,14 +27,25 @@ class KeyedProducer(Producer): batch_send - If True, messages are send in batches batch_send_every_n - If set, messages are send in batches of this size batch_send_every_t - If set, messages are send after this timeout + batch_send_queue_buffering_max_messages - If set, maximum number of messages + allowed on the async queue + batch_send_queue_max_wait - If set, wait to put messages in the async queue + until free 
space or this timeout (in seconds) + batch_send_max_retry - Number of retries for async send, default: 3 + batch_send_retry_backoff_ms - Sleep between retries, default: 300ms """ + def __init__(self, client, partitioner=None, async=False, req_acks=Producer.ACK_AFTER_LOCAL_WRITE, ack_timeout=Producer.DEFAULT_ACK_TIMEOUT, codec=None, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, - batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): + batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, + batch_send_queue_buffering_max_messages=BATCH_SEND_QUEUE_BUFFERING_MAX_MESSAGES, + batch_send_queue_max_wait=BATCH_SEND_QUEUE_MAX_WAIT, + batch_send_max_retry=BATCH_SEND_MAX_RETRY, + batch_send_retry_backoff_ms=BATCH_SEND_RETRY_BACKOFF_MS): if not partitioner: partitioner = HashedPartitioner self.partitioner_class = partitioner @@ -42,7 +54,11 @@ def __init__(self, client, partitioner=None, async=False, super(KeyedProducer, self).__init__(client, async, req_acks, ack_timeout, codec, batch_send, batch_send_every_n, - batch_send_every_t) + batch_send_every_t, + batch_send_queue_buffering_max_messages, + batch_send_queue_max_wait, + batch_send_max_retry, + batch_send_retry_backoff_ms) def _next_partition(self, topic, key): if topic not in self.partitioners: @@ -54,9 +70,9 @@ def _next_partition(self, topic, key): partitioner = self.partitioners[topic] return partitioner.partition(key, self.client.get_partition_ids_for_topic(topic)) - def send_messages(self,topic,key,*msg): + def send_messages(self, topic, key, *msg): partition = self._next_partition(topic, key) - return self._send_messages(topic, partition, *msg,key=key) + return self._send_messages(topic, partition, *msg, key=key) def send(self, topic, key, msg): partition = self._next_partition(topic, key) diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index a10fa8c96..3da959589 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -9,7 +9,8 @@ from .base import ( Producer, BATCH_SEND_DEFAULT_INTERVAL, - 
BATCH_SEND_MSG_COUNT + BATCH_SEND_MSG_COUNT, BATCH_SEND_QUEUE_BUFFERING_MAX_MESSAGES, + BATCH_SEND_QUEUE_MAX_WAIT, BATCH_SEND_MAX_RETRY, BATCH_SEND_RETRY_BACKOFF_MS ) log = logging.getLogger("kafka") @@ -30,11 +31,18 @@ class SimpleProducer(Producer): batch_send - If True, messages are send in batches batch_send_every_n - If set, messages are send in batches of this size batch_send_every_t - If set, messages are send after this timeout + batch_send_queue_buffering_max_messages - If set, maximum number of messages + allowed on the async queue + batch_send_queue_max_wait - If set, wait to put messages in the async queue + until free space or this timeout (in seconds) + batch_send_max_retry - Number of retries for async send, default: 3 + batch_send_retry_backoff_ms - Sleep between retries, default: 300ms random_start - If true, randomize the initial partition which the the first message block will be published to, otherwise if false, the first message block will always publish to partition 0 before cycling through each partition """ + def __init__(self, client, async=False, req_acks=Producer.ACK_AFTER_LOCAL_WRITE, ack_timeout=Producer.DEFAULT_ACK_TIMEOUT, @@ -42,13 +50,21 @@ def __init__(self, client, async=False, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, + batch_send_queue_buffering_max_messages=BATCH_SEND_QUEUE_BUFFERING_MAX_MESSAGES, + batch_send_queue_max_wait=BATCH_SEND_QUEUE_MAX_WAIT, + batch_send_max_retry=BATCH_SEND_MAX_RETRY, + batch_send_retry_backoff_ms=BATCH_SEND_RETRY_BACKOFF_MS, random_start=False): self.partition_cycles = {} self.random_start = random_start super(SimpleProducer, self).__init__(client, async, req_acks, ack_timeout, codec, batch_send, batch_send_every_n, - batch_send_every_t) + batch_send_every_t, + batch_send_queue_buffering_max_messages, + batch_send_queue_max_wait, + batch_send_max_retry, + batch_send_retry_backoff_ms) def _next_partition(self, topic): if topic not in 
self.partition_cycles: @@ -60,7 +76,7 @@ def _next_partition(self, topic): # Randomize the initial partition that is returned if self.random_start: num_partitions = len(self.client.get_partition_ids_for_topic(topic)) - for _ in xrange(random.randint(0, num_partitions-1)): + for _ in xrange(random.randint(0, num_partitions - 1)): next(self.partition_cycles[topic]) return next(self.partition_cycles[topic])