From 68f5506c20d936257f3c11aa12cb692c8c732ed0 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 4 Jun 2015 13:24:45 -0700 Subject: [PATCH 01/12] PR 331 fixup: Fix kafka.common imports from kafka.producer.base (one import block, add ProduceResponse) --- kafka/producer/base.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 5cbc5219c..2542df51d 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -17,10 +17,9 @@ from kafka.common import ( ProduceRequest, ProduceResponse, TopicAndPartition, RetryOptions, kafka_errors, UnsupportedCodecError, FailedPayloadsError, - RequestTimedOutError, AsyncProducerQueueFull, UnknownError + RequestTimedOutError, AsyncProducerQueueFull, UnknownError, + RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES ) -from kafka.common import ( - RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES) from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set from kafka.util import kafka_bytestring From 6faa7c0e697b3096391453e50149c0dac59b05e0 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 4 Jun 2015 13:28:48 -0700 Subject: [PATCH 02/12] PR 331 fixup: fix _handle_error closure --- kafka/producer/base.py | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 2542df51d..ef81a69b1 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -89,17 +89,18 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, continue reqs_to_retry, error_cls = [], None - do_backoff, do_refresh = False, False - - def _handle_error(error_cls, reqs, all_retries): - if ((error_cls == RequestTimedOutError and - retry_options.retry_on_timeouts) or - error_cls in RETRY_ERROR_TYPES): - all_retries += reqs - if error_cls in RETRY_BACKOFF_ERROR_TYPES: - do_backoff = True - if error_cls in RETRY_REFRESH_ERROR_TYPES: - do_refresh = True + retry_state = { + 'do_backoff': False, + 'do_refresh': False + } + + def _handle_error(error_cls, request): + if issubclass(error_cls, RETRY_ERROR_TYPES) or (retry_options.retry_on_timeouts and issubclass(error_cls, RequestTimedOutError)): + reqs_to_retry.append(request) + if issubclass(error_cls, RETRY_BACKOFF_ERROR_TYPES): + retry_state['do_backoff'] |= True + if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES): + retry_state['do_refresh'] |= True try: reply = client.send_produce_request(reqs.keys(), @@ -108,26 +109,26 @@ def _handle_error(error_cls, reqs, all_retries): fail_on_error=False) for i, response in enumerate(reply): if isinstance(response, FailedPayloadsError): - _handle_error(FailedPayloadsError, response.failed_payloads, reqs_to_retry) + _handle_error(FailedPayloadsError, response.payload) elif isinstance(response, ProduceResponse) and response.error: error_cls = kafka_errors.get(response.error, UnknownError) - _handle_error(error_cls, [reqs.keys()[i]], reqs_to_retry) + _handle_error(error_cls, reqs.keys()[i]) except Exception as ex: error_cls = kafka_errors.get(type(ex), UnknownError) - _handle_error(error_cls, reqs.keys(), reqs_to_retry) + _handle_error(error_cls, reqs.keys()) if not reqs_to_retry: reqs = {} continue # doing backoff before next retry - if do_backoff and retry_options.backoff_ms: + if retry_state['do_backoff'] and retry_options.backoff_ms: log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms) time.sleep(float(retry_options.backoff_ms) / 1000) # refresh topic metadata before next retry - if do_refresh: + if retry_state['do_refresh']: client.load_metadata_for_topics() reqs = dict((key, count + 1) for (key, count) in reqs.items() From 3fdd7b78dcf2c29b72f301eec0bf71e74c1672b3 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 4 Jun 2015 13:29:46 -0700 Subject: [PATCH 03/12] PR 331 fixup: Support unlimited retries with async_retry_limit=None Async producer defaults are now retry everything always w/ 100ms backoff --- kafka/producer/base.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index ef81a69b1..05adb5e66 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -32,10 +32,10 @@ # unlimited ASYNC_QUEUE_MAXSIZE = 0 ASYNC_QUEUE_PUT_TIMEOUT = 0 -# no retries by default -ASYNC_RETRY_LIMIT = 0 -ASYNC_RETRY_BACKOFF_MS = 0 -ASYNC_RETRY_ON_TIMEOUTS = False +# unlimited retries by default +ASYNC_RETRY_LIMIT = None +ASYNC_RETRY_BACKOFF_MS = 100 +ASYNC_RETRY_ON_TIMEOUTS = True STOP_ASYNC_PRODUCER = -1 @@ -131,8 +131,10 @@ def _handle_error(error_cls, request): if retry_state['do_refresh']: client.load_metadata_for_topics() - reqs = dict((key, count + 1) for (key, count) in reqs.items() - if key in reqs_to_retry and count < retry_options.limit) + reqs = dict((key, count + 1) + for (key, count) in reqs.items() + if key in reqs_to_retry + and (retry_options.limit is None or (count < retry_options.limit))) class Producer(object): From 48e278941206815d680b4d4c81d7f1fd2637255c Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 4 Jun 2015 13:31:30 -0700 Subject: [PATCH 04/12] PR 331 fixup: log warnings on async producer backoff and metadata refresh --- kafka/producer/base.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 05adb5e66..c1bc0c59c 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -124,11 +124,12 @@ def _handle_error(error_cls, request): # doing backoff before next retry if retry_state['do_backoff'] and retry_options.backoff_ms: - log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms) + log.warn('Async producer backoff for %s(ms) before retrying', retry_options.backoff_ms) time.sleep(float(retry_options.backoff_ms) / 1000) # refresh topic metadata before next retry if retry_state['do_refresh']: + log.warn('Async producer forcing metadata refresh metadata before retrying') client.load_metadata_for_topics() reqs = dict((key, count + 1) From d96a9b732ececb2f319e9e37ad4e040b366ce80b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 5 Jun 2015 12:42:07 -0700 Subject: [PATCH 05/12] PR 331 fixup: Dont need try/except when calling send_produce_requests with fail_on_error=False --- kafka/producer/base.py | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index c1bc0c59c..0bb0c81b2 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -102,21 +102,22 @@ def _handle_error(error_cls, request): if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES): retry_state['do_refresh'] |= True - try: - reply = client.send_produce_request(reqs.keys(), - acks=req_acks, - timeout=ack_timeout, - fail_on_error=False) - for i, response in enumerate(reply): - if isinstance(response, FailedPayloadsError): - _handle_error(FailedPayloadsError, response.payload) - elif isinstance(response, ProduceResponse) and response.error: - error_cls = kafka_errors.get(response.error, UnknownError) - _handle_error(error_cls, reqs.keys()[i]) - - except Exception as ex: - error_cls = kafka_errors.get(type(ex), UnknownError) - _handle_error(error_cls, reqs.keys()) + reply = client.send_produce_request(request_tries.keys(), + acks=req_acks, + timeout=ack_timeout, + fail_on_error=False) + for i, response in enumerate(reply): + error_cls = None + if isinstance(response, FailedPayloadsError): + error_cls = response.__class__ + orig_req = response.payload + + elif isinstance(response, ProduceResponse) and response.error: + error_cls = kafka_errors.get(response.error, UnknownError) + orig_req = request_tries.keys()[i] + + if error_cls: + _handle_error(error_cls, orig_req) if not reqs_to_retry: reqs = {} From 794ab5bba4807888839c2030d9b97422bddc3cc9 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 5 Jun 2015 12:44:39 -0700 Subject: [PATCH 06/12] PR 331 fixup: Rename reqs dict to request_tries --- kafka/producer/base.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 0bb0c81b2..15768be31 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -47,7 +47,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, a specified timeout and send them upstream to the brokers in one request """ - reqs = {} + request_tries = {} client.reinit() while not stop_event.is_set(): @@ -55,7 +55,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, # it's a simplification: we're comparing message sets and # messages: each set can contain [1..batch_size] messages - count = batch_size - len(reqs) + count = batch_size - len(request_tries) send_at = time.time() + timeout msgset = defaultdict(list) @@ -83,9 +83,9 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, req = ProduceRequest(topic_partition.topic, topic_partition.partition, tuple(messages)) - reqs[req] = 0 + request_tries[req] = 0 - if not reqs: + if not request_tries: continue reqs_to_retry, error_cls = [], None @@ -120,7 +120,7 @@ def _handle_error(error_cls, request): _handle_error(error_cls, orig_req) if not reqs_to_retry: - reqs = {} + request_tries = {} continue # doing backoff before next retry @@ -133,10 +133,14 @@ def _handle_error(error_cls, request): log.warn('Async producer forcing metadata refresh metadata before retrying') client.load_metadata_for_topics() - reqs = dict((key, count + 1) - for (key, count) in reqs.items() - if key in reqs_to_retry - and (retry_options.limit is None or (count < retry_options.limit))) + # Apply retry limit, dropping messages that are over + request_tries = dict( + (key, count + 1) + for (key, count) in request_tries.items() + if key in reqs_to_retry + and (retry_options.limit is None + or (count < retry_options.limit)) + ) class Producer(object): From 9712f613c9e7e4b0436f501b513249eab4edc4e9 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 6 Jun 2015 16:09:30 -0700 Subject: [PATCH 07/12] PR 331 fixup: do not attempt to get new messages if there are pending retries --- kafka/producer/base.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 15768be31..2f47d879b 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -52,13 +52,18 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, while not stop_event.is_set(): timeout = batch_time - - # it's a simplification: we're comparing message sets and - # messages: each set can contain [1..batch_size] messages - count = batch_size - len(request_tries) + count = batch_size send_at = time.time() + timeout msgset = defaultdict(list) + # Merging messages will require a bit more work to manage correctly + # for now, dont look for new batches if we have old ones to retry + if request_tries: + count = 0 + log.debug('Skipping new batch collection to handle retries') + else: + log.debug('Batching size: {0}, timeout: {1}'.format(count, timeout)) + # Keep fetching till we gather enough messages or a # timeout is reached while count > 0 and timeout >= 0: From 0f1579b047fc63c09596897cc1c83730bd0ddb94 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 5 Jun 2015 12:50:43 -0700 Subject: [PATCH 08/12] Log retries and failed messages in async producer (configurable as full messages or hash()) --- kafka/producer/base.py | 36 ++++++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 2f47d879b..cd14ab6eb 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -36,12 +36,14 @@ ASYNC_RETRY_LIMIT = None ASYNC_RETRY_BACKOFF_MS = 100 ASYNC_RETRY_ON_TIMEOUTS = True +ASYNC_LOG_MESSAGES_ON_ERROR = True STOP_ASYNC_PRODUCER = -1 def _send_upstream(queue, client, codec, batch_time, batch_size, - req_acks, ack_timeout, retry_options, stop_event): + req_acks, ack_timeout, retry_options, stop_event, + log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR): """ Listen on the queue for a specified number of messages or till a specified timeout and send them upstream to the brokers in one @@ -123,6 +125,10 @@ def _handle_error(error_cls, request): if error_cls: _handle_error(error_cls, orig_req) + log.error('Error sending ProduceRequest to %s:%d with msgs %s', + orig_req.topic, orig_req.partition, + orig_req.messages if log_messages_on_error + else hash(orig_req.messages)) if not reqs_to_retry: request_tries = {} @@ -147,6 +153,13 @@ def _handle_error(error_cls, request): or (count < retry_options.limit)) ) + # Log messages we are going to retry + for orig_req in request_tries.keys(): + log.info('Retrying ProduceRequest to %s:%d with msgs %s', + orig_req.topic, orig_req.partition, + orig_req.messages if log_messages_on_error + else hash(orig_req.messages)) + class Producer(object): """ @@ -185,7 +198,8 @@ def __init__(self, client, async=False, async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS, async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS, async_queue_maxsize=ASYNC_QUEUE_MAXSIZE, - async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT): + async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT, + async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR): if batch_send: async = True @@ -218,16 +232,14 @@ def __init__(self, client, async=False, backoff_ms=async_retry_backoff_ms, retry_on_timeouts=async_retry_on_timeouts) self.thread_stop_event = Event() - self.thread = Thread(target=_send_upstream, - args=(self.queue, - self.client.copy(), - self.codec, - batch_send_every_t, - batch_send_every_n, - self.req_acks, - self.ack_timeout, - async_retry_options, - self.thread_stop_event)) + self.thread = Thread( + target=_send_upstream, + args=(self.queue, self.client.copy(), self.codec, + batch_send_every_t, batch_send_every_n, + self.req_acks, self.ack_timeout, + async_retry_options, self.thread_stop_event), + kwargs={'log_messages_on_error': async_log_messages_on_error} + ) # Thread will die if main thread exits self.thread.daemon = True From aa217e05448b4eced017b5ecdcb020a4411f863f Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 5 Jun 2015 14:12:14 -0700 Subject: [PATCH 09/12] Deprecate async producer batch_send kwarg -- use 'async' instead --- docs/usage.rst | 2 +- kafka/producer/base.py | 62 ++++++++++++++++++++----------- kafka/producer/keyed.py | 4 +- kafka/producer/simple.py | 4 +- test/test_producer.py | 17 --------- test/test_producer_integration.py | 4 +- 6 files changed, 47 insertions(+), 46 deletions(-) diff --git a/docs/usage.rst b/docs/usage.rst index 150d1212b..cdacfdcb9 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -47,7 +47,7 @@ SimpleProducer # Notes: # * If the producer dies before the messages are sent, there will be losses # * Call producer.stop() to send the messages and cleanup - producer = SimpleProducer(kafka, batch_send=True, + producer = SimpleProducer(kafka, async=True, batch_send_every_n=20, batch_send_every_t=60) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index cd14ab6eb..9f4942b79 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -166,32 +166,54 @@ class Producer(object): Base class to be used by producers Arguments: - client: The Kafka client instance to use - async: If set to true, the messages are sent asynchronously via another - thread (process). We will not wait for a response to these - WARNING!!! current implementation of async producer does not - guarantee message delivery. Use at your own risk! Or help us - improve with a PR! - req_acks: A value indicating the acknowledgements that the server must - receive before responding to the request - ack_timeout: Value (in milliseconds) indicating a timeout for waiting - for an acknowledgement - batch_send: If True, messages are send in batches - batch_send_every_n: If set, messages are send in batches of this size - batch_send_every_t: If set, messages are send after this timeout + client (KafkaClient): instance to use for broker communications. + codec (kafka.protocol.ALL_CODECS): compression codec to use. + req_acks (int, optional): A value indicating the acknowledgements that + the server must receive before responding to the request, + defaults to 1 (local ack). + ack_timeout (int, optional): millisecond timeout to wait for the + configured req_acks, defaults to 1000. + async (bool, optional): send message using a background thread, + defaults to False. + batch_send_every_n (int, optional): If async is True, messages are + sent in batches of this size, defaults to 20. + batch_send_every_t (int or float, optional): If async is True, + messages are sent immediately after this timeout in seconds, even + if there are fewer than batch_send_every_n, defaults to 20. + async_retry_limit (int, optional): number of retries for failed messages + or None for unlimited, defaults to None / unlimited. + async_retry_backoff_ms (int, optional): milliseconds to backoff on + failed messages, defaults to 100. + async_retry_on_timeouts (bool, optional): whether to retry on + RequestTimeoutError, defaults to True. + async_queue_maxsize (int, optional): limit to the size of the + internal message queue in number of messages (not size), defaults + to 0 (no limit). + async_queue_put_timeout (int or float, optional): timeout seconds + for queue.put in send_messages for async producers -- will only + apply if async_queue_maxsize > 0 and the queue is Full, + defaults to 0 (fail immediately on full queue). + async_log_messages_on_error (bool, optional): set to False and the + async producer will only log hash() contents on failed produce + requests, defaults to True (log full messages). Hash logging + will not allow you to identify the specific message that failed, + but it will allow you to match failures with retries. + + Deprecated Arguments: + batch_send (bool, optional): If True, messages are sent by a background + thread in batches, defaults to False. Deprecated, use 'async' """ - ACK_NOT_REQUIRED = 0 # No ack is required ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed - DEFAULT_ACK_TIMEOUT = 1000 - def __init__(self, client, async=False, + def __init__(self, client, req_acks=ACK_AFTER_LOCAL_WRITE, ack_timeout=DEFAULT_ACK_TIMEOUT, codec=None, - batch_send=False, + async=False, + batch_send=False, # deprecated, use async batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, async_retry_limit=ASYNC_RETRY_LIMIT, @@ -201,14 +223,10 @@ def __init__(self, client, async=False, async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT, async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR): - if batch_send: - async = True + if async: assert batch_send_every_n > 0 assert batch_send_every_t > 0 assert async_queue_maxsize >= 0 - else: - batch_send_every_n = 1 - batch_send_every_t = 3600 self.client = client self.async = async diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index 6bb2285c4..2de4dccbd 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -49,8 +49,8 @@ def __init__(self, client, partitioner=None, async=False, self.partitioner_class = partitioner self.partitioners = {} - super(KeyedProducer, self).__init__(client, async, req_acks, - ack_timeout, codec, batch_send, + super(KeyedProducer, self).__init__(client, req_acks, ack_timeout, + codec, async, batch_send, batch_send_every_n, batch_send_every_t, async_retry_limit, diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index 78cc21ceb..280a02edc 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -54,8 +54,8 @@ def __init__(self, client, async=False, async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT): self.partition_cycles = {} self.random_start = random_start - super(SimpleProducer, self).__init__(client, async, req_acks, - ack_timeout, codec, batch_send, + super(SimpleProducer, self).__init__(client, req_acks, ack_timeout, + codec, async, batch_send, batch_send_every_n, batch_send_every_t, async_retry_limit, diff --git a/test/test_producer.py b/test/test_producer.py index 85a5a2e1e..c12af02aa 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -56,23 +56,6 @@ def partitions(topic): producer.send_messages(topic, b'hi') assert client.send_produce_request.called - @patch('kafka.producer.base._send_upstream') - def test_producer_async_queue_overfilled_batch_send(self, mock): - queue_size = 2 - producer = Producer(MagicMock(), batch_send=True, - async_queue_maxsize=queue_size) - - topic = b'test-topic' - partition = 0 - message = b'test-message' - - with self.assertRaises(AsyncProducerQueueFull): - message_list = [message] * (queue_size + 1) - producer.send_messages(topic, partition, *message_list) - self.assertEqual(producer.queue.qsize(), queue_size) - for _ in xrange(producer.queue.qsize()): - producer.queue.get() - @patch('kafka.producer.base._send_upstream') def test_producer_async_queue_overfilled(self, mock): queue_size = 2 diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index 099b97577..3c414e18f 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -221,7 +221,7 @@ def test_batched_simple_producer__triggers_by_message(self): batch_interval = 5 producer = SimpleProducer( self.client, - batch_send=True, + async=True, batch_send_every_n=batch_messages, batch_send_every_t=batch_interval, random_start=False) @@ -287,7 +287,7 @@ def test_batched_simple_producer__triggers_by_time(self): batch_interval = 5 producer = SimpleProducer( self.client, - batch_send=True, + async=True, batch_send_every_n=100, batch_send_every_t=batch_interval, random_start=False) From c76bc9dd179044811a5e0d3fde2e437f1ee6d46c Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 5 Jun 2015 14:45:44 -0700 Subject: [PATCH 10/12] Update Producer class docstring --- kafka/producer/base.py | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 9f4942b79..a0bf47ca5 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -44,10 +44,31 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, req_acks, ack_timeout, retry_options, stop_event, log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR): - """ - Listen on the queue for a specified number of messages or till - a specified timeout and send them upstream to the brokers in one - request + """Private method to manage producing messages asynchronously + + Listens on the queue for a specified number of messages or until + a specified timeout and then sends messages to the brokers in grouped + requests (one per broker). + + Messages placed on the queue should be tuples that conform to this format: + ((topic, partition), message, key) + + Currently does not mark messages with task_done. Do not attempt to join()! + + Arguments: + queue (threading.Queue): the queue from which to get messages + client (KafkaClient): instance to use for communicating with brokers + codec (kafka.protocol.ALL_CODECS): compression codec to use + batch_time (int): interval in seconds to send message batches + batch_size (int): count of messages that will trigger an immediate send + req_acks: required acks to use with ProduceRequests. see server protocol + ack_timeout: timeout to wait for required acks. see server protocol + retry_options (RetryOptions): settings for retry limits, backoff etc + stop_event (threading.Event): event to monitor for shutdown signal. + when this event is 'set', the producer will stop sending messages. + log_messages_on_error (bool, optional): log stringified message-contents + on any produce error, otherwise only log a hash() of the contents, + defaults to True. """ request_tries = {} client.reinit() From 2ba22bf4cebf5e25351816b38cd3cb70e2ea4cb8 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 5 Jun 2015 23:09:50 -0700 Subject: [PATCH 11/12] Dont stop async producer until all pending messages have been processed --- kafka/producer/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index a0bf47ca5..0fd742da9 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -73,7 +73,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, request_tries = {} client.reinit() - while not stop_event.is_set(): + while not (stop_event.is_set() and queue.empty() and not request_tries): timeout = batch_time count = batch_size send_at = time.time() + timeout From 1d5f4b1f889737ef3ba04d8303a02a4957a2d183 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 5 Jun 2015 23:22:37 -0700 Subject: [PATCH 12/12] Add async_stop_timeout parameter to tune how long to let the producer keep trying to send messages before timing out. Log an error if async producer was stopped before all messages sent. --- kafka/producer/base.py | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 0fd742da9..18af3428f 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -39,11 +39,13 @@ ASYNC_LOG_MESSAGES_ON_ERROR = True STOP_ASYNC_PRODUCER = -1 +ASYNC_STOP_TIMEOUT_SECS = 30 def _send_upstream(queue, client, codec, batch_time, batch_size, req_acks, ack_timeout, retry_options, stop_event, - log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR): + log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR, + stop_timeout=ASYNC_STOP_TIMEOUT_SECS): """Private method to manage producing messages asynchronously Listens on the queue for a specified number of messages or until @@ -69,11 +71,23 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, log_messages_on_error (bool, optional): log stringified message-contents on any produce error, otherwise only log a hash() of the contents, defaults to True. + stop_timeout (int or float, optional): number of seconds to continue + retrying messages after stop_event is set, defaults to 30. """ request_tries = {} client.reinit() + stop_at = None while not (stop_event.is_set() and queue.empty() and not request_tries): + + # Handle stop_timeout + if stop_event.is_set(): + if not stop_at: + stop_at = stop_timeout + time.time() + if time.time() > stop_at: + log.debug('Async producer stopping due to stop_timeout') + break + timeout = batch_time count = batch_size send_at = time.time() + timeout @@ -181,6 +195,10 @@ def _handle_error(error_cls, request): orig_req.messages if log_messages_on_error else hash(orig_req.messages)) + if request_tries or not queue.empty(): + log.error('Stopped producer with {0} unsent messages' + .format(len(request_tries) + queue.qsize())) + class Producer(object): """ @@ -219,6 +237,9 @@ class Producer(object): requests, defaults to True (log full messages). Hash logging will not allow you to identify the specific message that failed, but it will allow you to match failures with retries. + async_stop_timeout (int or float, optional): seconds to continue + attempting to send queued messages after producer.stop(), + defaults to 30. Deprecated Arguments: batch_send (bool, optional): If True, messages are sent by a background @@ -242,7 +263,8 @@ def __init__(self, client, async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS, async_queue_maxsize=ASYNC_QUEUE_MAXSIZE, async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT, - async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR): + async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR, + async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS): if async: assert batch_send_every_n > 0 @@ -277,7 +299,8 @@ def __init__(self, client, batch_send_every_t, batch_send_every_n, self.req_acks, self.ack_timeout, async_retry_options, self.thread_stop_event), - kwargs={'log_messages_on_error': async_log_messages_on_error} + kwargs={'log_messages_on_error': async_log_messages_on_error, + 'stop_timeout': async_stop_timeout} ) # Thread will die if main thread exits