-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Various changes/fixes #88
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Previously, if you try to consume a message with a timeout greater than 10 seconds, but you don't receive data in those 10 seconds, a socket.timeout exception is raised. This allows a higher socket timeout to be set, or even None for no timeout.
According to the protocol documentation, the 4 byte integer at the beginning of a response represents the size of the payload only, not including those bytes. See http://goo.gl/rg5uom
…y time * Remove bufsize from client and conn, since they're not actually enforced Notes: This commit changes behavior a bit by raising a BufferUnderflowError when no data is received for the message size rather than a ConnectionError. Since bufsize in the socket is not actually enforced, but it is used by the consumer when creating requests, moving it there until a better solution is implemented.
* Combine partition fetch requests into a single request * Put the messages received in a queue and update offsets * Grab as many messages from the queue as requested * When the queue is empty, request more * timeout param for get_messages() is the actual timeout for getting those messages * Based on dpkp#74 - don't increase min_bytes if the consumer fetch buffer size is too small. Notes: Change MultiProcessConsumer and _mp_consume() accordingly. Previously, when querying each partition separately, it was possible to block waiting for messages on partition 0 even if there are new ones in partition 1. These changes allow us to block while waiting for messages on all partitions, and reduce total number of kafka requests. Use Queue.Queue for single proc Queue instead of already imported multiprocessing.Queue because the latter doesn't seem to guarantee immediate availability of items after a put: >>> from multiprocessing import Queue >>> q = Queue() >>> q.put(1); q.get_nowait() Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 152, in get_nowait return self.get(False) File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 134, in get raise Empty Queue.Empty
to block forever if it's reached.
… iterator to exit when reached. Also put constant timeout values in pre-defined constants
Will remove once any error handling issues are resolved.
This is pretty much a rewrite. The tests that involve offset requests/responses are not implemented since that API is not supported in kafka 0.8 yet. Only kafka.codec and kafka.protocol are currently tested, so there is more work to be done here.
We always store the offset of the next available message, so we shouldn't decrement the offset deltas when seeking by an extra 1
…data Also, log.exception() is unhelpfully noisy. Use log.error() with some error details in the message instead.
This differentiates between errors that occur when sending the request and receiving the response, and adds BufferUnderflowError handling.
…tch size is too small Note: This can cause fetching a message to exceed a given timeout, but timeouts are not guaranteed anyways, and in this case it's the client's fault for not sending a big enough buffer size rather than the kafka server. This can be bad if max_fetch_size is None (no limit) and there is some message in Kafka that is crazy huge, but that is why we should have some max_fetch_size.
…ed in integration tests If some of the tests stop brokers then error out, the teardown method will try to close the same brokers and fail. This change allows it to continue.
This is better since the tests stop/start brokers, and if something goes wrong they can affect eachother.
for partition in partitions: | ||
requests.append(FetchRequest(self.topic, partition, | ||
self.offsets[partition], | ||
self.buffer_size)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't buffer_size being passed into as the max message size here? Shouldn't buffer_size be used for min_bytes below (line 374), and this should be a max message size parameter that doesn't grow on error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
buffer_size is the value passed to kafka to tell it how much data we can handle. We don't actually have a real buffer, but we have to pass something to kafka. If that size is smaller than the next available message, kafka will not send any messages. The client can either increase the buffer size, skip an offset, or give up and go home. We increase the buffer size up to self.max_buffer_size if it's not None.
min_bytes is how much data kafka should wait for to be available before responding. It is set to 0 or 1 in the FetchContext based on weather or not we want to block (i.e. have at least 1 message before returning). If we pass buffer_size for min_bytes, and buffer_size is, say, 4k, then kafka will wait for 4k of data to be available before responding, which is not really what we want.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If buffer_size is what we're sending to Kafka as the max data we can handle, why isn't it always max_buffer_size?
I don't know if this expand-window-and-retry flow makes sense for either parameter we are sending to Kafka (see my comment in another thread about this)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah increasing the buffer_size as needed looked weird to me too, I just added max_buffer_size as a quick fix to avoid getting to numbers that are too large. Since this is not a bug, and this pull request already has a million commits, I'm more inclined to make this change separately.
* If the connection is dirty, reinit * If we get a BufferUnderflowError, the server could have gone away, so mark it dirty
great cleanup. +1. and all tests pass for me! |
bytes_left = num_bytes | ||
resp = '' | ||
log.debug("About to read %d bytes from Kafka", num_bytes) | ||
if self._dirty: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stylistically, I think that this check should probably move into recv instead of _read_bytes
(i.e. give this method less responsibility)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason I put it here is because it's right before calling self._socket.recv()
, just as it is is right before calling self._socket.sendall()
in send()
.
Also 👍 merge ASAP! |
Both errors are handled the same way when raised and caught, so this makes sense.
Everyone cool with getting this merged? |
@rdiomar although I'm not looking forward to my rebase, go for it! |
Various changes/fixes, including: * Allow customizing socket timeouts * Read the correct number of bytes from kafka * Guarantee reading the expected number of bytes from the socket every time * Remove bufsize from client and conn * SimpleConsumer flow changes * Fix some error handling * Add optional upper limit to consumer fetch buffer size * Add and fix unit and integration tests
This set of changes addresses several bugs and issues, and fixes most of the tests. It also changes the behavior of SimpleConsumer in a few ways. Here is a summary of all the changes, mostly copied from the major commits:
Previously, if you try to consume a message with a timeout greater than 10 seconds, but you don't receive data in those 10 seconds, a socket.timeout exception is raised. This allows a higher socket timeout to be set, or even None for no timeout.
According to the protocol documentation, the 4 byte integer at the beginning of a response represents the size of the payload only, not including those bytes. See the Kafka docs here.
Since bufsize in the socket is not actually enforced, but it is used by the consumer when creating requests, moving it there until a better solution is implemented
don't increase min_bytes if the consumer fetch buffer size is too small.
This is pretty much a rewrite. The tests that involve offset requests/responses are not implemented since that API is not supported in kafka 0.8 yet. Only kafka.codec and kafka.protocol are currently tested, so there is more work to be done here.