Support for async producer #33
The Java/Scala Kafka client supports a mechanism for sending messages asynchronously by using a queue and a thread. Messages are put on the queue and the worker thread keeps sending them to the broker. This ticket implements this feature in Python. We use multiprocessing instead of threads to send the messages.
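The queue-plus-worker pattern described above can be sketched as follows. This is a hedged illustration, not the PR's actual code: the PR uses `multiprocessing.Process` and `multiprocessing.Queue`, while this sketch uses a thread so the structure is easier to follow. The class name and `send_fn` are assumptions of the example.

```python
import queue
import threading

STOP_ASYNC_PRODUCER = object()  # sentinel telling the worker to shut down

class AsyncProducerSketch:
    """Queue-plus-worker async producer sketch (hypothetical).

    send_fn stands in for the actual broker call; in the PR the worker
    is a separate process rather than a thread, but the flow is the same.
    """

    def __init__(self, send_fn):
        self._send_fn = send_fn
        self._queue = queue.Queue()
        self._worker = threading.Thread(target=self._send_upstream, daemon=True)
        self._worker.start()

    def send(self, topic, msg):
        # The caller returns immediately; the worker does the network I/O.
        self._queue.put((topic, msg))

    def _send_upstream(self):
        while True:
            item = self._queue.get()
            if item is STOP_ASYNC_PRODUCER:
                break
            topic, msg = item
            self._send_fn(topic, msg)

    def stop(self):
        # The sentinel queues up behind pending messages, so they drain first.
        self._queue.put(STOP_ASYNC_PRODUCER)
        self._worker.join()
```

Because the stop sentinel goes through the same queue as the messages, any messages enqueued before `stop()` are delivered before the worker exits.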
Conflicts: kafka/producer.py
@mahendra what do you think about a combo consumer/producer Queue? This is how I did it in the 0.7x version of the library, and this is what I had in mind as a solution to the whole async/multiprocess question.
Oops - did not mean to close.
@mumrah I thought you had merged this in. Yeah, I had a look at the existing queues.py. It's good. My thoughts are:
The above is what I tried to do in this ticket. The whole thing can be made blocking/non-blocking by passing an async parameter. Or maybe we can make it block. For the producer, I would prefer not to have a separate implementation.
The reason for my comments is that for the blocking producer and non-blocking consumer, the current code works great. We don't need to add the extra overhead of an Event or Queue for these cases. Adding these makes the code go through all sorts of locking and synchronization semantics, which may be a bit of overkill. The complex cases are the non-blocking producer and the blocking consumer. These we can try to solve using Queues. For a blocking consumer, we can just use a Queue and a new API:

```python
def get_messages(self, blocking=False, timeout=None):
    if blocking:
        # Use some Queue logic with timeout
    else:
        # Iterate over (self)
```

Or we can add a parameter to SimpleConsumer to indicate blocking. Let me know what you think. PS: My idea is to keep the most common cases as simple and sync/lock-free as possible. For the uncommon cases, we can use them.
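A fleshed-out version of the `get_messages` API proposed above might look like the sketch below. `SimpleConsumerSketch` and `_feed` are hypothetical stand-ins (in a real consumer, the fetch loop would feed the internal queue); only the blocking/timeout logic is the point here.

```python
import queue

class SimpleConsumerSketch:
    """Hypothetical wrapper illustrating the proposed get_messages API."""

    def __init__(self):
        self._queue = queue.Queue()

    def _feed(self, msg):
        # Stand-in for the fetch loop that would fill the queue.
        self._queue.put(msg)

    def get_messages(self, blocking=False, timeout=None):
        if blocking:
            # Wait up to `timeout` seconds for at least one message.
            try:
                return [self._queue.get(timeout=timeout)]
            except queue.Empty:
                return []
        # Non-blocking: drain whatever is already buffered.
        messages = []
        while True:
            try:
                messages.append(self._queue.get_nowait())
            except queue.Empty:
                return messages
```

This keeps the non-blocking path lock-light (a single `get_nowait` loop) while confining the timeout machinery to the blocking case, in the spirit of the comment above.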
Add support for two options in the producer - req_acks and ack_timeout. The acks, if any, are passed to the caller directly.
Also, ensure that the case of 'no-acks' works fine. In conn.send(), do not wait for the response. Wait for it only on conn.recv(). This behaviour is fine for now since the connection is not shared among consumer threads etc.
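The send-without-waiting behaviour described in these commits can be illustrated with a small sketch. The constant values mirror Kafka's required-acks convention (0, 1, -1), but `send_produce_request` and `FakeConn` here are hypothetical stand-ins for illustration, not the library's real API.

```python
# Conceptual sketch of the req_acks / ack_timeout behaviour (hypothetical,
# not the library's actual code).
ACK_NOT_REQUIRED = 0           # fire and forget
ACK_AFTER_LOCAL_WRITE = 1      # leader acknowledges after its local write
ACK_AFTER_CLUSTER_COMMIT = -1  # all in-sync replicas must commit first

def send_produce_request(conn, req, req_acks=ACK_AFTER_LOCAL_WRITE,
                         ack_timeout=1000):
    conn.send(req)                  # send() itself never waits for a response
    if req_acks == ACK_NOT_REQUIRED:
        return None                 # no-acks: never block on conn.recv()
    return conn.recv(ack_timeout)   # the ack is passed back to the caller

class FakeConn:
    """Stand-in connection used only to exercise the sketch."""
    def __init__(self):
        self.sent = []
    def send(self, req):
        self.sent.append(req)
    def recv(self, timeout_ms):
        return "ack:" + self.sent[-1]
```

The key design point from the commit: the blocking happens in `recv()`, not `send()`, so a no-acks producer never pays for a network round trip.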
@mumrah - Let me know if you were able to go through this pull request. This implements the following features.
I think this patch will bring the kafka-python client close to the Java/Scala clients bundled with Kafka. Let me know your thoughts.
@mahendra a few comments:
Eventually, it would be nice to allow the async producer to batch messages automatically. This would be similar logic to the offset committer (send messages every N seconds or every M messages). These are important values to tune for performance (though I'm not sure how high-performance this library could really be). Looking good!
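The "every N seconds or every M messages" batching suggested above might look like the following sketch. `batch_send_loop`, the `"STOP"` sentinel, and `send_batch` are assumptions of this example, not code from the PR; the flush fires when either threshold is reached, whichever comes first.

```python
import queue
import time

def batch_send_loop(q, send_batch, batch_size=20, batch_interval=5.0):
    """Hypothetical worker loop: flush every batch_interval seconds or
    every batch_size messages, whichever comes first."""
    buffer = []
    deadline = time.time() + batch_interval
    while True:
        timeout = max(0.0, deadline - time.time())
        try:
            msg = q.get(timeout=timeout)
            if msg == "STOP":
                break
            buffer.append(msg)
        except queue.Empty:
            pass  # timed out waiting; fall through to the deadline check
        if buffer and (len(buffer) >= batch_size or time.time() >= deadline):
            send_batch(buffer)
            buffer = []
            deadline = time.time() + batch_interval
        elif time.time() >= deadline:
            deadline = time.time() + batch_interval
    if buffer:
        send_batch(buffer)  # flush whatever is left on shutdown
```

Note that the shutdown path flushes the remaining buffer, matching the "send unsent messages before stopping" behaviour discussed later in this thread.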
@mumrah Thanks for the review.
Batching would be a very easy thing to do in our case. Will implement it later this week. PS: I am sure we can make the library high-performing :-) - it needs a bit more tuning and maybe gevent to boot.
Yea, you're right. That just seems crazy - not to even get back a simple response.
Also improve on the logic for stopping the async Processor instance. Ensure that unsent messages are sent before it is stopped.
@mumrah with some re-org, I have added batch mode support also. Working on the test cases now. Note: I have a common function `_send_upstream()` to handle both the batched mode and the normal async mode. It is a bit complicated for the normal async mode. If you want, I can implement a separate function for normal async operation as follows:

```python
def _send_upstream(self, queue):
    while True:
        partition, msg = queue.get()
        if partition == STOP_ASYNC_PRODUCER:
            break
        req = ProduceRequest(....)
        self.client.send_produce_request(....)
```
@mumrah any docs on how to get a test setup going? I ran `python setup.py test` after setting environment variables (ZOOKEEPER_URI and KAFKA_URI) - quite a few errors. Also, if I set KAFKA_ROOT to my kafka code base, it throws a lot of them (both on the master branch).
@mumrah facepalm!! :-) yep, got it working. Have implemented test cases and updated the pull request. Do have a look.
@mumrah - let me know if you want a simpler `_send_upstream` for non-batch async messages.
Also, maybe we should update the README to mention all the new features of the library.
Merged locally, tests pass, +1