Resize buffer (rather than minbytes) - Fix issue 73 #74


Closed

wants to merge 1 commit into from

Conversation

motherhubbard

Fix for issue 73.

Also:
* Change the buffer growth factor to 2x (avoids fractional sizes when growing the buffer and gives a nicer multiple).
* Stop buffer growth from starting at 1 byte, since we already know the previous buffer (default of 4k) was too small.
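
A minimal sketch of the idea, assuming hypothetical names (FetchSizeTooSmall, next_buffer_size, fetch_with_resize are illustrative, not kafka-python internals): retry a failed fetch with a doubled buffer instead of bumping min_bytes.

# Sketch only; all names here are illustrative, not the actual kafka-python code.

DEFAULT_FETCH_BUFFER_BYTES = 4096  # the 4k default mentioned in the description


class FetchSizeTooSmall(Exception):
    """Raised when a partial message does not fit in the current fetch buffer."""


def next_buffer_size(current_size):
    # Double the buffer rather than multiply by a fractional factor, and always
    # grow from the size that was just shown to be too small (never from 1 byte).
    return current_size * 2


def fetch_with_resize(fetch_fn, buffer_size=DEFAULT_FETCH_BUFFER_BYTES):
    # Retry the fetch with a larger buffer instead of raising min_bytes.
    while True:
        try:
            return fetch_fn(buffer_size)
        except FetchSizeTooSmall:
            buffer_size = next_buffer_size(buffer_size)

Doubling keeps the sizes at whole multiples of the 4k default (4k, 8k, 16k, ...) instead of producing fractional byte counts.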

rdiomar added a commit to rdiomar/kafka-python that referenced this pull request Dec 19, 2013
* Combine partition fetch requests into a single request
* Put the messages received in a queue and update offsets
* Grab as many messages from the queue as requested
* When the queue is empty, request more
* timeout param for get_messages() is the actual timeout for getting those messages
* Based on dpkp#74 -
  don't increase min_bytes if the consumer fetch buffer size is too small.

Notes:

Change MultiProcessConsumer and _mp_consume() accordingly.

Previously, when querying each partition separately, it was possible to
block waiting for messages on partition 0 even if there were new ones in partition 1.
These changes allow us to block while waiting for messages on all partitions,
and reduce the total number of Kafka requests.

Use Queue.Queue for the single-process queue instead of the already imported
multiprocessing.Queue, because the latter doesn't seem to guarantee immediate
availability of items after a put:

>>> from multiprocessing import Queue
>>> q = Queue()
>>> q.put(1); q.get_nowait()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 152, in get_nowait
    return self.get(False)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 134, in get
    raise Empty
Queue.Empty
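
A minimal sketch of the consumption pattern this commit message describes, assuming a hypothetical fetch_all_partitions() helper that issues one combined request and yields (partition, offset, message) tuples; this is illustrative, not the actual kafka-python consumer code.

import time
try:
    from Queue import Queue, Empty      # Python 2, as in the traceback above
except ImportError:
    from queue import Queue, Empty      # Python 3


class QueuedConsumer(object):
    def __init__(self, fetch_all_partitions):
        # fetch_all_partitions() is assumed to issue a single combined request
        # covering every partition and return (partition, offset, message) tuples.
        self._fetch = fetch_all_partitions
        self._queue = Queue()
        self.offsets = {}

    def _refill(self):
        # One request covers all partitions, so we never block on partition 0
        # while partition 1 already has new messages.
        for partition, offset, message in self._fetch():
            self._queue.put(message)
            self.offsets[partition] = offset + 1

    def get_messages(self, count=1, timeout=None):
        # `timeout` bounds the whole call, not each underlying fetch.
        deadline = None if timeout is None else time.time() + timeout
        messages = []
        while len(messages) < count:
            try:
                messages.append(self._queue.get_nowait())
            except Empty:
                if deadline is not None and time.time() >= deadline:
                    break
                self._refill()              # queue is empty, ask for more
                if self._queue.empty():
                    time.sleep(0.1)         # nothing new yet; back off briefly
        return messages

Draining with get_nowait() relies on items being visible immediately after put(), which is exactly why the commit message uses Queue.Queue here rather than multiprocessing.Queue.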
rdiomar added a commit to rdiomar/kafka-python that referenced this pull request Jan 6, 2014
rdiomar mentioned this pull request Jan 7, 2014
rdiomar (Collaborator) commented Jan 16, 2014

This was addressed as part of #88. The fix there was based on this request, so thank you!
