From de01c22a06c022efd0397c5dd57dac9862101036 Mon Sep 17 00:00:00 2001 From: nfraison Date: Mon, 5 Jan 2015 23:13:12 +0100 Subject: [PATCH] Add retry and queue size on async producer Add retry on the async producer in case of send failure. Defaults to 3 retries with a retry backoff of 300 ms. Add the capability to limit the size of the async queue in order to avoid excessive memory consumption by this queue if the async process is not able to send messages fast enough. --- .gitignore | 1 + kafka/producer/base.py | 60 +++++++++++++++++++++++++++++----------- kafka/producer/keyed.py | 26 +++++++++++++---- kafka/producer/simple.py | 22 +++++++++++++-- 4 files changed, 85 insertions(+), 24 deletions(-) diff --git a/.gitignore b/.gitignore index 8cf9c4e72..64fba3203 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ env servers/*/kafka-bin .coverage .noseids +.idea \ No newline at end of file diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 6e19b92c9..84e7a478b 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -4,9 +4,9 @@ import time try: - from queue import Empty + from queue import Empty, Full except ImportError: - from Queue import Empty + from Queue import Empty, Full from collections import defaultdict from multiprocessing import Queue, Process @@ -22,11 +22,17 @@ BATCH_SEND_DEFAULT_INTERVAL = 20 BATCH_SEND_MSG_COUNT = 20 +BATCH_SEND_QUEUE_BUFFERING_MAX_MESSAGES = 0 +BATCH_SEND_QUEUE_MAX_WAIT = -1 + +BATCH_SEND_MAX_RETRY = 3 +BATCH_SEND_RETRY_BACKOFF_MS = 300 + STOP_ASYNC_PRODUCER = -1 def _send_upstream(queue, client, codec, batch_time, batch_size, - req_acks, ack_timeout): + req_acks, ack_timeout, batch_send_max_retry, batch_send_retry_backoff_ms): """ Listen on the queue for a specified number of messages or till a specified timeout and send them upstream to the brokers in one @@ -73,12 +79,16 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, messages) reqs.append(req) - try: - 
client.send_produce_request(reqs, - acks=req_acks, - timeout=ack_timeout) - except Exception: - log.exception("Unable to send message") + for i in range(batch_send_max_retry): + try: + client.send_produce_request(reqs, + acks=req_acks, + timeout=ack_timeout) + except Exception: + log.exception("Unable to send message - retry {0}".format(i)) + time.sleep(float(batch_send_retry_backoff_ms) / float(1000)) + continue + break class Producer(object): @@ -99,11 +109,17 @@ class Producer(object): batch_send - If True, messages are send in batches batch_send_every_n - If set, messages are send in batches of this size batch_send_every_t - If set, messages are send after this timeout + batch_send_queue_buffering_max_messages - If set, maximum number of messages + allowed on the async queue + batch_send_queue_max_wait - If set, wait to put messages in the async queue + until free space or this timeout + batch_send_max_retry - Number of retry for async send, default: 3 + batch_send_retry_backoff_ms - sleep between retry, default: 300ms """ - ACK_NOT_REQUIRED = 0 # No ack is required - ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log - ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed + ACK_NOT_REQUIRED = 0 # No ack is required + ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log + ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed DEFAULT_ACK_TIMEOUT = 1000 @@ -113,7 +129,11 @@ def __init__(self, client, async=False, codec=None, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, - batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): + batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, + batch_send_queue_buffering_max_messages=BATCH_SEND_QUEUE_BUFFERING_MAX_MESSAGES, + batch_send_queue_max_wait=BATCH_SEND_QUEUE_MAX_WAIT, + batch_send_max_retry=BATCH_SEND_MAX_RETRY, + batch_send_retry_backoff_ms=BATCH_SEND_RETRY_BACKOFF_MS): if batch_send: async = True @@ -127,6 +147,7 @@ def __init__(self, client, 
async=False, self.async = async self.req_acks = req_acks self.ack_timeout = ack_timeout + self.batch_send_queue_max_wait = batch_send_queue_max_wait if codec is None: codec = CODEC_NONE @@ -139,7 +160,7 @@ def __init__(self, client, async=False, log.warning("async producer does not guarantee message delivery!") log.warning("Current implementation does not retry Failed messages") log.warning("Use at your own risk! (or help improve with a PR!)") - self.queue = Queue() # Messages are sent through this queue + self.queue = Queue(maxsize=batch_send_queue_buffering_max_messages) # Messages are sent through this queue self.proc = Process(target=_send_upstream, args=(self.queue, self.client.copy(), @@ -147,7 +168,9 @@ def __init__(self, client, async=False, batch_send_every_t, batch_send_every_n, self.req_acks, - self.ack_timeout)) + self.ack_timeout, + batch_send_max_retry, + batch_send_retry_backoff_ms)) # Process will die if main thread exits self.proc.daemon = True @@ -188,7 +211,12 @@ def _send_messages(self, topic, partition, *msg, **kwargs): if self.async: for m in msg: - self.queue.put((TopicAndPartition(topic, partition), m, key)) + try: + self.queue.put((TopicAndPartition(topic, partition), m, key), block=True, + timeout=self.batch_send_queue_max_wait) + except Full: + log.exception('Queue full, failed to put message "{0}" in the async queue after {1} ms'.format( + m, self.batch_send_queue_max_wait)) resp = [] else: messages = create_message_set(msg, self.codec, key) diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index 68c70d970..e2894be13 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -5,7 +5,8 @@ from kafka.partitioner import HashedPartitioner from .base import ( Producer, BATCH_SEND_DEFAULT_INTERVAL, - BATCH_SEND_MSG_COUNT + BATCH_SEND_MSG_COUNT, BATCH_SEND_QUEUE_BUFFERING_MAX_MESSAGES, + BATCH_SEND_QUEUE_MAX_WAIT, BATCH_SEND_MAX_RETRY, BATCH_SEND_RETRY_BACKOFF_MS ) log = logging.getLogger("kafka") @@ -26,14 +27,25 @@ 
class KeyedProducer(Producer): batch_send - If True, messages are send in batches batch_send_every_n - If set, messages are send in batches of this size batch_send_every_t - If set, messages are send after this timeout + batch_send_queue_buffering_max_messages - If set, maximum number of messages + allowed on the async queue + batch_send_queue_max_wait - If set, wait to put messages in the async queue + until free space or this timeout + batch_send_max_retry - Number of retry for async send, default: 3 + batch_send_retry_backoff_ms - sleep between retry, default: 300ms """ + def __init__(self, client, partitioner=None, async=False, req_acks=Producer.ACK_AFTER_LOCAL_WRITE, ack_timeout=Producer.DEFAULT_ACK_TIMEOUT, codec=None, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, - batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): + batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, + batch_send_queue_buffering_max_messages=BATCH_SEND_QUEUE_BUFFERING_MAX_MESSAGES, + batch_send_queue_max_wait=BATCH_SEND_QUEUE_MAX_WAIT, + batch_send_max_retry=BATCH_SEND_MAX_RETRY, + batch_send_retry_backoff_ms=BATCH_SEND_RETRY_BACKOFF_MS): if not partitioner: partitioner = HashedPartitioner self.partitioner_class = partitioner @@ -42,7 +54,11 @@ def __init__(self, client, partitioner=None, async=False, super(KeyedProducer, self).__init__(client, async, req_acks, ack_timeout, codec, batch_send, batch_send_every_n, - batch_send_every_t) + batch_send_every_t, + batch_send_queue_buffering_max_messages, + batch_send_queue_max_wait, + batch_send_max_retry, + batch_send_retry_backoff_ms) def _next_partition(self, topic, key): if topic not in self.partitioners: @@ -54,9 +70,9 @@ def _next_partition(self, topic, key): partitioner = self.partitioners[topic] return partitioner.partition(key, self.client.get_partition_ids_for_topic(topic)) - def send_messages(self,topic,key,*msg): + def send_messages(self, topic, key, *msg): partition = self._next_partition(topic, key) - return 
self._send_messages(topic, partition, *msg,key=key) + return self._send_messages(topic, partition, *msg, key=key) def send(self, topic, key, msg): partition = self._next_partition(topic, key) diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index a10fa8c96..3da959589 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -9,7 +9,8 @@ from .base import ( Producer, BATCH_SEND_DEFAULT_INTERVAL, - BATCH_SEND_MSG_COUNT + BATCH_SEND_MSG_COUNT, BATCH_SEND_QUEUE_BUFFERING_MAX_MESSAGES, + BATCH_SEND_QUEUE_MAX_WAIT, BATCH_SEND_MAX_RETRY, BATCH_SEND_RETRY_BACKOFF_MS ) log = logging.getLogger("kafka") @@ -30,11 +31,18 @@ class SimpleProducer(Producer): batch_send - If True, messages are send in batches batch_send_every_n - If set, messages are send in batches of this size batch_send_every_t - If set, messages are send after this timeout + batch_send_queue_buffering_max_messages - If set, maximum number of messages + allowed on the async queue + batch_send_queue_max_wait - If set, wait to put messages in the async queue + until free space or this timeout + batch_send_max_retry - Number of retry for async send, default: 3 + batch_send_retry_backoff_ms - sleep between retry, default: 300ms random_start - If true, randomize the initial partition which the the first message block will be published to, otherwise if false, the first message block will always publish to partition 0 before cycling through each partition """ + def __init__(self, client, async=False, req_acks=Producer.ACK_AFTER_LOCAL_WRITE, ack_timeout=Producer.DEFAULT_ACK_TIMEOUT, @@ -42,13 +50,21 @@ def __init__(self, client, async=False, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, + batch_send_queue_buffering_max_messages=BATCH_SEND_QUEUE_BUFFERING_MAX_MESSAGES, + batch_send_queue_max_wait=BATCH_SEND_QUEUE_MAX_WAIT, + batch_send_max_retry=BATCH_SEND_MAX_RETRY, + batch_send_retry_backoff_ms=BATCH_SEND_RETRY_BACKOFF_MS, 
random_start=False): self.partition_cycles = {} self.random_start = random_start super(SimpleProducer, self).__init__(client, async, req_acks, ack_timeout, codec, batch_send, batch_send_every_n, - batch_send_every_t) + batch_send_every_t, + batch_send_queue_buffering_max_messages, + batch_send_queue_max_wait, + batch_send_max_retry, + batch_send_retry_backoff_ms) def _next_partition(self, topic): if topic not in self.partition_cycles: @@ -60,7 +76,7 @@ def _next_partition(self, topic): # Randomize the initial partition that is returned if self.random_start: num_partitions = len(self.client.get_partition_ids_for_topic(topic)) - for _ in xrange(random.randint(0, num_partitions-1)): + for _ in xrange(random.randint(0, num_partitions - 1)): next(self.partition_cycles[topic]) return next(self.partition_cycles[topic])