Skip to content

Various changes/fixes #88

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 31 commits into from
Jan 13, 2014
Merged

Various changes/fixes #88

merged 31 commits into from
Jan 13, 2014

Conversation

rdiomar
Copy link
Collaborator

@rdiomar rdiomar commented Jan 7, 2014

This set of changes addresses several bugs and issues, and fixes most of the tests. It also changes the behavior of SimpleConsumer in a few ways. Here is a summary of all the changes, mostly copied from the major commits:

  • Allow customizing socket timeouts.
    Previously, if you try to consume a message with a timeout greater than 10 seconds, but you don't receive data in those 10 seconds, a socket.timeout exception is raised. This allows a higher socket timeout to be set, or even None for no timeout.
  • Read the correct number of bytes from kafka.
    According to the protocol documentation, the 4 byte integer at the beginning of a response represents the size of the payload only, not including those bytes. See the Kafka docs here.
  • Guarantee reading the expected number of bytes from the socket every time.
  • Remove bufsize from client and conn.
    Since bufsize in the socket is not actually enforced, but it is used by the consumer when creating requests, moving it there until a better solution is implemented
  • SimpleConsumer flow changes:
    • Combine partition fetch requests into a single request
    • Put the messages received in a queue and update offsets
    • Grab as many messages from the queue as requested
    • When the queue is empty, request more
    • timeout param for get_messages() is the actual timeout for getting those messages
    • Based on Resize buffer (rather than minbytes) - Fix issue 73 #74 -
      don't increase min_bytes if the consumer fetch buffer size is too small.
    • Notes:
      • Previously, when querying each partition separately, it was possible to block waiting for messages on partition 0 even if there are new ones in partition 1. These changes allow us to block while aiting for messages on all partitions, and reduce total number of kafka requests.
      • Use Queue.Queue for single proc Queue instead of already imported multiprocessing.Queue because the latter doesn't seem to guarantee immediate availability of items after a put.
  • Remove SimpleConsumer queue size limit since it can cause the iterator to block forever if it's reached.
  • Fix error handling in client.
  • Add an optional limit to how much the consumer fetch buffer size can grow.
  • Add and fix integration tests.
  • Add and fix unit tests.
    This is pretty much a rewrite. The tests that involve offset requests/responses are not implemented since that API is not supported in kafka 0.8 yet. Only kafka.codec and kafka.protocol are currently tested, so there is more work to be done here.
  • Style fixes and comments.

rdiomar added 24 commits January 6, 2014 15:14
Previously, if you try to consume a message with a timeout greater than 10 seconds,
but you don't receive data in those 10 seconds, a socket.timeout exception is raised.
This allows a higher socket timeout to be set, or even None for no timeout.
According to the protocol documentation, the 4 byte integer at the beginning
of a response represents the size of the payload only, not including those bytes.
See http://goo.gl/rg5uom
…y time

* Remove bufsize from client and conn, since they're not actually enforced

Notes:

This commit changes behavior a bit by raising a BufferUnderflowError when
no data is received for the message size rather than a ConnectionError.

Since bufsize in the socket is not actually enforced, but it is used by the consumer
when creating requests, moving it there until a better solution is implemented.
* Combine partition fetch requests into a single request
* Put the messages received in a queue and update offsets
* Grab as many messages from the queue as requested
* When the queue is empty, request more
* timeout param for get_messages() is the actual timeout for getting those messages
* Based on dpkp#74 -
  don't increase min_bytes if the consumer fetch buffer size is too small.

Notes:

Change MultiProcessConsumer and _mp_consume() accordingly.

Previously, when querying each partition separately, it was possible to
block waiting for messages on partition 0 even if there are new ones in partition 1.
These changes allow us to block while waiting for messages on all partitions,
and reduce total number of kafka requests.

Use Queue.Queue for single proc Queue instead of already imported
multiprocessing.Queue because the latter doesn't seem to guarantee immediate
availability of items after a put:

>>> from multiprocessing import Queue
>>> q = Queue()
>>> q.put(1); q.get_nowait()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 152, in get_nowait
    return self.get(False)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 134, in get
    raise Empty
Queue.Empty
… iterator to exit when reached.

Also put constant timeout values in pre-defined constants
Will remove once any error handling issues are resolved.
This is pretty much a rewrite. The tests that involve offset requests/responses
are not implemented since that API is not supported in kafka 0.8 yet.
Only kafka.codec and kafka.protocol are currently tested, so there is more work to be done here.
We always store the offset of the next available message, so we shouldn't decrement the offset deltas
when seeking by an extra 1
…data

Also, log.exception() is unhelpfully noisy. Use log.error() with some error details in the message instead.
This differentiates between errors that occur when sending the request
and receiving the response, and adds BufferUnderflowError handling.
…tch size is too small

Note: This can cause fetching a message to exceed a given timeout, but timeouts are not guaranteed anyways, and in this case it's the client's fault for not sending a big enough buffer size rather than the kafka server. This can be bad if max_fetch_size is None (no limit) and there is some message in Kafka that is crazy huge, but that is why we should have some max_fetch_size.
…ed in integration tests

If some of the tests stop brokers then error out, the teardown method will try to close the
same brokers and fail. This change allows it to continue.
This is better since the tests stop/start brokers, and if something goes wrong
they can affect eachother.
for partition in partitions:
requests.append(FetchRequest(self.topic, partition,
self.offsets[partition],
self.buffer_size))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't buffer_size being passed into as the max message size here? Shouldn't buffer_size be used for min_bytes below (line 374), and this should be a max message size parameter that doesn't grow on error?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buffer_size is the value passed to kafka to tell it how much data we can handle. We don't actually have a real buffer, but we have to pass something to kafka. If that size is smaller than the next available message, kafka will not send any messages. The client can either increase the buffer size, skip an offset, or give up and go home. We increase the buffer size up to self.max_buffer_size if it's not None.

min_bytes is how much data kafka should wait for to be available before responding. It is set to 0 or 1 in the FetchContext based on weather or not we want to block (i.e. have at least 1 message before returning). If we pass buffer_size for min_bytes, and buffer_size is, say, 4k, then kafka will wait for 4k of data to be available before responding, which is not really what we want.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If buffer_size is what we're sending to Kafka as the max data we can handle, why isn't it always max_buffer_size?

I don't know if this expand-window-and-retry flow makes sense for either parameter we are sending to Kafka (see my comment in another thread about this)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah increasing the buffer_size as needed looked weird to me too, I just added max_buffer_size as a quick fix to avoid getting to numbers that are too large. Since this is not a bug, and this pull request already has a million commits, I'm more inclined to make this change separately.

@dpkp
Copy link
Owner

dpkp commented Jan 8, 2014

great cleanup. +1. and all tests pass for me!

bytes_left = num_bytes
resp = ''
log.debug("About to read %d bytes from Kafka", num_bytes)
if self._dirty:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stylistically, I think that this check should probably move into recv instead of _read_bytes (i.e. give this method less responsibility)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I put it here is because it's right before calling self._socket.recv(), just as it is is right before calling self._socket.sendall() in send().

@turtlesoupy
Copy link
Contributor

Also 👍 merge ASAP!

Both errors are handled the same way when raised and caught, so this makes sense.
rdiomar referenced this pull request in waliaashish85/kafka-python Jan 9, 2014
@rdiomar
Copy link
Collaborator Author

rdiomar commented Jan 13, 2014

Everyone cool with getting this merged?

@turtlesoupy
Copy link
Contributor

@rdiomar although I'm not looking forward to my rebase, go for it!

rdiomar added a commit that referenced this pull request Jan 13, 2014
Various changes/fixes, including:
* Allow customizing socket timeouts
* Read the correct number of bytes from kafka
* Guarantee reading the expected number of bytes from the socket every time
* Remove bufsize from client and conn
* SimpleConsumer flow changes
* Fix some error handling
* Add optional upper limit to consumer fetch buffer size
* Add and fix unit and integration tests
@rdiomar rdiomar merged commit 87c7f9d into dpkp:master Jan 13, 2014
@dpkp dpkp mentioned this pull request Apr 8, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants