Add producer batch send queue size limit #304

Closed
wants to merge 3 commits into from

Conversation

ediskandarov
Contributor

As noted in #297, an unlimited queue size can cause a memory leak.

My patches add the ability to specify a queue size. By default the queue size is unlimited.

If the queue becomes full, the next send will raise a BatchQueueOverfilledError.
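The intended behavior can be sketched with Python's standard `queue` module. The function and error names below follow this PR, but the wiring is illustrative, not the actual kafka-python code:

```python
from queue import Full, Queue


class BatchQueueOverfilledError(Exception):
    """Raised when the send queue is at capacity (name from this PR; sketch only)."""


def make_producer_queue(maxsize=0):
    # maxsize=0 preserves the old unlimited behavior (hypothetical helper)
    q = Queue(maxsize=maxsize)

    def send(msg):
        # Non-blocking put: fail fast instead of growing memory without bound
        try:
            q.put_nowait(msg)
        except Full:
            raise BatchQueueOverfilledError(
                "producer send queue is full (maxsize=%d)" % maxsize)

    return q, send


q, send = make_producer_queue(maxsize=2)
send("m1")
send("m2")
try:
    send("m3")
except BatchQueueOverfilledError as e:
    print("overflow:", e)
```

With `maxsize=0` the queue never raises, matching the default unlimited behavior described above.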

@@ -139,7 +143,8 @@ def __init__(self, client, async=False,
log.warning("async producer does not guarantee message delivery!")
log.warning("Current implementation does not retry Failed messages")
log.warning("Use at your own risk! (or help improve with a PR!)")
-        self.queue = Queue() # Messages are sent through this queue
+        # Messages are sent through this queue
+        self.queue = Queue(maxsize=batch_send_queue_maxsize)
Owner


This is used both in batch mode and in simple async mode -- the variable name probably should not be specific to batch mode.

@ediskandarov
Contributor Author

@dpkp how should the producer behave with async mode enabled?

@dpkp
Owner

dpkp commented Jan 23, 2015

Should be the same behavior; I just think the max queue size variable should probably be labeled as an async_ kwarg and applied in the same way whether async mode is enabled directly via async=True or indirectly via batch_send=True.

@@ -200,6 +200,10 @@ class KafkaConfigurationError(KafkaError):
     pass


+class BatchQueueOverfilledError(KafkaError):
Owner


How about AsyncProducerQueueFull

@dpkp
Owner

dpkp commented Feb 9, 2015

consider whether PR 283 is a more robust approach (includes backoff + retry)

@dpkp dpkp added the producer label Feb 9, 2015
@rogaha

rogaha commented Apr 20, 2015

+1

@rogaha

rogaha commented Apr 20, 2015

What is the status of this PR? Is there anything I can help you guys with in order to get this merged?

@rogaha

rogaha commented May 7, 2015

ping @dpkp

@toli

toli commented May 12, 2015

What's the status of this PR? It's been 4+ months, same with #283.
We are facing a similar problem at work, and I'd prefer not to carry custom patches and just use vanilla upstream code...

@dpkp - you mentioned something about Rdio having their own; perhaps you can get that committed upstream, or get this patch in?

@dpkp
Owner

dpkp commented May 13, 2015

I'd like to get #331 merged for this issue instead. If you have a second to review, would appreciate more eyeballs and/or testing.

@dpkp
Owner

dpkp commented Jun 8, 2015

#331 is merged and supports this via the async_queue_maxsize and async_queue_put_timeout configuration parameters to the producer.
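The merged approach pairs a bounded queue with a timed put. A minimal sketch of that pattern, where the parameter names mirror the comment above but the helper itself is hypothetical:

```python
from queue import Full, Queue


def try_enqueue(q, msg, async_queue_put_timeout=0.0):
    """Attempt to enqueue, waiting up to the timeout for free space (sketch)."""
    try:
        if async_queue_put_timeout > 0:
            # Block briefly for capacity before giving up
            q.put(msg, block=True, timeout=async_queue_put_timeout)
        else:
            q.put_nowait(msg)
        return True
    except Full:
        return False


q = Queue(maxsize=1)  # analogue of async_queue_maxsize
print(try_enqueue(q, "m1"))                               # True
print(try_enqueue(q, "m2", async_queue_put_timeout=0.05))  # False: queue stayed full
```

The timeout gives a backoff window instead of failing immediately, which addresses the retry concern raised about PR 283.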

@dpkp dpkp closed this Jun 8, 2015