Add producer batch send queue size limit #304
Conversation
@@ -139,7 +143,8 @@ def __init__(self, client, async=False,
         log.warning("async producer does not guarantee message delivery!")
         log.warning("Current implementation does not retry Failed messages")
         log.warning("Use at your own risk! (or help improve with a PR!)")
-        self.queue = Queue()  # Messages are sent through this queue
+        # Messages are sent through this queue
+        self.queue = Queue(maxsize=batch_send_queue_maxsize)
This is used both in batch mode and in simple async mode -- the variable name probably should not be specific to batch mode.
@dpkp how should the producer behave with async mode enabled?
Should be the same behavior -- I just think that the max queue size variable should probably be labeled as an async option rather than a batch-specific one.
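For background, the limit leans on the standard library's bounded Queue: with maxsize > 0, a non-blocking put raises Full once the queue is at capacity. A minimal sketch of that stdlib behavior (illustrative, not code from this patch):

```python
# Minimal sketch of the stdlib behavior the patch builds on: a Queue
# created with maxsize > 0 rejects non-blocking puts once it is full.
from queue import Queue, Full  # the "Queue" module on Python 2

q = Queue(maxsize=2)
q.put_nowait("msg-1")
q.put_nowait("msg-2")

try:
    q.put_nowait("msg-3")  # queue is at capacity
except Full:
    print("queue full: caller must drop, block, or raise an error")
```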
@@ -200,6 +200,10 @@ class KafkaConfigurationError(KafkaError):
     pass
+
+
+class BatchQueueOverfilledError(KafkaError):
+    pass
How about AsyncProducerQueueFull
Consider whether PR #283 is a more robust approach (it includes backoff + retry).
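For comparison, here is a hypothetical sketch of what a backoff-plus-retry put could look like (the helper name and parameters are made up for illustration, not taken from #283):

```python
# Hypothetical backoff + retry put on a bounded queue (illustrative
# only; this is not actual code from PR #283).
import time
from queue import Queue, Full

def put_with_backoff(queue, item, retries=3, initial_delay=0.1):
    # Try a non-blocking put, sleeping with exponential backoff
    # between attempts instead of failing immediately.
    delay = initial_delay
    for attempt in range(retries + 1):
        try:
            queue.put_nowait(item)
            return True
        except Full:
            if attempt == retries:
                return False  # out of retries; caller decides what to do
            time.sleep(delay)
            delay *= 2
    return False
```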
+1
What is the status of this PR? Is there anything I can help you guys with in order to get this merged?
ping @dpkp
What's the status of this PR? It's been 4+ months, same as with #283. @dpkp, you mentioned something about Rdio having their own implementation; perhaps you can get that committed upstream, or get this patch in?
I'd like to get #331 merged for this issue instead. If you have a second to review, would appreciate more eyeballs and/or testing.
#331 is merged and supports this via async_queue_maxsize.
As noted in #297, an unbounded queue can cause a memory leak. This patch adds the ability to specify a maximum queue size. By default the queue size is unlimited. If the queue becomes full, the next attempt to send a message will raise a BatchQueueOverfilledError.
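Putting the pieces together, a short sketch of the described behavior (the Producer skeleton and send_messages signature here are hypothetical; only the exception name, the maxsize parameter idea, and the unlimited default come from the patch):

```python
# Sketch of the behavior described above: a bounded send queue that
# raises BatchQueueOverfilledError when full. The Producer skeleton
# is hypothetical; only the exception and maxsize idea are from the PR.
from queue import Queue, Full

class KafkaError(Exception):
    pass

class BatchQueueOverfilledError(KafkaError):
    pass

class Producer(object):
    def __init__(self, batch_send_queue_maxsize=0):
        # maxsize=0 means unlimited, matching the patch's default
        self.queue = Queue(maxsize=batch_send_queue_maxsize)

    def send_messages(self, msg):
        try:
            self.queue.put_nowait(msg)
        except Full:
            raise BatchQueueOverfilledError(
                "async producer queue is full (maxsize=%d)"
                % self.queue.maxsize)
```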