Skip to content

Fixups for PR 331 (async producer) #388

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 7, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ SimpleProducer
# Notes:
# * If the producer dies before the messages are sent, there will be losses
# * Call producer.stop() to send the messages and cleanup
producer = SimpleProducer(kafka, batch_send=True,
producer = SimpleProducer(kafka, async=True,
batch_send_every_n=20,
batch_send_every_t=60)

Expand Down
259 changes: 173 additions & 86 deletions kafka/producer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
from kafka.common import (
ProduceRequest, ProduceResponse, TopicAndPartition, RetryOptions,
kafka_errors, UnsupportedCodecError, FailedPayloadsError,
RequestTimedOutError, AsyncProducerQueueFull, UnknownError
RequestTimedOutError, AsyncProducerQueueFull, UnknownError,
RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES
)
from kafka.common import (
RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)

from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
from kafka.util import kafka_bytestring
Expand All @@ -33,33 +32,75 @@
# unlimited
ASYNC_QUEUE_MAXSIZE = 0
ASYNC_QUEUE_PUT_TIMEOUT = 0
# no retries by default
ASYNC_RETRY_LIMIT = 0
ASYNC_RETRY_BACKOFF_MS = 0
ASYNC_RETRY_ON_TIMEOUTS = False
# unlimited retries by default
ASYNC_RETRY_LIMIT = None
ASYNC_RETRY_BACKOFF_MS = 100
ASYNC_RETRY_ON_TIMEOUTS = True
ASYNC_LOG_MESSAGES_ON_ERROR = True

STOP_ASYNC_PRODUCER = -1
ASYNC_STOP_TIMEOUT_SECS = 30


def _send_upstream(queue, client, codec, batch_time, batch_size,
req_acks, ack_timeout, retry_options, stop_event):
"""
Listen on the queue for a specified number of messages or till
a specified timeout and send them upstream to the brokers in one
request
req_acks, ack_timeout, retry_options, stop_event,
log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
"""Private method to manage producing messages asynchronously

Listens on the queue for a specified number of messages or until
a specified timeout and then sends messages to the brokers in grouped
requests (one per broker).

Messages placed on the queue should be tuples that conform to this format:
((topic, partition), message, key)

Currently does not mark messages with task_done. Do not attempt to join()!

Arguments:
queue (threading.Queue): the queue from which to get messages
client (KafkaClient): instance to use for communicating with brokers
codec (kafka.protocol.ALL_CODECS): compression codec to use
batch_time (int): interval in seconds to send message batches
batch_size (int): count of messages that will trigger an immediate send
req_acks: required acks to use with ProduceRequests. see server protocol
ack_timeout: timeout to wait for required acks. see server protocol
retry_options (RetryOptions): settings for retry limits, backoff etc
stop_event (threading.Event): event to monitor for shutdown signal.
when this event is 'set', the producer will stop sending messages.
log_messages_on_error (bool, optional): log stringified message-contents
on any produce error, otherwise only log a hash() of the contents,
defaults to True.
stop_timeout (int or float, optional): number of seconds to continue
retrying messages after stop_event is set, defaults to 30.
"""
reqs = {}
request_tries = {}
client.reinit()
stop_at = None

while not stop_event.is_set():
timeout = batch_time
while not (stop_event.is_set() and queue.empty() and not request_tries):

# Handle stop_timeout
if stop_event.is_set():
if not stop_at:
stop_at = stop_timeout + time.time()
if time.time() > stop_at:
log.debug('Async producer stopping due to stop_timeout')
break

# it's a simplification: we're comparing message sets and
# messages: each set can contain [1..batch_size] messages
count = batch_size - len(reqs)
timeout = batch_time
count = batch_size
send_at = time.time() + timeout
msgset = defaultdict(list)

# Merging messages will require a bit more work to manage correctly
# for now, dont look for new batches if we have old ones to retry
if request_tries:
count = 0
log.debug('Skipping new batch collection to handle retries')
else:
log.debug('Batching size: {0}, timeout: {1}'.format(count, timeout))

# Keep fetching till we gather enough messages or a
# timeout is reached
while count > 0 and timeout >= 0:
Expand All @@ -84,104 +125,151 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
req = ProduceRequest(topic_partition.topic,
topic_partition.partition,
tuple(messages))
reqs[req] = 0
request_tries[req] = 0

if not reqs:
if not request_tries:
continue

reqs_to_retry, error_cls = [], None
do_backoff, do_refresh = False, False

def _handle_error(error_cls, reqs, all_retries):
if ((error_cls == RequestTimedOutError and
retry_options.retry_on_timeouts) or
error_cls in RETRY_ERROR_TYPES):
all_retries += reqs
if error_cls in RETRY_BACKOFF_ERROR_TYPES:
do_backoff = True
if error_cls in RETRY_REFRESH_ERROR_TYPES:
do_refresh = True

try:
reply = client.send_produce_request(reqs.keys(),
acks=req_acks,
timeout=ack_timeout,
fail_on_error=False)
for i, response in enumerate(reply):
if isinstance(response, FailedPayloadsError):
_handle_error(FailedPayloadsError, response.failed_payloads, reqs_to_retry)
elif isinstance(response, ProduceResponse) and response.error:
error_cls = kafka_errors.get(response.error, UnknownError)
_handle_error(error_cls, [reqs.keys()[i]], reqs_to_retry)

except Exception as ex:
error_cls = kafka_errors.get(type(ex), UnknownError)
_handle_error(error_cls, reqs.keys(), reqs_to_retry)
retry_state = {
'do_backoff': False,
'do_refresh': False
}

def _handle_error(error_cls, request):
if issubclass(error_cls, RETRY_ERROR_TYPES) or (retry_options.retry_on_timeouts and issubclass(error_cls, RequestTimedOutError)):
reqs_to_retry.append(request)
if issubclass(error_cls, RETRY_BACKOFF_ERROR_TYPES):
retry_state['do_backoff'] |= True
if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES):
retry_state['do_refresh'] |= True

reply = client.send_produce_request(request_tries.keys(),
acks=req_acks,
timeout=ack_timeout,
fail_on_error=False)
for i, response in enumerate(reply):
error_cls = None
if isinstance(response, FailedPayloadsError):
error_cls = response.__class__
orig_req = response.payload

elif isinstance(response, ProduceResponse) and response.error:
error_cls = kafka_errors.get(response.error, UnknownError)
orig_req = request_tries.keys()[i]

if error_cls:
_handle_error(error_cls, orig_req)
log.error('Error sending ProduceRequest to %s:%d with msgs %s',
orig_req.topic, orig_req.partition,
orig_req.messages if log_messages_on_error
else hash(orig_req.messages))

if not reqs_to_retry:
reqs = {}
request_tries = {}
continue

# doing backoff before next retry
if do_backoff and retry_options.backoff_ms:
log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms)
if retry_state['do_backoff'] and retry_options.backoff_ms:
log.warn('Async producer backoff for %s(ms) before retrying', retry_options.backoff_ms)
time.sleep(float(retry_options.backoff_ms) / 1000)

# refresh topic metadata before next retry
if do_refresh:
if retry_state['do_refresh']:
log.warn('Async producer forcing metadata refresh metadata before retrying')
client.load_metadata_for_topics()

reqs = dict((key, count + 1) for (key, count) in reqs.items()
if key in reqs_to_retry and count < retry_options.limit)
# Apply retry limit, dropping messages that are over
request_tries = dict(
(key, count + 1)
for (key, count) in request_tries.items()
if key in reqs_to_retry
and (retry_options.limit is None
or (count < retry_options.limit))
)

# Log messages we are going to retry
for orig_req in request_tries.keys():
log.info('Retrying ProduceRequest to %s:%d with msgs %s',
orig_req.topic, orig_req.partition,
orig_req.messages if log_messages_on_error
else hash(orig_req.messages))

if request_tries or not queue.empty():
log.error('Stopped producer with {0} unsent messages'
.format(len(request_tries) + queue.qsize()))


class Producer(object):
"""
Base class to be used by producers

Arguments:
client: The Kafka client instance to use
async: If set to true, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
WARNING!!! current implementation of async producer does not
guarantee message delivery. Use at your own risk! Or help us
improve with a PR!
req_acks: A value indicating the acknowledgements that the server must
receive before responding to the request
ack_timeout: Value (in milliseconds) indicating a timeout for waiting
for an acknowledgement
batch_send: If True, messages are send in batches
batch_send_every_n: If set, messages are send in batches of this size
batch_send_every_t: If set, messages are send after this timeout
client (KafkaClient): instance to use for broker communications.
codec (kafka.protocol.ALL_CODECS): compression codec to use.
req_acks (int, optional): A value indicating the acknowledgements that
the server must receive before responding to the request,
defaults to 1 (local ack).
ack_timeout (int, optional): millisecond timeout to wait for the
configured req_acks, defaults to 1000.
async (bool, optional): send message using a background thread,
defaults to False.
batch_send_every_n (int, optional): If async is True, messages are
sent in batches of this size, defaults to 20.
batch_send_every_t (int or float, optional): If async is True,
messages are sent immediately after this timeout in seconds, even
if there are fewer than batch_send_every_n, defaults to 20.
async_retry_limit (int, optional): number of retries for failed messages
or None for unlimited, defaults to None / unlimited.
async_retry_backoff_ms (int, optional): milliseconds to backoff on
failed messages, defaults to 100.
async_retry_on_timeouts (bool, optional): whether to retry on
RequestTimeoutError, defaults to True.
async_queue_maxsize (int, optional): limit to the size of the
internal message queue in number of messages (not size), defaults
to 0 (no limit).
async_queue_put_timeout (int or float, optional): timeout seconds
for queue.put in send_messages for async producers -- will only
apply if async_queue_maxsize > 0 and the queue is Full,
defaults to 0 (fail immediately on full queue).
async_log_messages_on_error (bool, optional): set to False and the
async producer will only log hash() contents on failed produce
requests, defaults to True (log full messages). Hash logging
will not allow you to identify the specific message that failed,
but it will allow you to match failures with retries.
async_stop_timeout (int or float, optional): seconds to continue
attempting to send queued messages after producer.stop(),
defaults to 30.

Deprecated Arguments:
batch_send (bool, optional): If True, messages are sent by a background
thread in batches, defaults to False. Deprecated, use 'async'
"""

ACK_NOT_REQUIRED = 0 # No ack is required
ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log
ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed

DEFAULT_ACK_TIMEOUT = 1000

def __init__(self, client, async=False,
def __init__(self, client,
req_acks=ACK_AFTER_LOCAL_WRITE,
ack_timeout=DEFAULT_ACK_TIMEOUT,
codec=None,
batch_send=False,
async=False,
batch_send=False, # deprecated, use async
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
async_retry_limit=ASYNC_RETRY_LIMIT,
async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT,
async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS):

if batch_send:
async = True
if async:
assert batch_send_every_n > 0
assert batch_send_every_t > 0
assert async_queue_maxsize >= 0
else:
batch_send_every_n = 1
batch_send_every_t = 3600

self.client = client
self.async = async
Expand All @@ -205,16 +293,15 @@ def __init__(self, client, async=False,
backoff_ms=async_retry_backoff_ms,
retry_on_timeouts=async_retry_on_timeouts)
self.thread_stop_event = Event()
self.thread = Thread(target=_send_upstream,
args=(self.queue,
self.client.copy(),
self.codec,
batch_send_every_t,
batch_send_every_n,
self.req_acks,
self.ack_timeout,
async_retry_options,
self.thread_stop_event))
self.thread = Thread(
target=_send_upstream,
args=(self.queue, self.client.copy(), self.codec,
batch_send_every_t, batch_send_every_n,
self.req_acks, self.ack_timeout,
async_retry_options, self.thread_stop_event),
kwargs={'log_messages_on_error': async_log_messages_on_error,
'stop_timeout': async_stop_timeout}
)

# Thread will die if main thread exits
self.thread.daemon = True
Expand Down
4 changes: 2 additions & 2 deletions kafka/producer/keyed.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ def __init__(self, client, partitioner=None, async=False,
self.partitioner_class = partitioner
self.partitioners = {}

super(KeyedProducer, self).__init__(client, async, req_acks,
ack_timeout, codec, batch_send,
super(KeyedProducer, self).__init__(client, req_acks, ack_timeout,
codec, async, batch_send,
batch_send_every_n,
batch_send_every_t,
async_retry_limit,
Expand Down
4 changes: 2 additions & 2 deletions kafka/producer/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ def __init__(self, client, async=False,
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
self.partition_cycles = {}
self.random_start = random_start
super(SimpleProducer, self).__init__(client, async, req_acks,
ack_timeout, codec, batch_send,
super(SimpleProducer, self).__init__(client, req_acks, ack_timeout,
codec, async, batch_send,
batch_send_every_n,
batch_send_every_t,
async_retry_limit,
Expand Down
17 changes: 0 additions & 17 deletions test/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,6 @@ def partitions(topic):
producer.send_messages(topic, b'hi')
assert client.send_produce_request.called

@patch('kafka.producer.base._send_upstream')
def test_producer_async_queue_overfilled_batch_send(self, mock):
queue_size = 2
producer = Producer(MagicMock(), batch_send=True,
async_queue_maxsize=queue_size)

topic = b'test-topic'
partition = 0
message = b'test-message'

with self.assertRaises(AsyncProducerQueueFull):
message_list = [message] * (queue_size + 1)
producer.send_messages(topic, partition, *message_list)
self.assertEqual(producer.queue.qsize(), queue_size)
for _ in xrange(producer.queue.qsize()):
producer.queue.get()

@patch('kafka.producer.base._send_upstream')
def test_producer_async_queue_overfilled(self, mock):
queue_size = 2
Expand Down
Loading